ThreadPoolExecutor 中的参数详解
1. ThreadPoolExecutor 数据成员
ctl 主要⽤于存储线程池的⼯作状态以及池中正在运⾏的线程数。显然要在⼀个整型变量存储两个数据,只能将其⼀分为⼆。其中⾼3bit ⽤于存储线程池的状态,低位的29bit ⽤于存储正在运⾏的线程数。
线程池具有以下五种状态,当创建⼀个线程池时初始化状态为RUNNING
RUNNING
允许提交并处理任务SHUTDOWN
不允许提交新的任务,但是会处理完已提交的任务STOP
不允许提交新的任务,也不会处理阻塞队列中未执⾏的任务,并设置正在执⾏的线程的中断标志位TIDYING
所有任务执⾏完毕,池中⼯作的线程数为0,等待执⾏terminated()勾⼦⽅法TERMINATED terminated()勾⼦⽅法执⾏完毕
注意,这⾥说的是线程池的状态⽽不是池中线程的状态。
调⽤线程池的shutdown ⽅法,将线程池由RUNNING (运⾏状态)转换为SHUTDOWN 状态。
调⽤线程池的shutdownNow ⽅法,将线程池由RUNNING 或SHUTDOWN 状态转换为STOP 状态。
SHUTDOWN 状态和STOP 状态先会转变为TIDYING 状态,最终都会变为TERMINATEDjava线程池创建的四种
ThreadPoolExecutor 同时提供上述三个⽅法⽤于池中的线程查看线程池的状态和计算正在运⾏的线程数。      上述数据成员对线程池的性能也有很⼤的影响,我会将它们放到构造中讲解。
completedTaskCount 表⽰线程池已完成的任务数。
allowCoreThreadTimeeOut 表⽰是否允许核⼼线程在空闲状态下⾃⾏销毁。
largestPoolSize 表⽰线程池从创建到现在,池中线程的最⼤数量
workers 是个HashSet 容器,它存储的是Worker 类的对象,Worker 是线程池的内部类,它继承了Runnable 接⼝,不严格的情况下,可以将⼀个Worker 对象看成Thread 对象,也就是⼯作的线程。shutdown 和shutdownNow ⽅法中会使⽤workers 完成对所有线程的遍历。1Private  final  AtomicInteger ctl =  new  AtomicInteger(ctlOf(RUNNING, 0 ));1
2
3Private  static  int  runStateOf( int  c)Private  static  int  workerCountOf( int  c)Private  static  int  ctlOf( int  rs, int  wc)1
2
3
4
5
6
7Private  int  largestPoolSize;Private  final  BlockingQueue<Runnable>workQueue;Private  volatile  long  keepAliveTime;private  volatile  int  corePoolSize;private  volatile  int  maximumPoolSize;private  volatile  ThreadFactory threadFactory;private  volatile  RejectedExecutionHandler handler;
1
2
3
4Privatefinal HashSet<Worker> workers=  new  HashSet<Worker>();Privatelong completedTaskCount;Private  volatile  boolean  allowCoreThreadTimeOut;private  int  largestPoolSize;
1private  final  HashSet<Worker> workers =  new  HashSet<Worker>();
mainLock 主要⽤于同步访问(或者说改变)线程池的状态以及线程池的各项参数,⽐如completedTaskCount 和workers 等。
在awaitTermination ⽅法中,(mianLock 的)termination 是⽤于延时的条件队列。
2. 构造函数
线程池的构造函数参数多达7个,现在我们⼀⼀来分析它们对线程池的影响。
corePoolSize :线程池中核⼼线程数的最⼤值
maximumPoolSize :线程池中能拥有最多线程数
workQueue :⽤于缓存任务的阻塞队列
我们现在通过向线程池添加新的任务来说明着三者之间的关系。
(1)如果没有空闲的线程执⾏该任务且当前运⾏的线程数少于corePoolSize ,则添加新的线程执⾏该任务。
(2)如果没有空闲的线程执⾏该任务且当前的线程数等于corePoolSize 同时阻塞队列未满,则将任务⼊队列,⽽不添加新的线程。
(3)如果没有空闲的线程执⾏该任务且阻塞队列已满同时池中的线程数⼩于maximumPoolSize ,则创建新的线程执⾏任务。
(4)如果没有空闲的线程执⾏该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize ,则根据构造函数中的handler 指定的策略来拒绝新的任务。
注意,线程池并没有标记哪个线程是核⼼线程,哪个是⾮核⼼线程,线程池只关⼼核⼼线程的数量。
通俗解释,如果把线程池⽐作⼀个单位的话,corePoolSize 就表⽰正式⼯,线程就可以表⽰⼀个员⼯。当我们向单位委派⼀项⼯作时,如果单位发现正式⼯还没招满,单位就会招个正式⼯来完成这项⼯作。随着我们向这个单位委派的⼯作增多,即使正式⼯全部满了,⼯作还是⼲不完,那么单位只能按照我们新委派的⼯作按先后顺序将它们个地⽅搁置起来,这个地⽅就是workQueue ,等正式⼯完成了⼿上的⼯作,就到这⾥来取新的任务。如果不巧,年末了,各个部门都向这个单位委派任务,导致workQueue 已经没有空位置放新的任务,于是单位决定招点临时⼯吧(临时⼯:⼜是我!)。临时⼯也不是想招多少就多少,上级部门通过这个单位的maximumPoolSize 确定了你这个单位的⼈数的最⼤值,换句话说最多招maximumPoolSize–corePoolSize 个临时⼯。当然,在线程池中,谁是正式⼯,谁是临时⼯是没有区别,完全同⼯同酬。        keepAliveTime :表⽰空闲线程的存活时间。
TimeUnitunit :表⽰keepAliveTime 的单位。
为了解释keepAliveTime 的作⽤,我们在上述情况下做⼀种假设。假设线程池这个单位已经招了些临时⼯,但新任务没有继续增加,所以随着每个员⼯忙完⼿头的⼯作,都来workQueue 领取新的任务(看看这个单位的员⼯多⾃觉啊)。随着各个员⼯齐⼼协⼒,任务越来越少,员⼯数没变,那么就必定有闲着没事⼲的员⼯。这样的话领导不乐意啦,但是⼜不能轻易fire 没事⼲的员⼯,因为随时可能有新任务来,于是领导想了个办法,设定了keepAliveTime ,当空闲的员⼯在
keepAliveTime 这段时间还没有到事情⼲,就被辞退啦,毕竟地主家也没有余粮啊!当然辞退到corePoolSize 个员⼯时就不再辞退了,领导也不想当光杆司令啊!
handler :表⽰当workQueue 已满,且池中的线程数达到maximumPoolSize 时,线程池拒绝添加新任务时采取的策略。为了解释handler 的作⽤,我们在上述情况下做另⼀种假设。假设线程池这个单位招满临时⼯,但新任务依然继续增加,线程池从上到下,从⾥到外真⼼忙的不可开交,阻塞队列也满了,只好拒绝上级委派下来的任务。怎么拒绝是门艺术,handler ⼀般可以采取以下四种取值。1
2Privatefinal ReentrantLock mainLock = new  ReentrantLock();Privatefinal Condition termination = wCondition();1
2
3
4
5
6
7publicThreadPoolExecutor(intcorePoolSize,        int  maximumPoolSize,        long  keepAliveTime,        TimeUnit unit,        BlockingQueue<Runnable> workQueue,        ThreadFactory threadFactory,        RejectedExecutionHandler handler)
ThreadPoolExecutor.AbortPolicy()
抛出RejectedExecutionException 异常ThreadPoolExecutor.CallerRunsPolicy()
由向线程池提交任务的线程来执⾏该任务ThreadPoolExecutor.DiscardOldestPolicy()
抛弃最旧的任务(最先提交⽽没有得到执⾏的任务)ThreadPoolExecutor.DiscardPolicy()抛弃当前的任务
workQueue :它决定了缓存任务的排队策略。对于不同的应⽤场景我们可能会采取不同的排队策略,这就需要不同类型的阻塞队列,在线程池中常⽤的阻塞队列有以下2种:
(1)SynchronousQueue<Runnable>:此队列中不缓存任何⼀个任务。向线程池提交任务时,如果没有空闲线程来运⾏任务,则⼊列操作会阻塞。当有线程来获取任务时,出列操作会唤醒执⾏⼊列操作的线程。从这个特性来
看,SynchronousQueue 是⼀个⽆界队列,因此当使⽤SynchronousQueue 作为线程池的阻塞队列时,参数
maximumPoolSizes 没有任何作⽤。
(2)LinkedBlockingQueue<Runnable>:顾名思义是⽤链表实现的队列,可以是有界的,也可以是⽆界的,但在Executors 中默认使⽤⽆界的。
threadFactory :指定创建线程的⼯⼚
实际上ThreadPoolExecutor 类中还有很多重载的构造函数,下⾯这个构造函数在Executors 中经常⽤到。
注意到上述的构造⽅法使⽤Executors 中的defaultThreadFactory()线程⼯⼚和ThreadPoolExecutor 中的defaultHandler 抛弃策略。
使⽤defaultThreadFactory 创建的线程同属于相同的线程组,具有同为Thread.NORM_PRIORITY 的优先级,以及名为"pool-XXX-thread-"的线程名(XXX 为创建线程时顺序序号),且创建的线程都是⾮守护进程。
defaultHandler 缺省抛弃策是ThreadPoolExecutor.AbortPolicy()。
除了在创建线程池时指定上述参数的值外,还可在线程池创建以后通过如下⽅法进⾏设置。
3. 其它有关涉及池中线程数量的相关⽅法
默认情况下,当池中有空闲线程,且线程的数量⼤于corePoolSize 时,空闲时间超过keepAliveTime 的线程会⾃⾏销毁,池中仅仅会保留corePoolSize 个线程。如果线程池中调⽤了allowCoreThreadTimeOut 这个⽅法,则空闲时间超过
keepAliveTime 的线程全部都会⾃⾏销毁,⽽不必理会corePoolSize 这个参数。
如果池中的线程数量⼩于corePoolSize 时,调⽤prestartAllCoreThreads ⽅法,则⽆论是否有待执⾏的任务,线程池都会创建新的线程,直到池中线程数量达到corePoolSize 。1
2
3
4
5
6
7
8public  ThreadPoolExecutor( int  corePoolSize,        int  maximumPoolSize,        long  keepAliveTime,        TimeUnit unit,        BlockingQueue<Runnable> workQueue) {    this  (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,    Executors.defaultThreadFactory(), defaultHandler);}
1
2
3
4
5
6Public  void  allowCoreThreadTimeOut( boolean  value)Public  void  setKeepAliveTime( long  time,T
imeUnit unit)Public  void  setMaximumPoolSize( int  maximumPoolSize)Public  void  setCorePoolSize( int  corePoolSize)Public  void  setThreadFactory(ThreadFactory threadFactory)Public  void  setRejectedExecutionHandler(RejectedExecutionHandler handler)
1
2public  void  allowCoreThreadTimeOut( boolean  value) public  int  prestartAllCoreThreads()
4. Executors 中的线程池的⼯⼚⽅法
为了防⽌使⽤者错误搭配ThreadPoolExecutor 构造函数的各个参数以及更加⽅便简洁的创建ThreadPoolExecutor 对象,JavaSE 中⼜定义了Executors 类,Eexcutors 类提供了创建常⽤配置线程池的⽅法。以下是Executors 常⽤的三个创建线程池的源代码。
从源码中可以看出,Executors 间接的调⽤了重载的ThreadPoolExecutor 构造函数,并帮助⽤户根据不同的应⽤场景,配置不同的参数。
newCachedThreadPool :使⽤SynchronousQueue 作为阻塞队列,队列⽆界,线程的空闲时限为60秒。这种类型的线程池⾮常适⽤IO 密集的服务,因为IO 请求具有密集、数量巨⼤、不持续、服务器端CPU 等待IO 响应时间长的特点。服务器端为了能提⾼CPU 的使⽤率就应该为每个IO 请求都创建⼀个
线程,以免CPU 因为等待IO 响应⽽空闲。
newFixedThreadPool :需指定核⼼线程数,核⼼线程数和最⼤线程数相同,使⽤LinkedBlockingQueue 作为阻塞队列,队列⽆界,线程空闲时间0秒。这种类型的线程池可以适⽤CPU 密集的⼯作,在这种⼯作中CPU 忙于计算⽽很少空闲,由于CPU 能真正并发的执⾏的线程数是⼀定的(⽐如四核⼋线程),所以对于那些需要CPU 进⾏⼤量计算的线程,创建的线程数超过CPU 能够真正并发执⾏的线程数就没有太⼤的意义。
newSingleThreadExecutor :池中只有⼀个线程⼯作,阻塞队列⽆界,它能保证按照任务提交的顺序来执⾏任务。
5. 任务的提交过程
submit ⽅法源码1
2
3
4
5public  static  ExecutorService newCachedThreadPool() {    return  new  ThreadPoolExecutor( 0 , Integer.MAX_VALUE,                                  60L, TimeUnit.SECONDS,                                  new  SynchronousQueue<Runnable>());}1
2
3
4
5public  static  ExecutorService newFixedThreadPool( int  nThreads) {    return  new  ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new  LinkedBlockingQueue<Runnable>());}1
2
3
4
5
6public  static  ExecutorService newSingleThreadExecutor() {    return  new  FinalizableDelegatedExecutorService          ( new  ThreadPoolExecutor( 1 ,  1 ,                                0L, TimeUnit.MILLISECONDS,                                new  LinkedBlockingQueue<Runnable>()));}
submit 的实现⽅法位于抽象类AbstractExecutorService 中,⽽此时execute ⽅法还未实现(⽽是在
AbstractExecutorService 的继承类ThreadPoolExecutor 中实现)。submit 有三种重载⽅法,这⾥我选取了两个常⽤的进⾏分析,可以看出⽆论哪个submit ⽅法都最终调⽤了execute ⽅法。
execute ⽅法源码1
2
3
4
5
6
7
8
9
1
1
1
1
2
1
3public  Future<?> submit(Runnable task) {    if  (task ==  null  )  throw  new  NullPointerException();    RunnableFuture<Void> ftask = newTaskFor(task,  null  );    execute(ftask);    return  ftask;} public  <T> Future<T> submit(Callable<T> task) {    if  (task ==  null  )  throw  new  NullPointerException();   
RunnableFuture<T> ftask = newTaskFor(task);    execute(ftask);    return  ftask;}1
2
3
4
5
6
7
8
9
1
1
1
1
2
1
3
1
4
1
5
1
6
1
7
1
8
1
9
2
2
1public  void  execute(Runnable command) {    if  (command ==  null  )        throw  new  NullPointerException();      int  c = ();    if  (workerCountOf(c) < corePoolSize) {        if  (addWorker(command,  true  ))            return  ;        c = ();    }          if  (isRunning(c) && workQueue.offer(command)) {        int  recheck = ();        if  (! isRunning(recheck) && remove(c
ommand))            reject(command);        else  if  (workerCountOf(recheck) ==  0 )            addWorker( null  ,  false  );    }    else  if  (!addWorker(command,  false  ))        reject(command);}