ThreadPoolExecuter线程池详解

大标 2022年3月9日18:45:02
评论
37
摘要

一、源码分析步骤   1.掌握线程池工作状态   2.任务在线程池工作流程   3.线程池任务等待队列管理策略   4.线程池任务拒绝策略   5.线程池关闭策略   6.线程池初始化策略   7.线程池中线程数量扩容方案二、线程池工作状态   1.工作状态分类:     1)Running:在线程池被创建时就处于【Running】

一、源码分析步骤
  1.掌握线程池工作状态
  2.任务在线程池工作流程
  3.线程池任务等待队列管理策略
  4.线程池任务拒绝策略
  5.线程池关闭策略
  6.线程池初始化策略
  7.线程池中线程数量扩容方案
二、线程池工作状态
  1.工作状态分类:
    1)Running:在线程池被创建时就处于【Running】状态,在这种状态下线程池可以接收新的任务,也可以任务队列已经存在的任务来进行处理
    2)Shutdown:线程池不会再接收新的任务,此时只会对任务队列中处于等待的任务进行处理
    3)Stop:线程池被外部的一段程序终止,此时线程池既不能接收新的任务,也不能对队列等待的任务进行处理
    4)Tidying:线程池处于空闲状态
    5)Terminated:线程池工作结束
  2.工作状态转换规律:
    1)Running -> Shutdown:显示调用shutdown(); 隐式调用finalize(),这个方法内部回调用shutdown()
    2)Running or Shutdown -> Stop:显示调用shutdownNow()
    3)Shutdown -> Tidying:任务等待队列中任务都执行完毕时,此时线程池活动的线程达到了0,线程池工作状态自动转变为Tidying
    4)Stop -> Tidying:任务等待队列中任务都执行完毕时,此时线程池活动的线程达到了0,线程池工作状态自动转变为Tidying
    5)Tidying -> Terminated:显示调用terminated()
三、通过线程池中属性了解线程池工作细节
  1.private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    *** ctl存储线程池【状态】和【线程数量】
  2.private final BlockingQueue<Runnable> workQueue;
    *** workQueue是【任务阻塞队列】,在线程池接收新任务后,如果发现【核心线程】都在工作,此时线程池就会将新任务添加到【任务阻塞队列】
  3.private final ReentrantLock mainLock = new ReentrantLock();
    *** 线程池中来处理并发时使用的锁ReentrantLock
  4.private int largestPoolSize;
    *** 记录线程池在工作过程中出现过最大线程数。这个属性与线程池可以管理最大线程数量,和线程数量没有任何关系
  5.private long completedTaskCount;
    *** 线程池已经完成的任务数量
  6.private volatile ThreadFactory threadFactory;
    *** 线程池线程工厂,负责生成线程
  7.private volatile RejectedExecutionHandler handler;
    *** 线程池拒绝策略
      1) 无法处理任务,抛出异常
      2) 放弃一个任务,让空闲线程来处理当前任务
      3) 放弃当前任务
  8.private volatile long keepAliveTime;
    *** 存放线程的最大空闲时间。如果一个线程空闲时间达到了最大空闲值,此时线程池将负责销毁掉当前线程
  9.private volatile int corePoolSize;
    *** 线程池核心线程数量
  10.private volatile int maximumPoolSize;
    *** 线程池可以存放最大线程数量
四、ThreadPoolExecuter源码分析----execute
  1.作用:用于将任务调给ThreadPoolExecuter进行处理
  2.流程:
    1).判断当前核心线程数量<规定核心线程数量
      新增一个核心线程处理当前任务
    2).判断当前核心线程数量==规定核心线程数量
      2.1 如果线程池工作状态==Running(可以接收新任务)
      2.1.1 将任务成功添加到【任务队列】
      2.1.1.1 第二次检测线程池工作状态 == Runnding
        创建一个线程处理当前任务
      2.1.1.2 第二次检测线程池工作状态 != Runnding
        将任务从任务队列中删除,调用拒绝策略
      2.1.2 将任务添加到【任务队列】失败
        尝试创建一个新线程去处理这个任务,如果创建失败就调用拒绝策略
      2.2 如果线程池工作状态!=Running(不可以接收新任务)
五、ThreadPoolExecuter源码分析----addWorker
  1.作用:新增一个线程处理任务
  2.参数:private boolean addWorker(Runnable firstTask, boolean core)
     @Param Runnable firstTask ----> 当前处理任务
     @Param boolean core ----> ture:判断线程数量<核心线程数量;false:判断线程数量<线程池最大线程数量
  3.业务:addWorker方法代码分为两块
    1) 双重循环:通过CAS试图增加线程池数量
    2) 保证线程并发的安全情况下,实现将任务交给线程执行
  4.双重循环:通过CAS试图增加线程池数量
      runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())
      当前状态>=SHUTDOWN 并且 (当前状态>=STOP 或 firstTask != null 或 workQueue为空)
      1) 线程池工作状态是STOP,TIDYING TERMATED 返回false
      2) 线程池工作状态是SHUTDOWN 并且任务是null 返回false
      3) 线程池工作状态是SHUTDOWN 并且任务是空 返回false
六、ThreadPoolExecuter源码分析----shutDown
  1、作用:将线程池工作状态转变为【Shutdown】状态,此时线程池不再接收新任务。将委派线程池已有线程对工作队列中任务进行处理
  2、步骤:
    1) 加锁
    2) 检测线程(worker)权限:
      当前线程(worker)是否具有关闭线程池权限
      当前线程(worker)是否有能力中断其它线程权限
      如果没有上述权限,此时可能抛出NullPointerException或者安全异常
    3) 通过原子性操作修改线程池工作状态--> shutDown
    4) 将空闲线程进行终端(interrupt)
    5) 如果在线程池已经转变为【Shutdown】状态时,尝试将线程池状态直接转变为【Terminated】
七、ThreadPoolExecuter源码分析----shutdownNow
  1、方法格式:public List<Runnable> shutdownNow()
  2、方法作用:shutdownNow方法执行时
    1) 将线程池工作状态转变为【stop】状态
    2) 此时线程池不再接收新的任务
    3) 此时线程池要将所有正在执行的任务进行结束
    4) 此时线程池要将所有剩余任务从任务队列中清除
  3、返回值含义:
    List<Runnable>:返回从任务列表中清除的任务
  4、执行步骤:
    1) 加锁
    2) 检查当前线程(worker)权限:
      检测当前线程(worker)是否具备修改线程池工作状态权限
      检测当前线程(worker)是否具备中断其她线程权限
      如果没有上述权限,此时可能抛出NullPointerException或者安全异常
    3) 修改线程池工作状态为【stop】
    4) 将所有线程进行中断
    5) 将任务队列中的任务进行清除
    6) 尝试将线程池工作状态直接转变为【Terminated】
    7) 解锁
    8) 将任务队列中的任务返回
八、等待队列管理策略
  1、队列:线程池中任务管理模式,实际上就是一个数组存储需要被处理的任务
  2、策略:
    1) 不放入任务队列,直接交给核心线程执行 (运行线程数量 < 核心线程数量)
    2) 放入到任务队列: (运行线程数量 >= 核心线程数量)
      情况1: 核心线程数量 == 最大线程数量,队列没有设置存放任务最大上限
      情况2: 核心线程数量 == 最大线程数量,队列存放任务数量小于队列允许存放最大任务数量
    3) 不放入任务队列,创建一个新的线程去处理任务:(运行线程数量 >= 核心线程数量) (运行线程数量 < 最大线程数量)
    4) 不放入任务队列,并调用拒绝策略阻止任务处理:(运行线程数量 >= 核心线程数量)
      情况1:任务队列存放任务数量 == 任务队列允许存放最大任务数量,此时运行线程数量 >= 线程池允许出现最大线程数量
      情况2:任务队列存放任务数量 < 任务队列允许存放最大任务数量,此时运行线程数量 >= 线程池允许出现最大线程数量
      情况3:在线程池工作状态为【shutDown】或者调用shutDown方法或者调用shutdownNow方法
九、线程池阻塞队列分类和选择
  1、线程池中阻塞队列:
    BolckingQueue workQueue
  2、线程池中阻塞队列分类:
    1) 无界阻塞队列
    2) 有界阻塞队列
    3) 同步移交阻塞队列
  3、无界阻塞队列:
    不会限制存储任务数量的上限,线程池中最常用的无界阻塞队列----LinkedBlockingQueue
    隐患:如果当前任务执行时间相对较长,导致大量新任务堆积到队列中,有可能造成服务端OOM问题
    不适用场景:在互联网通信过程中,如果QPS(单位时间里的访问量)较高并且发送数据量较大,此时如果使用无界阻塞队列来存储任务,导致服务端出现熔断现象
    Executore.newFixThreaPool默认使用的阻塞队列----LinkedBlockingQueue
  4、有界阻塞队列:
    设置队列中所存储的任务上限,如果达到了上限则需要通过线程池拒绝策略/饱和策略来拒绝接收这个任务
    基于FIFO原则来实现有界阻塞队列----ArrayBlockingQueue
    基于任务优先级原则实现有界阻塞队列----PriorityBlockingQueue
  5、同步移交阻塞队列:
    并不是一种真实的阻塞队列
    线程池中最常用同步移交阻塞队列----SynchronousQueue
    同步移交阻塞队列接收到一个新任务要去线程池,此时必须存在一个线程来处理这个任务。
    在线程池发现运行线程数量 < 线程池核心线程数量,直接将任务交给【SynchronousQueue】或者运行线程数量 <线程池最大线程数量,线程池会创建一个新线程,然后将任务交给【SynchronousQueue】,由同步移交阻塞队列将新任务交给新线程处理
十、线程池饱和策略
  1、介绍
    饱和策略也叫拒绝策略
    在新任务数量大于线程池中有界阻塞队列上限时,此时可以通过饱和策略来对多出的任务进行处理,这些方案统称饱和策略
    在线程池中饱和策略数据类型RejectHandler
  2、饱和策略分类:
    1) AbortPolicy方案
    2) DiscardPolicy方案
    3) DiscardOldestPolicy方案
    4) CallerRunsPolicy方案
  3、AbortPolicy方案:
    1) 是数据库连接池默认的饱和方案
    2) 对于不能存放到阻塞队列中的新任务会直接删除并抛出一个RejectedExecutionException异常,这个异常可以通过catch语句捕捉到
    3) 对应代码
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
        " rejected from " +
        e.toString());
      }
  4、DiscardPolicy方案:
    1) 与AbortPolicy方案相似,也是不处理不能存入到阻塞队列中的新任务,只不过不会抛出异常
    2) 对应代码
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      }
  5、DiscardOldestPolicy方案:
    1) 在阻塞队列数量已经达到了任务上限后,为了让新任务进入到阻塞队列中,DiscardOldestPolicy方案将阻塞队列中最早的任务/最老的任务从阻塞队列中删除,将新的任务添加到阻塞队列中
    2) 对应代码
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
          e.getQueue().poll();//弹出最早的任务/最老的任务
          e.execute(r);//添加新任务
        }
      }
  6、CallerRunsPolicy方案:
    1) 目的还是要处理新任务
    2) 不会使用线程池中线程来处理当前任务
    3) 将这个任务返回给提交任务的线程去执行
    4) 对应代码
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
          r.run();
        }
      }
十一、线程池关闭原则----线程中断方案
  1、不应该手动中断任何一个线程执行
  2、如果线程正在持有锁,此时中断线程执行,导致锁不会释放
  3、如果线程正在通过I/O流操作数据库或者是文档,此时中断线程执行,导致数据不一致
  4、所以JDK中将用于结束线程的方法stop方法设置为过时方法
  5、线程结束应该由线程自身来决定
十二、线程池关闭原则----interrupt方法
  1、interrupt方法本质上并不是结束一个线程,用于设置线程中状态
  2、如果此时线程正处于阻塞队列(wait,I/O等待),此时interrupt方法执行,会结束线程阻塞状态并抛出InterruptException异常,这个异常在catch中进行捕捉,进行处理后,线程自动结束
  3、如果此时线程正处于运行状态,此时interrupt方法执行并不会阻止线程继续运行。应该在线程执行中适当位置调用isInterrupt方法,来让线程了解是否被设置中断状态,然后根据实际情况由线程决定市场还要继续执行
  4、如果先执行interrupt方法。在将线程放入阻塞队列中,此时依然会抛出InterruptException异常
十三、线程池关闭原则----依赖条件
  1、如果线程池没有继续被对象引用,导致线程池尝试关闭自己
  2、如果线程池没有任何线程时,导致线程池关闭自己
十四、线程池关闭原则----shutdown与 shutdownNow
  1、shutdown方法:禁止线程池接收新任务
    要求线程池将阻塞队列中的任务依次执行
    在任务执行完毕后,将线程依次销毁
    在所有线程都销毁完毕后,线程池关闭
  2、shutdownNow方法:禁止线程池接收新任务
    要求线程池将阻塞队列任务进行依次删除
    在任务删除完毕后,将线程依次销毁
    在所有线程都销毁完毕后,线程池关闭
    ***shutdown与shutdownNow方法执行后,线程池并不会立刻关闭
    所以采用异步通知方案,如果需要在这两个方法执行后要求线程池立刻关闭,此时需要采用同步方案,因此需要调用terminated
十五、线程池关闭原则----shutdown与 shutdownNow执行特征
  1、需要了解问题:
    1) 在shutdown与shutdownNow方法执行后,线程池是否还会接收新任务
    2) 在shutdown与shutdownNow方法执行后,阻塞队列中的任务是否继续执行
    3) 在shutdown与shutdownNow方法执行后,线程池中正在执行的任务会不会被中断
  2、线程池关闭原则:
    1) 在线程池关闭时,是不会再接收新任务,此时对于传递过来新任务交给饱和原则(AbortPolicy[默认],DiscardPolicy,DiscardOldestPolicy,CallerRunsPolicy)进行处理
    2) 如果由shutdown方法通知线程池关闭,此时线程池会将阻塞队列中所有任务执行完毕后才会关闭;如果由shutdownNow方法通知线程池关闭,此时线程池不会执行阻塞队列中的任务,并且将阻塞队列中的任务删除
    3) 在线程池接收到关闭命令时,正在执行的线程是不会被中断,可以继续执行完毕
十六、线程池关闭原则----FixedThreadPool与CachedThreadPool没有关闭线程池特征
  1、FixedThreadPool:
    没有设置线程超时策略,因此在线程池任务结束后,线程池中线程依然存在,如果不去手动关闭线程池,在多次使用之后会造成OOM【内存泄露】问题
  2、CachedThreadPool:
    默认线程超时策略,默认时间60s,因此CachedThreadPool在执行完成后即使没有关闭线程池,也不会导致OOM问题
十七、线程池启动原则
  1、在线程池对象创建完成后,不会自动运行
  2、在线程池创建时,想阻塞队列添加任务,也不会导致线程池自动运行
  3、只有通过submit或者execute方法向线程池提交任务,才会促使线程池运行
    1) 创建一个全新线程独立当前任务
      情况1:(运行线程数量 < 核心线程数量)新线程处理任务
      情况2:(线程池最大线程数量 >= 运行线程数量 >= 核心线程数量)
      同时(阻塞队列以满),新线程处理任务
    2) 将任务暂时存放到阻塞队列中等待线程空闲时处理
      情况1:(阻塞队列未满)(运行线程数量 == 核心线程数量 < 线程池最大线程数量)将任务添加到阻塞队列
      情况2:(阻塞队列未满)(运行线程数量 >= 核心线程数量 == 线程池最大线程数量) 将任务添加到阻塞队列
      情况3:(阻塞队列已满)(运行线程数量 == 线程池最大线程数量) 但是线程池使用的是【DiscardOldestPolicy饱和策略】,将阻塞队列最老任务删除,新任务添加到阻塞队列尾部
    3) 不处理当前任务:
      情况1:(阻塞队列已满)(运行线程数量 == 线程池最大线程数量)如果线程池采用饱和策略【AbortPolicy,DiscardPolicy】此时线程池将拒绝处理当前任务;如果采用【AbortPolicy饱和策略】,不仅不处理当前任务同时还会向上抛出RejectExecuteException异常,如果采用【DiscardPolicy饱和策略】,只会不处理当前任务
      情况2:无论阻塞队列是否已满,运行线程数量是否大于等于线程池最大线程数量,只要线程池工作状态为【shutdown】或者【stop】状态,都会导致线程池不再接收新任务
    4) 线程池不会调用自己线程来处理当前任务,而是将任务交给提交任务的线程去处理
      情况1:(阻塞队列已满)(运行线程数量 >= 线程池最大线程数量) 线程池采用【CallerRunsPolicy饱和策略】,导致线程池不会调用自身的线程来处理当前任务,将新任务返回给发起请求的线程,由发起请求的线程处理
十八、线程池扩容策略----集合扩容
  集合 数据结构 加载因子 扩容因子
  ArrayList 数组 0.5(元素个数超过ArrayList存放数据0.5倍) 0.5(扩容后是原来的1.5倍)
  Vector 数组 0.5(元素个数超过Vector存放数据0.5倍) 0.5(扩容后是原来的1.5倍)
  HashSet HashMap 0.75(元素个数超过HashSet存放数据个数) 1(扩容后是原来的2倍)
  HashMap 散列表 0.75(元素个数超过HashMap存放数据个数) 1(扩容后是原来的2倍)
  HashTable 散列表 0.75(元素个数超过HashTable存放数据个数) 1+1(扩容后是原来的2倍+1)
十九、线程池扩容策略----线程扩容
  1、线程池采用HashSet存储线程
  2、线程池在什么情况下会新增线程
    情况1:(运行线程数量 < 核心线程数量)此时线程池会添加一个新线程,并存到HashSet中来处理当前任务
    情况2:(阻塞队列已满)(运行线程数量 < 线程池最大线程数量)此时线程池会添加一个新线程,并存到HashSet中来处理当前任务
二十、四种常见线程池----FixedThreadPool
  1、FixedThreadPool由Executros提供的一个ThreadPoolExecutor的实例
  2、FixedThreadPool是一个定长的线程池,线程池中线程池数量是固定不能增加的,核心线程数量与最大线程数量相等
  3、FixedThreadPool没有设置线程空闲时间,因此线程工作完毕后是不会被销毁,因此FixedThreadPool在执行完毕后,必须通过手动调用shutdown方法关闭线程池,避免出现OOM【内存泄露】问题
  4、FixedThreadPool使用的是LinkedBlockingQueue作为阻塞队列,但是没有设置队列任务上限,因此FixedThreadPool可以无限存放任务,但是也带来了OOM【内存泄露】问题
二十一、四种常见线程池----CachedThreadPool
  1、CachedThreadPool由Executros提供的一个ThreadPoolExecutor的实例
  2、CachedThreadPool被称为缓存线程池,如果线程池有空闲线程则复用线程处理任务,如果没有空闲线程则新增线程来处理当前任务
  3、CachedThreadPool没有核心线程,可以接收最大线程数量Integer.MAX_VALUE
  4、CachedThreadPool默认设置空闲时间60s,因此理论上来说CachedThreadPool在工作完毕后,可以不用手动调用shutdown也会被关闭
  5、CachedThreadPool使用的是SynchronousQueue<Runnable>作为阻塞队列(同步移交阻塞队列),因此不会将任务存储到阻塞队列,意味着第一插入的任务没有执行完毕之前,下一个任务是不能执行的
二十二、四种常见线程池----SingleThreadPool
  1、SingleThreadPool由Executros提供的一个ThreadPoolExecutor的实例
  2、SingleThreadPool被称为单一线程池
  3、SingleThreadPool中核心线程数量固定是1,最大线程数量固定1,在运行期间至始至终只使用1个线程来处理所有任务
  4、SingleThreadPool没有设置线程空闲时间,因此在SingleThreadPool线程池工作完毕后,这个线程不会自动销毁,因此必须通过手动调用shutdown方法关闭线程池
  5、SingleThreadPool使用的是LinkedBlockingQueue作为阻塞队列,可以接收任意数量的任务
二十三、四种常见线程池----ScheduledThreadPool
*
*
*
二十四、SingleThreadPool与FixedThreadPool区别
  1、SingleThreadPool:
    1) 在线程池中只使用1个线程处理任务
    2) 能保证任务执行顺序,先提交先执行
    3) 当线程执行时抛出异常,SingleThreadPool会自动创建一个新线程来代替旧的线程
  2、FixedThreadPool:
    1) 指定线程池可以创建最大线程数量
    2) 如果线程池中线程数量达到最大线程数量,此时不会新增线程
    3) Executors.newFixedThreadPool(1),当这个唯一线程出现了异常,此时FixedThreadPool也会自动创建一个新线程来代替旧的线程

  • 我的微信
  • 微信扫一扫
  • weinxin
  • 我的微信公众号
  • 微信扫一扫
  • weinxin
大标
  • 本文由 发表于 2022年3月9日18:45:02
  • 转载请务必保留本文链接:https://www.tanhuibiao.com/script/qita/1311.html
匿名

发表评论

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: