ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类,是一个“使用线程池调度执行所提交‘一次性时延’或者‘周期性’任务”的执行器,可简称“STPE”。
在JDK中,执行“一次性时延”或者“周期性”任务的组件还有Timer类,STPE跟Timer的主要对比如下:
- 前者支持多线程,后者固定单线程。当配置前者只有1个线程,主要功能跟Timer是等价的
- 前者支持多种时间单位,后者只支持“日期时间”和“毫秒”
STPE继承类图如图1。
图1
一、核心原理
查看ScheduledThreadPoolExecutor类源码,理解其核心原理。带有笔者注释的版本见链接。
1.0、三种任务
在STPE中,任务的表示类为ScheduledFutureTask
,其详细介绍可见《FutureTask和ScheduledFutureTask》。
ScheduledFutureTask类中有两个字段:
long time
,单位为纳秒,表示任务的计划执行时间。需要注意的是,它表示“计划执行时间”与“本JVM进程基准时间”之间的差值,而不是跟System.currentTimeMillis()
一样以midnight, January 1, 1970 UTC
为基准时间,不同JVM进程的基准时间可能不同,一个JVM进程的生命周期内基准时间不变long period
,单位为纳秒,用以区分3种任务类型period=0
,一次性时延任务period>0
,周期性固定间隔任务period<0
,周期性固定时延任务
关于3种类型任务的介绍:
- 一次性时延任务:执行一次就结束
- 周期性固定间隔任务:本次执行完后将该任务再次加入workQueue,设置
time=time+period
- 周期性固定时延任务:本次执行完后将该任务再次加入workQueue,设置
time=System.nanoTime()+(-period)
(System.nanoTime()
表示“当前时间”与“本JVM进程基准时间”之间的差值,单位为纳秒)
ScheduledFutureTask类的run()
方法源码如下:
1 | public void run() { |
1 | void reExecutePeriodic(RunnableScheduledFuture<?> task) { |
1 | boolean canRunInCurrentRunState(boolean periodic) { |
1 | final boolean isRunningOrShutdown(boolean shutdownOK) { |
关于以上run()
方法源码的几点说明:
- 以上源码中展示了3种类型任务的执行
- 对于周期性任务,通过
time=time+period
或者time=System.nanoTime()+(-period)
更新time
值,看似有运算溢出可能,实则几乎不可能,因为Long.MAX_VALUE - System.nanoTime()
差值十分大 - 跟TPE中的任务执行不同,此处在任务执行时会再去结合执行器的状态,判断本任务是否需要执行,即
canRunInCurrentRunState(boolean periodic)
语句,该语句的含义如下:- 当执行器状态
=RUNNING
,返回true
- 当执行器状态
>=STOP
,返回false
- 当执行器状态
=SHUTDOWN
,此时如果是周期性任务,结果跟成员变量continueExistingPeriodicTasksAfterShutdown值(默认为false)
一致;如果不是周期性任务,结果跟成员变量executeExistingDelayedTasksAfterShutdown值(默认为true)
一致
- 当执行器状态
1.1、执行器状态和Worker数量上限表示值
跟父类差异点:
- SHUTDOWN,值为
0<<29
,即000,[29个0]
,继续执行“Worker中当前正在执行的任务”和“筛选过的workQueue中的任务(这个筛选过程在onShutDown()
方法中实现,详细见“2.3、关闭执行器”小节)”,禁止新提交任务,当所有Worker都退出后,经历TIDYING -> TERMINATED
另外,根据上面叙述可知,以ScheduledFutureTask表示的任务在执行run()
方法时会使用canRunInCurrentRunState(boolean periodic)
方法判断“本任务在当前执行器状态下是否能够执行”,但是对于触发执行器状态改变的线程来说,的确是继续执行“Worker中当前正在执行的任务”
,即上面继续执行“Worker中当前正在执行的任务”
叙述没有错
1.2、任务队列
STPE实例对象的workQueue
隐式指定——workQueue = new DelayedWorkQueue()
。
DelayWorkQueue类的特点如下:
- 无界队列,虽然实际上限为
Integer.MAX_VALUE
,这么设计的原因在于:新提交的“一次性时延”或者“周期性”任务如果计划执行时间还未到不能直接交给Worker执行,只能插入到队列,故队列应该是无界的 - Worker每次获取任务时,获取到的任务须满足“计划执行时间在队列中最小,且小于等于当前时间(当两个任务的计划执行时间一致时,先提交任务优先级高)”,DelayWorkQueue队列内部通过维护一个堆数据结构实现了上述需求
1.3、拒绝执行策略
跟父类相同!
1.4、线程工厂方法类
跟父类相同!
1.5、Worker
1.5.1、基本
跟父类相同!
1.5.2、运转
runWorker(Worker w)
方法的核心逻辑是:不断获取任务调度执行。
任务来源只有1个:
- 调用
getTask()
方法从workQueue队列获取任务,获取不到任务时挂起
父类中Worker运转的任务来源还有另外一个——创建生成Worker实例时传递的初始任务
,但是在这里,任务的计划执行时间很可能还未到,故直接排除掉这种任务来源。
1.5.3、退出机制
有3种退出机制:
- 退出机制1:任务执行过程中抛出未被内部catch住的异常,导致Worker退出
- 退出机制2:一般情况下,调用
getTask()
方法从workQueue队列获取任务,获取不到时恒挂起。但是当Worker数量>corePoolSize || 成员变量allowCoreThreadTimeOut=true
条件判断语句返回结果为true
时:成员变量keepAliveTime>0
,尝试从workQueue队列获取任务,挂起成员变量keepAliveTime指定时间,如果过期仍未获取到任务,则getTask()
方法返回NULL
,继而导致Worker退出成员变量keepAliveTime=0
,尝试从workQueue队列获取任务,不进行挂起,getTask()
方法直接返回NULL
,继而导致Worker退出
- 退出机制3:一般情况下,调用
getTask()
方法从workQueue队列获取任务,获取不到时恒挂起。但是当执行器的状态>=STOP || (执行器的状态=SHUTDOWN && workQueue为空)
,此时调用getTask()
方法会直接返回NULL
,继而导致Worker退出
需要注意的是:每次Worker退出都会去调用tryTerminate()
方法,查看执行器是否能够进入TERMINATED
状态。
以上跟父类相同,跟父类不同的是:在STPE中,对于Worker数量>corePoolSize
条件不可能成立,详见“1.5.5、数量”,而根据STPE JavaDoc中如下片段可知,成员变量“allowCoreThreadTimeOut”也不被推荐设置成true
,故“退出机制2”可认为无效。
Additionally, it is almost never a good idea to set {@code corePoolSize} to zero or use {@code allowCoreThreadTimeOut} because this may leave the pool without threads to handle tasks once they become eligible to run.
1.5.4、中断
跟父类相同!
1.5.5、数量
1.5.5.1、执行器状态=RUNNING
影响Worker数量之提交任务:
- 对于新提交的任务,立即无条件插入到workQueue队列
- Worker数量的变化趋势:最开始也即没有提交任何任务时,此时
Worker数量=0
,随着任务的提交,线性递增Worker数量直到Worker数量=corePoolSize
,此后Worker数量不会再增加
注意以上跟父类有很大的不同!
影响Worker数量之Worker退出:
- 退出机制1
1.5.5.2、执行器状态>=SHUTDOWN
影响Worker数量之提交任务:禁止提交,触发“拒绝执行策略”。
影响Worker数量之Worker退出:
- 退出机制1
- 退出机制3
二、具体动作
2.1、构造STPE
构造方法涉及到的所有参数如下:
int corePoolSize
,对应成员变量int corePoolSize
ThreadFactory threadFactory
,对应成员变量ThreadFactory threadFactory
RejectedExecutionHandler handler
,对应成员变量RejectedExecutionHandler handler
跟父类的构造方法最多支持7个传入参数相比,少了4个传入参数,对应的成员变量直接隐式指定:
成员变量workQueue=new DelayedWorkQueue()
,根据STPE的定义,存放任务的workQueue需要是一个优先级队列,参见“1.2、任务队列”小节成员变量maximumPoolSize=Integer.MAX_VALUE
,根据“1.5.5.1、执行器状态=RUNNING”小节,Worker数量不会大于corePoolSize
,这里设置maximumPoolSize=Integer.MAX_VALUE
纯粹只是在满足字段大小语义(maximumPoolSize>corePoolSize
)的前提下而取用了一个特殊大值Integer.MAX_VALUE
成员变量keepAliveTime=0
,根据“1.5.3、退出机制”小节可知,“退出机制2”可认为无效,故自然可认为成员变量“keepAliveTime”的设置无实际效果,这里只是取了一个特殊值0而已
上述成员变量在STPE实例构造完成后都可被动态修改,除了BlockingQueue<Runnable> workQueue
成员变量,但是不推荐进行动态修改,特别是动态改变原隐式指定的成员变量更是违反设计。
2.2、提交任务
2.2.1、继承自TPE类的方法
继承自TPE类的方法,分为两类:
- 只提交任务,不关注执行结果,不能对执行过程进行控制:
public void execute(Runnable command)
,执行command
实例对象的run()
方法,不关注执行结果,其实也没有执行结果,因为Runnable实例的run()
方法并没有执行结果
- 提交任务后,通过Future可获取执行结果,也可对执行过程进行控制(关于Future,Runnable,Callable的关系可详见《FutureTask和ScheduledFutureTask》):
public <T> Future<T> submit(Callable<T> task)
,执行task
实例对象的call()
方法,获取的执行结果为call()
方法的执行结果public Future<?> submit(Runnable task)
,执行task
实例对象的run()
方法,获取的执行结果为预设的执行结果NULL,因为Runnable实例的run()
方法并没有执行结果public <T> Future<T> submit(Runnable task, T result)
,执行task
实例对象的run()
方法,获取的执行结果为预设的执行结果result,因为Runnable实例的run()
方法并没有执行结果
在STPE类中为了实现上的统一(比如“加入workQueue队列的任务实际类型可以统一”),对以上4个方法进行了覆盖实现,覆盖实现只是简单转发调用了“2.2.2、STPE类增加的方法”小节中的方法。
2.2.2、STPE类增加的方法
STPE类中增加了4个提交任务的方法,这几个方法的性质都是“提交任务后,通过ScheduledFuture可获取执行结果,可对执行过程进行控制,也能获得计划执行时间与当前时间的纳秒时间差值(关于ScheduledFuture,Runnable,Callable的关系可详见《FutureTask和ScheduledFutureTask》)”:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
,一次性时延任务,计划执行时间为当前时间加上“delay”和“unit”联合指定的差值
,执行command
实例对象的run()
方法,获取的执行结果为预设值NULL,因为Runnable实例的run()
方法并没有执行结果public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
,一次性时延任务,计划执行时间为当前时间加上“delay”和“unit”联合指定的差值
,执行callable
实例对象的call()
方法,获取的执行结果为call()
方法的执行结果public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
,周期性固定间隔任务,首次计划执行时间为P——当前时间加上“initialDelay”和“unit”联合指定的差值
,后续的计划执行时间为P+N*(“period”和“unit”联合指定的固定间隔时间)
,每次执行时,执行command
实例对象的run()
方法,无谓获取执行结果,具体可参见《FutureTask和ScheduledFutureTask》博文中“1.2.2、核心方法”小节下的“2、runAndReset()”内容public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
,周期性固定时延任务,首次计划执行时间为P——当前时间加上“initialDelay”和“unit”联合指定的差值
,后续的计划执行时间为前一次执行完成时间+“delay”和“unit”联合指定的固定时延时间
,每次执行时,执行command
实例对象的run()
方法,无谓获取执行结果,具体可参见《FutureTask和ScheduledFutureTask》博文中“1.2.2、核心方法”小节下的“2、runAndReset()”内容
2.3、关闭执行器
有两个关闭方法:shutdown()
和shutdownNow()
。
1、shutdown()
调用TPE的shutdown()
。
唯一的一点变化是TPE中的onShutDown()
方法是空实现,在STPE中覆盖实现了该方法,现在该方法的逻辑是:在当前执行器状态为SHUTDOWN
的前提下,根据成员变量continueExistingPeriodicTasksAfterShutdown(默认为false)
和成员变量executeExistingDelayedTasksAfterShutdown(默认为true)
的值从workQueue队列中筛选掉一些任务,该筛选逻辑跟“1.0、三种任务”小节中“当执行器状态=SHUTDOWN
,此时如果是周期性任务…”部分逻辑一致,本质也是对其的呼应——既然后续run()
方法运行时会有对这个任务的筛选,这里就提前进行筛选。
2、shutdownNow()
调用TPE的shutdownNow()
。
三、监控
跟父类相同!
四、最佳实践
4.1、核心Worker数量
跟父类相同!
4.2、一般情况下只需关注执行器的RUNNING
状态
跟父类相同!
4.3、STPE中任务的“计划执行时间”与“实际执行时间”
在TPE中任务没有“计划执行时间”概念,故什么时候实际调度执行都没有关系;但在STPE中任务有“计划执行时间”概念,此时如果“实际调度执行时间”大大超过“计划执行时间”(两个时间如果相差不大,则认为处于可接受范围),有违背业务需求之嫌,故对于在STPE中提交的任务要求执行时间不可过久,否则极易导致后续没有空闲Worker调度执行到期的任务T,而最终使得T的“实际调度执行时间”大大超过“计划执行时间”。
4.4、一个happens-before关系
存在一个happens-before关系:一个周期性任务,前一次执行happens-before于后一次执行。JavaDoc原叙述为:Successive executions of a task scheduled via scheduleAtFixedRate or scheduleWithFixedDelay do not overlap. While different executions may be performed by different threads, the effects of prior executions happen-before those of subsequent ones.
接下来进行证明。
查看周期性任务执行的外围方法runAndReset()
的源代码,其中//2处是真实业务执行逻辑调用:
1 | protected boolean runAndReset() { |
具体证明过程:以volatile变量runner
为基点,根据分析可知,“后一次执行的//1处”必得读取到“前一次执行的//3处对runner写入的NULL值”,符合happens-before规则——“volatile变量规则”,结合happens-before规则——“程序顺序规则”和“传递性规则”,有:前一次执行的//1 -hb-> 前一次执行的//2 -hb-> 前一次执行的//3 -hb-> 后一次执行的//1-hb-> 后一次执行的//2,到此得证。