0%

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类,是一个“使用线程池调度执行所提交‘一次性时延’或者‘周期性’任务”的执行器,可简称“STPE”。

在JDK中,执行“一次性时延”或者“周期性”任务的组件还有Timer类,STPE跟Timer的主要对比如下:

  • 前者支持多线程,后者固定单线程。当配置前者只有1个线程,主要功能跟Timer是等价的
  • 前者支持多种时间单位,后者只支持“日期时间”和“毫秒”

STPE继承类图如图1。

图1

一、核心原理

查看ScheduledThreadPoolExecutor类源码,理解其核心原理。带有笔者注释的版本见链接

1.0、三种任务

在STPE中,任务的表示类为ScheduledFutureTask,其详细介绍可见《FutureTask和ScheduledFutureTask》

ScheduledFutureTask类中有两个字段:

  • long time,单位为纳秒,表示任务的计划执行时间。需要注意的是,它表示“计划执行时间”与“本JVM进程基准时间”之间的差值,而不是跟System.currentTimeMillis()一样以midnight, January 1, 1970 UTC为基准时间,不同JVM进程的基准时间可能不同,一个JVM进程的生命周期内基准时间不变
  • long period,单位为纳秒,用以区分3种任务类型
    • period=0,一次性时延任务
    • period>0,周期性固定间隔任务
    • period<0,周期性固定时延任务

关于3种类型任务的介绍:

  • 一次性时延任务:执行一次就结束
  • 周期性固定间隔任务:本次执行完后将该任务再次加入workQueue,设置time=time+period
  • 周期性固定时延任务:本次执行完后将该任务再次加入workQueue,设置time=System.nanoTime()+(-period)System.nanoTime()表示“当前时间”与“本JVM进程基准时间”之间的差值,单位为纳秒)

ScheduledFutureTask类的run()方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
// 周期性任务执行如果返回false,这个周期性任务也不加回去了

//outerTask也不能指向别的任务,不然setNextRunTime()这个time值白设置了
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
1
2
3
4
5
6
7
8
9
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
1
2
3
4
5
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
1
2
3
4
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}

关于以上run()方法源码的几点说明:

  1. 以上源码中展示了3种类型任务的执行
  2. 对于周期性任务,通过time=time+period或者time=System.nanoTime()+(-period)更新time值,看似有运算溢出可能,实则几乎不可能,因为Long.MAX_VALUE - System.nanoTime()差值十分大
  3. 跟TPE中的任务执行不同,此处在任务执行时会再去结合执行器的状态,判断本任务是否需要执行,即canRunInCurrentRunState(boolean periodic)语句,该语句的含义如下:
    • 当执行器状态=RUNNING,返回true
    • 当执行器状态>=STOP,返回false
    • 当执行器状态=SHUTDOWN,此时如果是周期性任务,结果跟成员变量continueExistingPeriodicTasksAfterShutdown值(默认为false)一致;如果不是周期性任务,结果跟成员变量executeExistingDelayedTasksAfterShutdown值(默认为true)一致

1.1、执行器状态和Worker数量上限表示值

跟父类差异点:

  • SHUTDOWN,值为0<<29,即000,[29个0],继续执行“Worker中当前正在执行的任务”和“筛选过的workQueue中的任务(这个筛选过程在onShutDown()方法中实现,详细见“2.3、关闭执行器”小节)”,禁止新提交任务,当所有Worker都退出后,经历TIDYING -> TERMINATED

另外,根据上面叙述可知,以ScheduledFutureTask表示的任务在执行run()方法时会使用canRunInCurrentRunState(boolean periodic)方法判断“本任务在当前执行器状态下是否能够执行”,但是对于触发执行器状态改变的线程来说,的确是继续执行“Worker中当前正在执行的任务”,即上面继续执行“Worker中当前正在执行的任务”叙述没有错

1.2、任务队列

STPE实例对象的workQueue隐式指定——workQueue = new DelayedWorkQueue()

DelayWorkQueue类的特点如下:

  • 无界队列,虽然实际上限为Integer.MAX_VALUE,这么设计的原因在于:新提交的“一次性时延”或者“周期性”任务如果计划执行时间还未到不能直接交给Worker执行,只能插入到队列,故队列应该是无界的
  • Worker每次获取任务时,获取到的任务须满足“计划执行时间在队列中最小,且小于等于当前时间(当两个任务的计划执行时间一致时,先提交任务优先级高)”,DelayWorkQueue队列内部通过维护一个堆数据结构实现了上述需求

1.3、拒绝执行策略

跟父类相同!

1.4、线程工厂方法类

跟父类相同!

1.5、Worker

1.5.1、基本

跟父类相同!

1.5.2、运转

runWorker(Worker w)方法的核心逻辑是:不断获取任务调度执行。

任务来源只有1个:

  • 调用getTask()方法从workQueue队列获取任务,获取不到任务时挂起

父类中Worker运转的任务来源还有另外一个——创建生成Worker实例时传递的初始任务,但是在这里,任务的计划执行时间很可能还未到,故直接排除掉这种任务来源。

1.5.3、退出机制

有3种退出机制:

  1. 退出机制1:任务执行过程中抛出未被内部catch住的异常,导致Worker退出
  2. 退出机制2:一般情况下,调用getTask()方法从workQueue队列获取任务,获取不到时恒挂起。但是当Worker数量>corePoolSize || 成员变量allowCoreThreadTimeOut=true条件判断语句返回结果为true时:
    • 成员变量keepAliveTime>0,尝试从workQueue队列获取任务,挂起成员变量keepAliveTime指定时间,如果过期仍未获取到任务,则getTask()方法返回NULL,继而导致Worker退出
    • 成员变量keepAliveTime=0,尝试从workQueue队列获取任务,不进行挂起,getTask()方法直接返回NULL,继而导致Worker退出
  3. 退出机制3:一般情况下,调用getTask()方法从workQueue队列获取任务,获取不到时恒挂起。但是当执行器的状态>=STOP || (执行器的状态=SHUTDOWN && workQueue为空),此时调用getTask()方法会直接返回NULL,继而导致Worker退出

需要注意的是:每次Worker退出都会去调用tryTerminate()方法,查看执行器是否能够进入TERMINATED状态。

以上跟父类相同,跟父类不同的是:在STPE中,对于Worker数量>corePoolSize条件不可能成立,详见“1.5.5、数量”,而根据STPE JavaDoc中如下片段可知,成员变量“allowCoreThreadTimeOut”也不被推荐设置成true,故“退出机制2”可认为无效。

Additionally, it is almost never a good idea to set {@code corePoolSize} to zero or use {@code allowCoreThreadTimeOut} because this may leave the pool without threads to handle tasks once they become eligible to run.

1.5.4、中断

跟父类相同!

1.5.5、数量

1.5.5.1、执行器状态=RUNNING

影响Worker数量之提交任务

  • 对于新提交的任务,立即无条件插入到workQueue队列
  • Worker数量的变化趋势:最开始也即没有提交任何任务时,此时Worker数量=0,随着任务的提交,线性递增Worker数量直到Worker数量=corePoolSize,此后Worker数量不会再增加

注意以上跟父类有很大的不同!

影响Worker数量之Worker退出

  1. 退出机制1
1.5.5.2、执行器状态>=SHUTDOWN

影响Worker数量之提交任务:禁止提交,触发“拒绝执行策略”。

影响Worker数量之Worker退出

  1. 退出机制1
  2. 退出机制3

二、具体动作

2.1、构造STPE

构造方法涉及到的所有参数如下:

  • int corePoolSize,对应成员变量int corePoolSize
  • ThreadFactory threadFactory,对应成员变量ThreadFactory threadFactory
  • RejectedExecutionHandler handler,对应成员变量RejectedExecutionHandler handler

跟父类的构造方法最多支持7个传入参数相比,少了4个传入参数,对应的成员变量直接隐式指定:

  • 成员变量workQueue=new DelayedWorkQueue(),根据STPE的定义,存放任务的workQueue需要是一个优先级队列,参见“1.2、任务队列”小节
  • 成员变量maximumPoolSize=Integer.MAX_VALUE,根据“1.5.5.1、执行器状态=RUNNING”小节,Worker数量不会大于corePoolSize,这里设置maximumPoolSize=Integer.MAX_VALUE纯粹只是在满足字段大小语义(maximumPoolSize>corePoolSize)的前提下而取用了一个特殊大值Integer.MAX_VALUE
  • 成员变量keepAliveTime=0,根据“1.5.3、退出机制”小节可知,“退出机制2”可认为无效,故自然可认为成员变量“keepAliveTime”的设置无实际效果,这里只是取了一个特殊值0而已

上述成员变量在STPE实例构造完成后都可被动态修改,除了BlockingQueue<Runnable> workQueue成员变量,但是不推荐进行动态修改,特别是动态改变原隐式指定的成员变量更是违反设计

2.2、提交任务

2.2.1、继承自TPE类的方法

继承自TPE类的方法,分为两类:

  1. 只提交任务,不关注执行结果,不能对执行过程进行控制:
    • public void execute(Runnable command),执行command实例对象的run()方法,不关注执行结果,其实也没有执行结果,因为Runnable实例的run()方法并没有执行结果
  2. 提交任务后,通过Future可获取执行结果,也可对执行过程进行控制(关于Future,Runnable,Callable的关系可详见《FutureTask和ScheduledFutureTask》):
    • public <T> Future<T> submit(Callable<T> task),执行task实例对象的call()方法,获取的执行结果为call()方法的执行结果
    • public Future<?> submit(Runnable task),执行task实例对象的run()方法,获取的执行结果为预设的执行结果NULL,因为Runnable实例的run()方法并没有执行结果
    • public <T> Future<T> submit(Runnable task, T result),执行task实例对象的run()方法,获取的执行结果为预设的执行结果result,因为Runnable实例的run()方法并没有执行结果

在STPE类中为了实现上的统一(比如“加入workQueue队列的任务实际类型可以统一”),对以上4个方法进行了覆盖实现,覆盖实现只是简单转发调用了“2.2.2、STPE类增加的方法”小节中的方法。

2.2.2、STPE类增加的方法

STPE类中增加了4个提交任务的方法,这几个方法的性质都是“提交任务后,通过ScheduledFuture可获取执行结果,可对执行过程进行控制,也能获得计划执行时间与当前时间的纳秒时间差值(关于ScheduledFuture,Runnable,Callable的关系可详见《FutureTask和ScheduledFutureTask》)”:

  • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit),一次性时延任务,计划执行时间为当前时间加上“delay”和“unit”联合指定的差值,执行command实例对象的run()方法,获取的执行结果为预设值NULL,因为Runnable实例的run()方法并没有执行结果
  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit),一次性时延任务,计划执行时间为当前时间加上“delay”和“unit”联合指定的差值,执行callable实例对象的call()方法,获取的执行结果为call()方法的执行结果
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),周期性固定间隔任务,首次计划执行时间为P——当前时间加上“initialDelay”和“unit”联合指定的差值,后续的计划执行时间为P+N*(“period”和“unit”联合指定的固定间隔时间),每次执行时,执行command实例对象的run()方法,无谓获取执行结果,具体可参见《FutureTask和ScheduledFutureTask》博文中“1.2.2、核心方法”小节下的“2、runAndReset()”内容
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit),周期性固定时延任务,首次计划执行时间为P——当前时间加上“initialDelay”和“unit”联合指定的差值,后续的计划执行时间为前一次执行完成时间+“delay”和“unit”联合指定的固定时延时间,每次执行时,执行command实例对象的run()方法,无谓获取执行结果,具体可参见《FutureTask和ScheduledFutureTask》博文中“1.2.2、核心方法”小节下的“2、runAndReset()”内容

2.3、关闭执行器

有两个关闭方法:shutdown()shutdownNow()
1、shutdown()
调用TPE的shutdown()

唯一的一点变化是TPE中的onShutDown()方法是空实现,在STPE中覆盖实现了该方法,现在该方法的逻辑是:在当前执行器状态为SHUTDOWN的前提下,根据成员变量continueExistingPeriodicTasksAfterShutdown(默认为false)成员变量executeExistingDelayedTasksAfterShutdown(默认为true)的值从workQueue队列中筛选掉一些任务,该筛选逻辑跟“1.0、三种任务”小节中“当执行器状态=SHUTDOWN,此时如果是周期性任务…”部分逻辑一致,本质也是对其的呼应——既然后续run()方法运行时会有对这个任务的筛选,这里就提前进行筛选。

2、shutdownNow()
调用TPE的shutdownNow()

三、监控

跟父类相同!

四、最佳实践

4.1、核心Worker数量

跟父类相同!

4.2、一般情况下只需关注执行器的RUNNING状态

跟父类相同!

4.3、STPE中任务的“计划执行时间”与“实际执行时间”

在TPE中任务没有“计划执行时间”概念,故什么时候实际调度执行都没有关系;但在STPE中任务有“计划执行时间”概念,此时如果“实际调度执行时间”大大超过“计划执行时间”(两个时间如果相差不大,则认为处于可接受范围),有违背业务需求之嫌,故对于在STPE中提交的任务要求执行时间不可过久,否则极易导致后续没有空闲Worker调度执行到期的任务T,而最终使得T的“实际调度执行时间”大大超过“计划执行时间”。

4.4、一个happens-before关系

存在一个happens-before关系:一个周期性任务,前一次执行happens-before于后一次执行。JavaDoc原叙述为:Successive executions of a task scheduled via scheduleAtFixedRate or scheduleWithFixedDelay do not overlap. While different executions may be performed by different threads, the effects of prior executions happen-before those of subsequent ones.

接下来进行证明。

查看周期性任务执行的外围方法runAndReset()的源代码,其中//2处是真实业务执行逻辑调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread())) //1
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result //2
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null; //3
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

具体证明过程:以volatile变量runner为基点,根据分析可知,“后一次执行的//1处”必得读取到“前一次执行的//3处对runner写入的NULL值”,符合happens-before规则——“volatile变量规则”,结合happens-before规则——“程序顺序规则”和“传递性规则”,有:前一次执行的//1 -hb-> 前一次执行的//2 -hb-> 前一次执行的//3 -hb-> 后一次执行的//1-hb-> 后一次执行的//2,到此得证。

您的支持将鼓励我继续分享!