在阅读和过程中,会碰到以下几个互相之间具有联系的类和接口:
- Runnable
- Future
- RunnableFuture
- FutureTask
- Callable
- ScheduledFuture
- RunnableScheduledFuture
- ScheduledFutureTask
相关继承类图如图1所示。
图1
本文接下来基于具体实现类“FutureTask”和“ScheduledFutureTask”梳理上述类和接口的脉络关系,以便于理解。
一、FutureTask
1.1、演进
需求:向线程池提交一个任务后,能够控制过程和获取结果。
演进:
- 设计Future接口作为向线程池提交任务后的返回对象,通过它控制过程和获取结果
- 线程池中执行任务,任务的最终载体必然是Runnable接口,其与返回的Future结果属于跨线程关系,因此两者产生关联的唯一途径是共享内存,自然而然设计RunnableFuture接口
- 执行RunnableFuture接口的
run()
方法,然后将执行结果赋值到RunnableFuture实例的某个成员变量 - FutureTask是RunnableFuture接口的实现类,引入一个
Callable callable
成员变量,在run()
方法中执行该Callable接口的call()
方法,然后call()
方法的返回结果被赋值到Object outcome
成员变量 - 由以上点可知,FutureTask的
run()
方法中调度执行Callable接口的call()
方法:如果在调度任务时直接传入Callable接口,自然没啥问题;但是如果传入的是Runnable接口,则需要进行适配,具体是通过Executors.RunnableAdapter类,它实现了Callable接口,查看Executors.RunnableAdapter类的唯一构造方法,须传入两个参数,一个是Runnable task
,另外一个是T result
,后者表示预设的call()
方法返回结果,因为Runnable实例的run()
方法并没有执行结果
Future接口:
1 | public interface Future<V> { |
RunnableFuture接口:
1 | public interface RunnableFuture<V> extends Runnable, Future<V> { |
FutureTask类:
1 | public class FutureTask<V> implements RunnableFuture<V> { |
Callable接口:
1 | public interface Callable<V> { |
RunnableAdapter类:
1 | static final class RunnableAdapter<T> implements Callable<T> { |
1.2、源码阅读
带有笔者注释的FutureTask源码版本见链接。
1.2.1、内部状态
有成员变量volatile int state
,表征内部状态。
其内部状态列表如下:
NEW=0
,初始状态COMPLETING=1
,转向NORMAL
或者EXCEPTIONAL
时的临时中间状态NORMAL=2
,run()
方法内调用call()
方法未抛异常,调用正常退出EXCEPTIONAL=3
,run()
方法内调用call()
方法抛异常,调用非正常退出CANCELLED=4
,调用cancel()
方法时,如果传入参数mayInterruptIfRunning=false
,内部状态从NEW
转为CANCELLED
INTERRUPTING=5
,调用cancel()
方法时,如果传入参数mayInterruptIfRunning=true
,内部状态从NEW
转为INTERRUPTING
INTERRUPTED=6
,调用cancel()
方法时,如果传入参数mayInterruptIfRunning=true
,内部状态从NEW
转为INTERRUPTING
,等发出中断信号后,内部状态再转为INTERRUPTED
1.2.2、核心方法
1、run()
执行任务,具体逻辑是:
- 调用成员变量
callable
的call()
方法 - 根据
call()
方法的调用是否抛出异常分为两种情况:- 不抛出异常:1)内部状态经历
NEW -> COMPLETING -> NORMAL
;2)call()
方法的返回结果赋值给成员变量outcome
- 抛出异常:1)内部状态经历
NEW -> COMPLETING -> EXCEPTIONAL
;2)抛出的Throwable对象赋值给成员变量outcome
- 不抛出异常:1)内部状态经历
2、runAndReset()
STPE中提交的“周期性任务”调度执行时执行本方法,它的具体逻辑是:
- 调用成员变量
callable
的call()
方法 - 根据
call()
方法的调用是否抛出异常分为两种情况:- 不抛出异常:1)内部状态仍为
NEW
,否则下一个周期不能执行;2)call()
方法的返回结果不赋值给成员变量outcome
,即使赋值了,在NEW
内部状态下,调用get()
方法获取结果恒挂起,调用get(long timeout, TimeUnit unit)
方法获取结果超时后抛出TimeoutException
异常;3)runAndReset()
方法返回true
值,表征该周期性任务会再被加入到workQueue队列进行后续调度[1] - 抛出异常:1)内部状态经历
NEW -> COMPLETING -> EXCEPTIONAL
;2)抛出的Throwable对象赋值给成员变量outcome
;3)runAndReset()
方法返回false
值,表征该周期性任务不会再被加入到workQueue队列进行后续调度[1]
- 不抛出异常:1)内部状态仍为
3、cancel(boolean mayInterruptIfRunning)
调用cancel(boolean mayInterruptIfRunning)
方法尝试取消run()
或者runAndReset()
方法的运行。
其核心逻辑是:
- 如果当前内部状态不为
NEW
,表明已经执行完成,不能再取消 - 如果当前内部状态为
NEW
,表明未执行完成,尝试取消:- 如果传入参数
mayInterruptIfRunning=false
,则经历内部状态从NEW转为CANCELLED
过程 - 如果传入参数
mayInterruptIfRunning=true
,则经历内部状态从NEW转为INTERRUPTING -> 给正在执行当前任务的线程发中断信号 -> 内部状态转为INTERRUPTED
过程
- 如果传入参数
4、isCancelled()
当内部状态是CANCELLED
、INTERRUPTING
或者INTERRUPTED
时(即成功调用cancel(boolean mayInterruptIfRunning)
方法的可能结果),返回true,否则返回false。
5、isDone()
当内部状态不是NEW
时,返回true,否则返回false。
6、get()
获取run()
或者runAndReset()
方法的执行结果:
- 当内部状态<=
COMPLETING
时,恒挂起 - 当内部状态>
COMPLETING
时,分为3种情况:- 内部状态为
NORMAL
,表示成员变量callable
的call()
方法执行未抛出异常,成功退出,这里返回成员变量outcome
值(即call()
方法的返回结果) - 内部状态为
EXCEPTIONAL
,表示成员变量callable
的call()
方法执行抛出异常,这里抛出ExecutionException
异常 - 内部状态>=
CANCELLED
,即为CANCELLED
、INTERRUPTING
或者INTERRUPTED
,表示被取消执行,这里抛出CancellationException
异常
- 内部状态为
7、get(long timeout, TimeUnit unit)
获取run()
或者runAndReset()
方法的执行结果:
- 当内部状态<=
COMPLETING
时,挂起指定时间,如果到期仍是<=COMPLETING
,则抛出TimeoutException
异常 - 当内部状态>
COMPLETING
时,分为3种情况:- 内部状态为
NORMAL
,表示成员变量callable
的call()
方法执行未抛出异常,成功退出,这里返回成员变量outcome
值(即call()
方法的返回结果) - 内部状态为
EXCEPTIONAL
,表示成员变量callable
的call()
方法执行抛出异常,这里抛出ExecutionException
异常 - 内部状态>=
CANCELLED
,即为CANCELLED
、INTERRUPTING
或者INTERRUPTED
,表示被取消执行,这里抛出CancellationException
异常
- 内部状态为
二、ScheduledFutureTask
相较于FutureTask,ScheduledFutureTask额外继承实现了3个接口的方法:
- Comparable接口的
public int compareTo(T o)
方法,ScheduledFutureTask被加入到STPE中属于DelayedWorkQueue类型的workQueue,DelayedWorkQueue是一个优先级队列,需要其中的元素实现Comparable接口的上述方法 - Delay接口的
long getDelay(TimeUnit unit)
方法,获取任务计划执行时间与当前时间的时间差值,时间差值的单位由“TimeUnit unit”指定 - RunnableScheduledFuture接口的
boolean isPeriodic()
方法,判断当前任务是否是周期性任务
三、关于源码的两个困惑
3.1、困惑1
困惑1描述:调用get()/get(long timeout, TimeUnit unit)
方法获取set(V v)/setException(Throwable t)
方法中设置的变量outcome
结果时,如何证明设置的值对其是可见的。
1 | protected void set(V v) { |
1 | protected void setException(Throwable t) { |
1 | public V get() throws InterruptedException, ExecutionException { |
1 | private V report(int s) throws ExecutionException { |
以get()
方法为例进行说明(get(long timeout, TimeUnit unit)
方法的说明是类似的),相关源代码已贴在上面。
具体证明如下:根据get()
方法的实现逻辑可知,在调用report()
方法之前必有state>COMPLETING
,本讨论中只关心两个状态NORMAL
和EXCEPTIONAL
,NORMAL
状态通过set()
方法中的UNSAFE.putOrderedInt(this, stateOffset, NORMAL)
语句设置/EXCEPTIONAL
状态通过setException(Throwable t)
方法中的UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL)
语句设置,state
是volatile变量,在get()
方法或者awaitDone(boolean timed, long nanos)
方法中以“一般直白”形式读取故具有volatile变量读语义,根据《Unsafe类》中的“广义的volatile变量happens-before规则”,再结合happens-before规则——“程序顺序规则”和“传递性规则”,有“//1 -hb-> //2 -hb-> //5 -hb-> //6”或者“//3 -hb-> //4 -hb-> //5 -hb> //6”,因此//1处或者//3处对outcome
的设置对//6处可见。
3.2、困惑2
困惑2描述:调用cancel(true)
方法发送的中断信号可能会泄漏而递延到下一个任务的执行过程,而导致影响其执行吗?答案是否定的。
根据具体执行方法是run()
还是runAndReset()(STPE中提交的“周期性任务”调度执行时执行该方法)
分为两种情形进行讨论证明。
3.2.1、run()
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
1 | public void run() { |
1 | private void handlePossibleCancellationInterrupt(int s) { |
1 | final void runWorker(Worker w) { |
1 | private Runnable getTask() { |
1、前导
- 在
cancel(boolean mayInterruptIfRunning)
方法中,Thread t = runner
语句内隐含的加载runner
操作不能被重排序到UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
语句之前,因为结合《synchronized-volatile-final关键词》和《Unsafe类》可知,后者隐式带有StoreLoad绑定型内存屏障,故后续的Load操作不能被重排序到该Store操作之前 - 同理,在
run()
方法中,int s = state
语句内隐含的加载state
操作不能被重排序到会调用的set(V v)
或者setException(Throwable t)
方法内的UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
语句之前 - 同理,在
run()
方法中,int s = state
语句内隐含的加载state
操作不能被重排序到runner = null
语句之前,因为后者是一个volatile变量写入,隐式带有StoreLoad内存屏障
2、具体证明
根据以上贴出的源码,进行具体证明,以“//4处加载到最新state时机”为讨论基点S,根据在S点时//1处CAS操作的执行情况分为3类完备情况(如果不是CAS操作,考虑到可见性问题,则有不只3类完备情况):
- 在S点时,//1处操作已执行且返回true
- 如果此时S点的
state=CANCELLED
,则不发出中断信号 - 如果此时S点的
state=INTERRUPTING | INTERRUPTED
,需要注意的是,INTERRUPTED
状态通过UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED)
语句设置,并不立即可见。查看//4处后handlePossibleCancellationInterrupt(s)
方法,存在一个确保逻辑:如果当前state=INTERRUPTING
,则进入一个循环,直到state!=INTERRUPTING
,根据可能的state状态转移,此时必有state=INTERRUPTED
。根据《Unsafe类》中的“广义的volatile变量happens-before规则”,再结合happens-before规则——“程序顺序规则”和“传递性规则”,有//2 -hb-> //3 -hb-> TPE类中runWorker()方法内的下一轮getTask()语句执行 -hb-> TPE类中runWorker()方法内的下一轮//5语句执行
,如果//2处实际上未执行到,自然不发出中断信号;如果//2处实际执行到了,且出现中断信号泄漏(如果该中断信号在真实任务执行逻辑内部被复位掉,即c.call();
内部,则没有泄漏),根据分析可知,此时该泄漏的中断信号必然会在getTask()
方法的workQueue.poll或者take
语句处或者//5处(当执行器的状态为RUNNING
)被复位,故不会递延到下一个任务的执行过程,得证
- 如果此时S点的
- 在S点时,//1处操作已执行且返回false,此时不发出中断信号
- 在S点时,//1处操作未执行,根据前导第2点,S点时必有
state>=COMPLETING
,此时执行//1处操作会返回false,故不发出中断信号
3、其他run()
方法中int s = state
语句前有一个注释state must be re-read after nulling runner to prevent leaked interrupts
,就是说在这里需要加载最新的state,避免中断信号泄漏。接下来给出一种不重新加载最新state导致中断信号泄漏的情形:线程T2执行cancel(true)
方法,执行到T1.interrupt()
语句时等待CPU资源,此时state的状态为INTERRUPTING
,线程T1执行run()
方法,执行到if (s >= INTERRUPTING)
语句时,由于s没有重新加载最新的state值,因此还是NEW
,条件判断失败,不会执行handlePossibleCancellationInterrupt(s)
方法,继续执行直到下一个真实任务的执行逻辑,此时T2分配到CPU资源,执行T1.interrupt()
语句,发出中断信号,即出现了“中断信号泄漏”。
3.2.2、runAndReset()
1 | protected boolean runAndReset() { |
1、前导
- 在
cancel(boolean mayInterruptIfRunning)
方法中,Thread t = runner
语句内隐含的加载runner
操作不能被重排序到UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
语句之前,因为结合《synchronized-volatile-final关键词》和《Unsafe类》可知,后者隐式带有StoreLoad绑定型内存屏障,故后续的Load操作不能被重排序到该Store操作之前 - 同理,在
runAndReset()
方法中,int s = state
语句内隐含的加载state
操作不能被重排序到会调用的setException(Throwable t)
方法内的UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
语句之前 - 同理,在
runAndReset()
方法中,int s = state
语句内隐含的加载state
操作不能被重排序到runner = null
语句之前,因为后者是一个volatile变量写入,隐式带有StoreLoad内存屏障
2、具体证明
根据以上贴出的源码和“3.2.1、run()”小节中贴出的源码,进行具体证明,以“//44处加载到最新state时机”为讨论基点S,根据在S点时//1处CAS操作的执行情况分为3类完备情况(如果不是CAS操作,考虑到可见性问题,则有不只3类完备情况):
- 在S点时,//1处操作已执行且返回true
- 如果此时S点的
state=CANCELLED
,则不发出中断信号 - 如果此时S点的
state=INTERRUPTING | INTERRUPTED
,需要注意的是,INTERRUPTED
状态通过UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED)
语句设置,并不立即可见。查看//44处后handlePossibleCancellationInterrupt(s)
方法,存在一个确保逻辑:如果当前state=INTERRUPTING
,则进入一个循环,直到state!=INTERRUPTING
,根据可能的state状态转移,此时必有state=INTERRUPTED
。根据《Unsafe类》中的“广义的volatile变量happens-before规则”,再结合happens-before规则——“程序顺序规则”和“传递性规则”,有//2 -hb-> //3 -hb-> TPE类中runWorker()方法内的下一轮getTask()语句执行 -hb-> TPE类中runWorker()方法内的下一轮//5语句执行
,如果//2处实际上未执行到,自然不发出中断信号;如果//2处实际执行到了,且出现中断信号泄漏(如果该中断信号在真实任务执行逻辑内部被复位掉,即c.call();
内部,则没有泄漏),根据分析可知,此时该泄漏的中断信号必然会在getTask()
方法的workQueue.poll或者take
语句处或者//5处(当执行器的状态为RUNNING
)被复位,故不会递延到下一个任务的执行过程,得证
- 如果此时S点的
- 在S点时,//1处操作已执行且返回false,此时不发出中断信号
- 在S点时,//1处操作未执行:
- 如果有执行
setException(ex)
语句,根据前导第2点,S点时必有state>=COMPLETING
,此时执行//1处操作会返回false,故不发出中断信号 - 如果未有执行
setException(ex)
语句,根据前导第1和第3点,此时执行//1处操作,runner = null
语句必然已经执行完且结果对其可见,Thread t = runner
语句不可能重排序到//1处操作之前,故if (t != null)
判断语句结果为false,不会发出中断信号
- 如果有执行
3、其他
针对“在S点时,//1处操作未执行,且未执行setException(ex)
语句”情形,本来笔者还认为存在一种“中断信号泄漏递延给下一个任务”的可能,具体是“线程T1执行runAndReset()
方法,S点时有state=NEW
,runner = null;
语句重排序到//44之后,故此时runner!=NULL
,另外一个线程T2在S点时执行cancel(true)
方法,执行到T1.interrupt()
语句时等待CPU资源分配,T1继续执行直到下一个真实任务的执行逻辑(不会调用handlePossibleCancellationInterrupt(s)
方法,因为if (s >= INTERRUPTING)
判断语句返回false),然后T2此时发出中断信号,该中断信号对新任务的执行过程可见,即出现了‘中断信号泄漏’”。针对以上“中断信号泄漏”可能,还特地向OpenJDK反馈了一个Bug,后面根据Doug Lea的回复——The case presented here includes assumption of a reordering that should be precluded because fields state and runner are volatile.
,才意识到了以上分析的错误在于没有考虑到前导第1和第3点。
四、其他
根据以下TPE类中runWorker(Worker w)
方法源码可知,在TPE和STPE中调度执行的任务,必须在内部catch
住自身过程抛出的Throwable
,否则会导致执行该任务的Worker异常退出(虽然会被弥补)。
1 | final void runWorker(Worker w) { |
接下来针对TPE和STPE中的所有任务提交形式一一进行分析说明。
4.1、TPE
存在4种提交形式,分为两类:
- 第一类://1处的
task
就是真实的任务实体,在该情形中,真实任务实体的执行方法必须在内部catch住自身过程抛出的Throwable
public void execute(Runnable command)
,真实任务实体是command
,必须在command.run()
方法内部catch住自身过程抛出的Throwable
- 第二类://1处的
task
指代FutureTask实例,查看FutureTask.run()
方法源码,可知在调用真实任务实体执行方法的外围有catch住异常的逻辑,故不会导致Worker异常退出,但是为统一起见,建议还是在真实任务实体执行方法内部catch住自身过程抛出的Throwable
public <T> Future<T> submit(Callable<T> task)
,真实任务实体是task
,建议在task.call()
方法内部catch住自身过程抛出的Throwable
public Future<?> submit(Runnable task)
,真实任务实体是task
,建议在task.run()
方法内部catch住自身过程抛出的Throwable
public <T> Future<T> submit(Runnable task, T result)
,真实任务实体是task
,建议在task.run()
方法内部catch住自身过程抛出的Throwable
4.2、STPE
存在8种提交形式,包括从TPE继承下来的4种提交形式(在STPE中覆盖实现),查看源码可知//1处的task
指代ScheduledFutureTask实例,查看ScheduledFutureTask.run()
方法源码,其转发调用FutureTask的run()
或者runAndReset()
方法,查看上述两个方法源码,可知在调用真实任务实体执行方法的外围有catch住异常的逻辑,故不会导致Worker异常退出,但是为统一起见,建议还是在真实任务实体执行方法内部catch住自身过程抛出的Throwable
:
- 继承下来的4个方法说明见“4.1、TPE”小节
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
,真实任务实体是command
,建议在command.run()
方法内部catch住自身过程抛出的Throwable
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
,真实任务实体是callable
,建议在callable.call()
方法内部catch住自身过程抛出的Throwable
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
,真实任务实体是command
,建议在command.run()
方法内部catch住自身过程抛出的Throwable
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
,真实任务实体是command
,建议在command.run()
方法内部catch住自身过程抛出的Throwable
参考文献
[1]《ScheduledThreadPoolExecutor》中的ScheduledFutureTask类的run()
方法
[2]https://www.cnblogs.com/thisiswhy/p/13791966.html
[3]《Unsafe类》