0%

ThreadPoolExecutor

ThreadPoolExecutor类,是一个“使用线程池调度执行所提交任务”的执行器,可简称“TPE”,它在日常工作中经常被用到,十分重要。

TPE继承类图如图1。

图1

一、核心原理

查看ThreadPoolExecutor类源码,理解其核心原理。带有笔者注释的版本见链接

1.1、执行器状态和Worker数量上限

存在成员变量AtomicInteger ctl,其对应的整型值有32位比特,分段表示不同的含义:

  • 低29位,记录执行器当前Worker数量,因此,Worker数量的上限值为(1<<29)-1
  • 高3位,记录执行器的状态(根据以下状态值,可知有RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
    • RUNNING,值为-1<<29,即111,[29个0],正常运行态
    • SHUTDOWN,值为0<<29,即000,[29个0],继续执行“Worker中当前正在执行的任务”和“workQueue中的任务”,禁止新提交任务,当所有Worker都退出后,经历TIDYING -> TERMINATED
    • STOP,值为1<<29,即001,[29个0],给所有Worker中正在执行的任务发出中断信号(当然,正在执行的任务如何响应该中断信号由其自身决定,可以不响应的[1]),“workQueue中的任务”被立即提取出来返回给调用者(即,这些任务不会再被执行),禁止新提交任务,当所有Worker都退出后,经历TIDYING -> TERMINATED
    • TIDYING,值为2<<29,即010,[29个0],一个临时的中间状态
    • TERMINATED,值为3<<29,即011,[29个0],执行器成功关闭,Worker数量为0,workQueue为空

1.2、任务队列

存在成员变量BlockingQueue<Runnable> workQueue:表示任务队列,当Worker数量=corePoolSize时,新提交的任务被插入到该队列,一般是有界队列,否则当提交任务过多时,会导致内存OOM,而且超大无界队列尾部的任务长时间得不到执行一般也非所欲,宁可及早发现。

JDK 1.8中BlockingQueue的实现子类详见《并发Queue》中的阻塞队列。

1.3、拒绝执行策略

存在成员变量RejectedExecutionHandler handler:表示拒绝执行策略类。

有4种拒绝执行策略:

  • AbortPolicy:抛出RejectedExecutionException异常。默认策略
  • DiscardPolicy:直接忽略
  • CallerRunsPolicy:当执行器状态为RUNNING时,由提交任务的线程自身去执行;否则,直接忽略
  • DiscardOldestPolicy:当执行器状态为RUNNING时,丢弃workQueue队列队首任务,然后再重新提交任务;否则,直接忽略

1.4、线程工厂方法类

存在成员变量ThreadFactory threadFactory:新增一个Worker时,会用该工厂方法类创建一个新的绑定到该Worker的Thread实例,具体通过其内的Thread newThread(Runnable r)方法进行创建,推荐提供一个自定义实现的ThreadFactory子类,覆盖实现Thread newThread(Runnable r)方法,使创建得到的Thread实例名字具备强可读性,便于后续排查问题。

默认的线程工厂方法类是Executors.defaultThreadFactory(),其源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

有一个比较好的线程工厂方法类是com.alibaba.csp.sentinel.concurrent.NamedThreadFactory,其源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NamedThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber;
private final String namePrefix;
private final boolean daemon;

public NamedThreadFactory(String namePrefix, boolean daemon) {
this.threadNumber = new AtomicInteger(1);
this.daemon = daemon;
SecurityManager s = System.getSecurityManager();
this.group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
}

public NamedThreadFactory(String namePrefix) {
this(namePrefix, false);
}

public Thread newThread(Runnable r) {
Thread t = new Thread(this.group, r, this.namePrefix + "-thread-" + this.threadNumber.getAndIncrement(), 0L);
t.setDaemon(this.daemon);
return t;
}
}

1.5、Worker

1.5.1、基本

TPE中的执行实体是Worker,关于该类有以下几点说明:

  • 继承实现AQS接口,实现了非可重入的排他锁逻辑
  • 继承自Runnable接口,故是一个执行实体,它的run()方法转发调用外围TPE实例的runWorker(Worker w)方法
  • 每个Worker对应一个Thread,在该Thread中执行该Worker,Worker内有一个实例成员变量final Thread thread指向该Thread,虽然这个不是必需的,因为在Worker运行过程中可通过Thread.currentThread()语句获取到该Thread

需要注意的是:我们常说线程数量,而不是Worker数量,这两者本质是等价的。

1.5.2、运转

runWorker(Worker w)方法的核心逻辑是:不断获取任务调度执行。

任务来源有两个:

  • 创建生成Worker实例时传递的初始任务(可以为空),该任务被获取执行最多一次
  • 调用getTask()方法从workQueue队列获取任务,获取不到任务时挂起

1.5.3、退出机制

有3种退出机制:

  1. 退出机制1:任务执行过程中抛出未被内部catch住的异常,导致Worker退出
  2. 退出机制2:一般情况下,调用getTask()方法从workQueue队列获取任务,获取不到时恒挂起。但是当Worker数量>corePoolSize || 成员变量allowCoreThreadTimeOut=true条件判断语句返回结果为true时:
    • 成员变量keepAliveTime>0,尝试从workQueue队列获取任务,挂起成员变量keepAliveTime指定时间,如果过期仍未获取到任务,则getTask()方法返回NULL,继而导致Worker退出
    • 成员变量keepAliveTime=0,尝试从workQueue队列获取任务,不进行挂起,getTask()方法直接返回NULL,继而导致Worker退出
  3. 退出机制3:一般情况下,调用getTask()方法从workQueue队列获取任务,获取不到时恒挂起。但是当执行器的状态>=STOP || (执行器的状态=SHUTDOWN && workQueue为空),此时调用getTask()方法会直接返回NULL,继而导致Worker退出

需要注意的是:每次Worker退出都会去调用tryTerminate()方法,查看执行器是否能够进入TERMINATED状态。

1.5.4、中断

根据[1]知道,对于中断信号如何响应处理,由具体程序具体决定,可以不响应处理的。

1.5.4.1、Worker运转过程中的中断信号处理点

Worker运转过程中有3个中断信号处理点(其中一个是“潜在可能点”):
点1getTask()方法中的workQueue.poll或者workQueue.take语句,即中断获取任务的挂起,workQueue.poll或者workQueue.take语句响应中断,抛出InterruptedException异常,复位中断标志位,接着进入下一轮循环:首先判断是否满足直接返回NULL的条件(此时,触发Worker的“退出机制2”或者“退出机制3”),不满足则继续调用workQueue.poll或者workQueue.take语句进行获取任务的挂起
点2runWorker()方法中的如下源码,它表示的含义是:当执行器的状态>=STOP时,如果已有中断信号则传递下去,否则自中断产生一个中断信号;否则,即执行器的状态<STOP时,对已有的中断标志位进行复位
点3:真正的任务执行过程中,即task.run()执行过程中,它是潜在可能点,即对中断信号是否处理是未定的

1
2
3
4
5
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
1.5.4.2、中断信号来源

对于针对Worker运转的中断信号来源,这里介绍的来自于TPE内部逻辑,而不考虑其他的(比如“在task.run()自中断”,“以任意可行的方式获取Worker对应的Thread实例,在其上调用interrupt()方法”等)。

在继续介绍之前,首先介绍“Idle Worker”的概念:查看源码1中的w.lock()和源码2中的w.tryLock()语句,可知“Idle Worker”就是还未获取到任务的Worker。

源码1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
...
}
}
}

源码2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

来自于TPE内部逻辑的中断信号:

  • 调用shutdown()方法,其内部调用interruptIdleWorkers()方法,它会给所有“Idle Worker”发送中断信号(此时执行器状态为SHUTDOWN),该中断信号只能被“点1”或者“点2(存在一种进入‘点2’的并发情形)”响应处理,而不能到达“点3”
  • 调用shutdownNow()方法,其内部调用interruptWorkers()方法,它会给所有“Worker”发送中断信号(此时执行器状态为STOP),该中断信号可能被“点1”或者“点2”或者“点3”响应处理,或者继续存在
  • 如果执行器的状态>=STOP,且当下没有中断信号,那么在“点2”处自中断,产生一个中断信号,该中断信号可能被“点3”响应处理,或者继续存在

1.5.5、数量

1.5.5.1、执行器状态=RUNNING

影响Worker数量之提交任务

  1. 最开始也即没有提交任何任务时,此时Worker数量=0,随着任务的提交,线性递增Worker数量直到Worker数量=corePoolSize,在本阶段,当有新的提交任务时,直接新增一个Worker,而不管已有Worker是否空闲
  2. Worker数量=corePoolSize时,继续提交任务,那么新提交的任务会被插入到workQueue队列,当workQueue队列是无界队列时,永远不可能进入阶段3和阶段4,但一般workQueue为有界队列
  3. Worker数量=corePoolSize,workQueue队列为有界队列且队列已满,此时继续提交任务,新增一个Worker,直到Worker数量=maximumPoolSize
  4. Worker数量=maximumPoolSize,workQueue队列为有界队列且队列已满,此时继续提交任务,触发“拒绝执行策略”

以上步骤示意图见图2。

图2

影响Worker数量之Worker退出

  1. 退出机制1
  2. 退出机制2
1.5.5.2、执行器状态>=SHUTDOWN

影响Worker数量之提交任务:禁止提交,触发“拒绝执行策略”。

影响Worker数量之Worker退出

  1. 退出机制1
  2. 退出机制2
  3. 退出机制3

二、具体动作

2.1、构造TPE

构造TPE实例的构造方法涉及到的所有参数如下:

  • int corePoolSize,对应成员变量int corePoolSize
  • int maximumPoolSize,对应成员变量int maximumPoolSize
  • long keepAliveTimeTimeUnit unit,对应成员变量long keepAliveTime
  • BlockingQueue<Runnable> workQueue,对应成员变量BlockingQueue<Runnable> workQueue
  • ThreadFactory threadFactory,对应成员变量ThreadFactory threadFactory
  • RejectedExecutionHandler handler,对应成员变量RejectedExecutionHandler handler

上述成员变量在TPE实例构造完成后都可被动态修改,除了BlockingQueue<Runnable> workQueue成员变量,但是不推荐进行动态修改

2.2、提交任务

分为两类:

  1. 只提交任务,不关注执行结果,不能对执行过程进行控制:
    • public void execute(Runnable command),执行command实例对象的run()方法,不关注执行结果,其实也没有执行结果,因为Runnable实例的run()方法并没有执行结果
  2. 提交任务后,通过Future可获取执行结果,也可对执行过程进行控制(关于Future,Runnable,Callable的关系可详见《FutureTask和ScheduledFutureTask》):
    • public <T> Future<T> submit(Callable<T> task),执行task实例对象的call()方法,获取的执行结果为call()方法的执行结果
    • public Future<?> submit(Runnable task),执行task实例对象的run()方法,获取的执行结果为预设的执行结果NULL,因为Runnable实例的run()方法并没有执行结果
    • public <T> Future<T> submit(Runnable task, T result),执行task实例对象的run()方法,获取的执行结果为预设的执行结果result,因为Runnable实例的run()方法并没有执行结果

2.3、关闭执行器

有两个关闭方法:shutdown()shutdownNow()
1、shutdown()
方法体源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //1
interruptIdleWorkers(); //2
onShutdown(); // hook for ScheduledThreadPoolExecutor //3
} finally {
mainLock.unlock();
}
tryTerminate(); //4
}

分析如下:

  • //1,将执行器状态变为SHUTDOWN
  • //2,给“Idle Worker”发中断信号,详见“1.5.4.2、中断信号来源”小节
  • //3,空实现
  • //4,tryTerminate(),查看执行器是否能够进入TERMINATED状态。Worker退出的时候也会调用该方法

2、shutdownNow()
方法体源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); //1

interruptWorkers(); //2

tasks = drainQueue(); //3
} finally {
mainLock.unlock();
}
tryTerminate(); //4
return tasks;
}

分析如下:

  • //1,将执行器状态变为STOP
  • //2,给所有Worker发中断信号,详见“1.5.4.2、中断信号来源”小节
  • //3,把workQueue中的任务直接返回给调用者,并将其清空
  • //4,tryTerminate(),查看执行器是否能够进入TERMINATED状态。Worker退出的时候也会调用该方法

三、监控

TPE类提供了很多监控API(需要注意的是,在并发语境中,API调用返回的是某个并发瞬时值),如果需要可以实现一个异步监控任务对执行器的状态进行监控:

  • int getCorePoolSize():设置的核心Worker数量,即成员变量corePoolSize
  • int getMaximumPoolSize():设置的最大Worker数量,即成员变量maximumPoolSize
  • int getLargestPoolSize():历史峰值Worker数量
  • int getPoolSize():当前Worker数量,需要注意的是,当执行器状态>=TIDYING时,返回0
  • int getActiveCount():当前活跃Worker(即正在执行任务的Worker)数量
  • long getCompletedTaskCount():已经完成的任务总数
  • long getTaskCount:是“已经完成的任务总数”,“正在执行的任务总数”和“workQueue队列中的任务总数”三部分之和

四、最佳实践

4.1、核心Worker数量

我们知道,“核心Worker数量”与“核心线程数量”等价。

核心线程数量根据执行的任务是属于“CPU密集型”还是“IO密集型”进行配置:

  • CPU密集型。执行CPU密集型任务的线程大部分时间处于RUNNING状态[1],因此只有在真正的多核CPU上才可能得到加速,在单核CPU上并不能得到很大的加速,一般公式为:核心线程数量=CPU核数+1
  • IO密集型。执行IO密集型任务的线程大部分时间处于IO_WAIT状态[1],处于IO_WAIT状态时未占用CPU资源,相应的CPU资源资源可分配给其他线程,因此对于IO密集型任务,可配置远大于CPU核数的线程数,一般公式为:核心线程数量=CPU核数*2

备注:可通过Runtime.getRuntime().availableProcessors()语句获取CPU核数

4.2、一般情况下只需关注执行器的RUNNING状态

一般情况下只需关注执行器的RUNNING状态,无需关注其他状态,理解处于RUNNING状态的执行器内部运转还是较为简单的。


参考文献

[1]《Java并发编程基础》
[2]https://www.cnblogs.com/andy-songwei/p/10784049.html
[3]https://www.hollischuang.com/archives/6338
[4]https://www.jianshu.com/p/3001431f1b0a
[5]https://mp.weixin.qq.com/s/axWymUaYaARtvsYqvfyTtw
[6]https://www.cnblogs.com/thisiswhy/p/12690630.html

您的支持将鼓励我继续分享!