ThreadPoolExecutor类,是一个“使用线程池调度执行所提交任务”的执行器,可简称“TPE”,它在日常工作中经常被用到,十分重要。
TPE继承类图如图1。
图1
一、核心原理
查看ThreadPoolExecutor类源码,理解其核心原理。带有笔者注释的版本见链接。
1.1、执行器状态和Worker数量上限
存在成员变量AtomicInteger ctl
,其对应的整型值有32位比特,分段表示不同的含义:
- 低29位,记录执行器当前Worker数量,因此,Worker数量的上限值为
(1<<29)-1
- 高3位,记录执行器的状态(根据以下状态值,可知有
RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
)- RUNNING,值为
-1<<29
,即111,[29个0]
,正常运行态 - SHUTDOWN,值为
0<<29
,即000,[29个0]
,继续执行“Worker中当前正在执行的任务”和“workQueue中的任务”,禁止新提交任务,当所有Worker都退出后,经历TIDYING -> TERMINATED
- STOP,值为
1<<29
,即001,[29个0]
,给所有Worker中正在执行的任务发出中断信号(当然,正在执行的任务如何响应该中断信号由其自身决定,可以不响应的[1]),“workQueue中的任务”被立即提取出来返回给调用者(即,这些任务不会再被执行),禁止新提交任务,当所有Worker都退出后,经历TIDYING -> TERMINATED
- TIDYING,值为
2<<29
,即010,[29个0]
,一个临时的中间状态 - TERMINATED,值为
3<<29
,即011,[29个0]
,执行器成功关闭,Worker数量为0,workQueue为空
- RUNNING,值为
1.2、任务队列
存在成员变量BlockingQueue<Runnable> workQueue
:表示任务队列,当Worker数量=corePoolSize
时,新提交的任务被插入到该队列,一般是有界队列,否则当提交任务过多时,会导致内存OOM,而且超大无界队列尾部的任务长时间得不到执行一般也非所欲,宁可及早发现。
JDK 1.8中BlockingQueue
的实现子类详见《并发Queue》中的阻塞队列。
1.3、拒绝执行策略
存在成员变量RejectedExecutionHandler handler
:表示拒绝执行策略类。
有4种拒绝执行策略:
- AbortPolicy:抛出
RejectedExecutionException
异常。默认策略 - DiscardPolicy:直接忽略
- CallerRunsPolicy:当执行器状态为
RUNNING
时,由提交任务的线程自身去执行;否则,直接忽略 - DiscardOldestPolicy:当执行器状态为
RUNNING
时,丢弃workQueue队列队首任务,然后再重新提交任务;否则,直接忽略
1.4、线程工厂方法类
存在成员变量ThreadFactory threadFactory
:新增一个Worker时,会用该工厂方法类创建一个新的绑定到该Worker的Thread实例,具体通过其内的Thread newThread(Runnable r)
方法进行创建,推荐提供一个自定义实现的ThreadFactory子类,覆盖实现Thread newThread(Runnable r)
方法,使创建得到的Thread实例名字具备强可读性,便于后续排查问题。
默认的线程工厂方法类是Executors.defaultThreadFactory()
,其源代码如下:
1 | static class DefaultThreadFactory implements ThreadFactory { |
有一个比较好的线程工厂方法类是com.alibaba.csp.sentinel.concurrent.NamedThreadFactory
,其源代码如下:
1 | public class NamedThreadFactory implements ThreadFactory { |
1.5、Worker
1.5.1、基本
TPE中的执行实体是Worker,关于该类有以下几点说明:
- 继承实现AQS接口,实现了非可重入的排他锁逻辑
- 继承自Runnable接口,故是一个执行实体,它的
run()
方法转发调用外围TPE实例的runWorker(Worker w)
方法 - 每个Worker对应一个Thread,在该Thread中执行该Worker,Worker内有一个实例成员变量
final Thread thread
指向该Thread,虽然这个不是必需的,因为在Worker运行过程中可通过Thread.currentThread()
语句获取到该Thread
需要注意的是:我们常说线程数量,而不是Worker数量,这两者本质是等价的。
1.5.2、运转
runWorker(Worker w)
方法的核心逻辑是:不断获取任务调度执行。
任务来源有两个:
- 创建生成Worker实例时传递的初始任务(可以为空),该任务被获取执行最多一次
- 调用
getTask()
方法从workQueue队列获取任务,获取不到任务时挂起
1.5.3、退出机制
有3种退出机制:
- 退出机制1:任务执行过程中抛出未被内部catch住的异常,导致Worker退出
- 退出机制2:一般情况下,调用
getTask()
方法从workQueue队列获取任务,获取不到时恒挂起。但是当Worker数量>corePoolSize || 成员变量allowCoreThreadTimeOut=true
条件判断语句返回结果为true
时:成员变量keepAliveTime>0
,尝试从workQueue队列获取任务,挂起成员变量keepAliveTime指定时间,如果过期仍未获取到任务,则getTask()
方法返回NULL
,继而导致Worker退出成员变量keepAliveTime=0
,尝试从workQueue队列获取任务,不进行挂起,getTask()
方法直接返回NULL
,继而导致Worker退出
- 退出机制3:一般情况下,调用
getTask()
方法从workQueue队列获取任务,获取不到时恒挂起。但是当执行器的状态>=STOP || (执行器的状态=SHUTDOWN && workQueue为空)
,此时调用getTask()
方法会直接返回NULL
,继而导致Worker退出
需要注意的是:每次Worker退出都会去调用tryTerminate()
方法,查看执行器是否能够进入TERMINATED
状态。
1.5.4、中断
根据[1]知道,对于中断信号如何响应处理,由具体程序具体决定,可以不响应处理的。
1.5.4.1、Worker运转过程中的中断信号处理点
Worker运转过程中有3个中断信号处理点(其中一个是“潜在可能点”):
点1:getTask()
方法中的workQueue.poll
或者workQueue.take
语句,即中断获取任务的挂起,workQueue.poll
或者workQueue.take
语句响应中断,抛出InterruptedException
异常,复位中断标志位,接着进入下一轮循环:首先判断是否满足直接返回NULL的条件(此时,触发Worker的“退出机制2”或者“退出机制3”),不满足则继续调用workQueue.poll
或者workQueue.take
语句进行获取任务的挂起
点2:runWorker()
方法中的如下源码,它表示的含义是:当执行器的状态>=STOP时,如果已有中断信号则传递下去,否则自中断产生一个中断信号;否则,即执行器的状态<STOP时,对已有的中断标志位进行复位
点3:真正的任务执行过程中,即task.run()
执行过程中,它是潜在可能点,即对中断信号是否处理是未定的
1 | if ((runStateAtLeast(ctl.get(), STOP) || |
1.5.4.2、中断信号来源
对于针对Worker运转的中断信号来源,这里介绍的来自于TPE内部逻辑,而不考虑其他的(比如“在task.run()
自中断”,“以任意可行的方式获取Worker对应的Thread实例,在其上调用interrupt()
方法”等)。
在继续介绍之前,首先介绍“Idle Worker”的概念:查看源码1中的w.lock()
和源码2中的w.tryLock()
语句,可知“Idle Worker”就是还未获取到任务的Worker。
源码1:
1 | final void runWorker(Worker w) { |
源码2:
1 | private void interruptIdleWorkers(boolean onlyOne) { |
来自于TPE内部逻辑的中断信号:
- 调用
shutdown()
方法,其内部调用interruptIdleWorkers()
方法,它会给所有“Idle Worker”发送中断信号(此时执行器状态为SHUTDOWN
),该中断信号只能被“点1”或者“点2(存在一种进入‘点2’的并发情形)”响应处理,而不能到达“点3” - 调用
shutdownNow()
方法,其内部调用interruptWorkers()
方法,它会给所有“Worker”发送中断信号(此时执行器状态为STOP
),该中断信号可能被“点1”或者“点2”或者“点3”响应处理,或者继续存在 - 如果执行器的状态>=STOP,且当下没有中断信号,那么在“点2”处自中断,产生一个中断信号,该中断信号可能被“点3”响应处理,或者继续存在
1.5.5、数量
1.5.5.1、执行器状态=RUNNING
影响Worker数量之提交任务:
- 最开始也即没有提交任何任务时,此时
Worker数量=0
,随着任务的提交,线性递增Worker数量直到Worker数量=corePoolSize
,在本阶段,当有新的提交任务时,直接新增一个Worker,而不管已有Worker是否空闲 - 在
Worker数量=corePoolSize
时,继续提交任务,那么新提交的任务会被插入到workQueue队列,当workQueue队列是无界队列时,永远不可能进入阶段3和阶段4,但一般workQueue为有界队列 - 当
Worker数量=corePoolSize
,workQueue队列为有界队列且队列已满,此时继续提交任务,新增一个Worker,直到Worker数量=maximumPoolSize
- 当
Worker数量=maximumPoolSize
,workQueue队列为有界队列且队列已满,此时继续提交任务,触发“拒绝执行策略”
以上步骤示意图见图2。
图2
影响Worker数量之Worker退出:
- 退出机制1
- 退出机制2
1.5.5.2、执行器状态>=SHUTDOWN
影响Worker数量之提交任务:禁止提交,触发“拒绝执行策略”。
影响Worker数量之Worker退出:
- 退出机制1
- 退出机制2
- 退出机制3
二、具体动作
2.1、构造TPE
构造TPE实例的构造方法涉及到的所有参数如下:
int corePoolSize
,对应成员变量int corePoolSize
int maximumPoolSize
,对应成员变量int maximumPoolSize
long keepAliveTime
和TimeUnit unit
,对应成员变量long keepAliveTime
BlockingQueue<Runnable> workQueue
,对应成员变量BlockingQueue<Runnable> workQueue
ThreadFactory threadFactory
,对应成员变量ThreadFactory threadFactory
RejectedExecutionHandler handler
,对应成员变量RejectedExecutionHandler handler
上述成员变量在TPE实例构造完成后都可被动态修改,除了BlockingQueue<Runnable> workQueue
成员变量,但是不推荐进行动态修改。
2.2、提交任务
分为两类:
- 只提交任务,不关注执行结果,不能对执行过程进行控制:
public void execute(Runnable command)
,执行command
实例对象的run()
方法,不关注执行结果,其实也没有执行结果,因为Runnable实例的run()
方法并没有执行结果
- 提交任务后,通过Future可获取执行结果,也可对执行过程进行控制(关于Future,Runnable,Callable的关系可详见《FutureTask和ScheduledFutureTask》):
public <T> Future<T> submit(Callable<T> task)
,执行task
实例对象的call()
方法,获取的执行结果为call()
方法的执行结果public Future<?> submit(Runnable task)
,执行task
实例对象的run()
方法,获取的执行结果为预设的执行结果NULL,因为Runnable实例的run()
方法并没有执行结果public <T> Future<T> submit(Runnable task, T result)
,执行task
实例对象的run()
方法,获取的执行结果为预设的执行结果result,因为Runnable实例的run()
方法并没有执行结果
2.3、关闭执行器
有两个关闭方法:shutdown()
和shutdownNow()
。
1、shutdown()
方法体源码如下:
1 | public void shutdown() { |
分析如下:
- //1,将执行器状态变为
SHUTDOWN
- //2,给“Idle Worker”发中断信号,详见“1.5.4.2、中断信号来源”小节
- //3,空实现
- //4,
tryTerminate()
,查看执行器是否能够进入TERMINATED
状态。Worker退出的时候也会调用该方法
2、shutdownNow()
方法体源码如下:
1 | public List<Runnable> shutdownNow() { |
分析如下:
- //1,将执行器状态变为
STOP
- //2,给所有Worker发中断信号,详见“1.5.4.2、中断信号来源”小节
- //3,把workQueue中的任务直接返回给调用者,并将其清空
- //4,
tryTerminate()
,查看执行器是否能够进入TERMINATED
状态。Worker退出的时候也会调用该方法
三、监控
TPE类提供了很多监控API(需要注意的是,在并发语境中,API调用返回的是某个并发瞬时值),如果需要可以实现一个异步监控任务对执行器的状态进行监控:
int getCorePoolSize()
:设置的核心Worker数量,即成员变量corePoolSize
值int getMaximumPoolSize()
:设置的最大Worker数量,即成员变量maximumPoolSize
值int getLargestPoolSize()
:历史峰值Worker数量int getPoolSize()
:当前Worker数量,需要注意的是,当执行器状态>=TIDYING
时,返回0int getActiveCount()
:当前活跃Worker(即正在执行任务的Worker)数量long getCompletedTaskCount()
:已经完成的任务总数long getTaskCount
:是“已经完成的任务总数”,“正在执行的任务总数”和“workQueue队列中的任务总数”三部分之和
四、最佳实践
4.1、核心Worker数量
我们知道,“核心Worker数量”与“核心线程数量”等价。
核心线程数量根据执行的任务是属于“CPU密集型”还是“IO密集型”进行配置:
- CPU密集型。执行CPU密集型任务的线程大部分时间处于
RUNNING
状态[1],因此只有在真正的多核CPU上才可能得到加速,在单核CPU上并不能得到很大的加速,一般公式为:核心线程数量=CPU核数+1
- IO密集型。执行IO密集型任务的线程大部分时间处于
IO_WAIT
状态[1],处于IO_WAIT
状态时未占用CPU资源,相应的CPU资源资源可分配给其他线程,因此对于IO密集型任务,可配置远大于CPU核数的线程数,一般公式为:核心线程数量=CPU核数*2
备注:可通过Runtime.getRuntime().availableProcessors()
语句获取CPU核数
4.2、一般情况下只需关注执行器的RUNNING
状态
一般情况下只需关注执行器的RUNNING
状态,无需关注其他状态,理解处于RUNNING
状态的执行器内部运转还是较为简单的。
参考文献
[1]《Java并发编程基础》
[2]https://www.cnblogs.com/andy-songwei/p/10784049.html
[3]https://www.hollischuang.com/archives/6338
[4]https://www.jianshu.com/p/3001431f1b0a
[5]https://mp.weixin.qq.com/s/axWymUaYaARtvsYqvfyTtw
[6]https://www.cnblogs.com/thisiswhy/p/12690630.html