一、CountDownLatch
1.1、语义和使用示例
对于CountDownLatch实例对象,构造时传入一个计数值int count:
- 一般用法:调用await()和await(long timeout, TimeUnit unit)方法排队挂起,直到调用count次countDown()方法(每次调用countDown()方法,都会对计数值减1)
- 调用await()方法排队挂起,除了上述的正常退出机制,退出机制还有:“线程被中断,抛出InterruptedException异常”;调用await(long timeout, TimeUnit unit)方法排队挂起,除了上述的正常退出机制,退出机制还有:“设定的超时时间到期,返回false”或者“线程被中断,抛出InterruptedException异常”
- 当计数值已减为0,调用countDown()方法没有任何效果,调用await()/await(long timeout, TimeUnit unit)方法立即成功返回
- 不支持复位计数值进行新一轮使用
一般用法使用示例:
| 1 | import java.util.concurrent.CountDownLatch; | 
一种运行结果:
| 1 | 3 | 
“1 2 3”和“4 5”的打印顺序不定,但是“1 2 3”的打印必定先于“4 5”。这个必定先于基于后续的“1.3、happens-before关系”小节,而不是根据“3个countDown()没执行之前,await()挂起,所以1 2 3在4 5之前”,因为“1 2 3理论上都可能重排序到countDown()后面,4 5先执行再执行1 2 3”。
1.2、源码实现
核心基于AQS实现,AQS的state字段被赋值为所传入的count值:
- 调用countDown()方法,转发调用AQS的releaseShared(1)方法,见CountDownLatch.Sync中的tryReleaseShared(int releases)方法,可知这里的重载实现逻辑是:字段值减1,当减为0时,唤醒等待队列中的所有线程
- 调用await()方法,转发调用AQS的acquireSharedInterruptibly(1)方法,见CountDownLatch.Sync中的tryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:只有当state字段值为0时,表示成功;否则失败,挂起排队
- 调用await(long timeout, TimeUnit unit)方法,转发调用AQS的tryAcquireSharedNanos(1, unit.toNanos(timeout))方法,见CountDownLatch.Sync中的tryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:只有当state字段值为0时,表示成功;否则失败,挂起排队
1.3、happens-before关系
存在一个happens-before关系:对于前count次对state字段成功减1的countDown()方法调用,其前面的操作 happens-before 于任何await()/await(long timeout, TimeUnit unit)方法调用成功返回后的操作。
JavaDoc原叙述是:Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.
几点说明:
- 当state字段值为1时,并发调用两个countDown()方法,分别以A和B代指,假如A成功减1,那么此时B必然不能成功减1,此时A属于上述叙述中的countDown()方法调用,B不属于上述叙述中的countDown()方法调用
- 上述叙述中的“成功返回”指的是:state字段值为0返回,而不是设定的超时时间到期返回或者中断返回
- 除了上述情形,“调用countDown()方法之前的操作”和“调用await()/await(long timeout, TimeUnit unit)方法之后的操作”两者之间的happens-before关系不必然,须具体情况具体分析
- 主要基于volatile变量happens-before规则推导上述happens-before关系,state字段是一个volatile变量。详细证明可参见[1]
二、CyclicBarrier
2.1、语义和使用示例
对于CyclicBarrier实例对象,构造时传入一个计数值int parties,其使用有一个“轮”概念:
- 一般用法,调用await()/await(long timeout, TimeUnit unit)方法,对本轮计数值减1,挂起直到“本轮计数值减为0”。本轮结束后会进入新一轮
- 在调用await()/await(long timeout, TimeUnit unit)方法时,除了上述正常退出机制,异常退出机制有:- 线程被中断,抛出InterruptedException异常,本轮被置为非法态
- 设定的超时时间到期,抛出TimeoutException异常,本轮被置为非法态
- 本轮已被置为非法态,则抛出BrokenBarrierException异常
 
- 支持复位计数值进行新一轮使用
- 构造方法还支持传入一个Runnable barrierAction,在一轮中它只被调用执行1次,它的调用执行时机为:假定将本轮计数值减为0的执行await()/await(long timeout, TimeUnit unit)方法的线程为T,在T中,将本轮计数值减为0后,调用执行barrierAction的run()方法
一般用法使用示例:
| 1 | import java.util.ArrayList; | 
一种运行结果:
| 1 | ID-[0] before | 
“ID-[0] before,ID-[1] before,ID-[2] before,ID-[3] before,ID-[4] before”的打印顺序不定,“ID-[0] after,ID-[1] after,ID-[2] after,ID-[3] after,ID-[4] after”的打印顺序也不定,但是有happens-before关系链:“ID-[0~4] before” –hb–> “hello world” –hb–> “ID-[0~4] after”。这个关系链基于后续的“2.3、happens-before关系”小节。
2.2、源码实现
基于ReentrantLock锁和其关联的Condition实例对象实现,核心方法是dowait(boolean timed, long nanos)。
详见CyclicBarrier.java源码文件内对dowait(boolean timed, long nanos)方法的注释。
2.3、happens-before关系
存在happens-before关系:
- 一般使用情形中,即不抛出异常,包括“中断”和“超时”。当执行await()/await(long timeout, TimeUnit unit)方法的线程数量等于所传入的parties时,有一个happens-before关系链:所有await()/await(long timeout, TimeUnit unit)方法之前的操作 –hb–> barrierAction实例对象的run()方法(如果barrierAction实例对象存在) –hb–> 所有await()/await(long timeout, TimeUnit unit)方法之后的操作
- 在异常使用情形中,即会抛出异常,包括“中断”和“超时”。上述三者的happens-before关系不必然,须具体情况具体分析
JavaDoc原叙述是:Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads。
证明过程如下。
1、一般情形
先做几个假定:
- 在dowait(boolean timed, long nanos)核心方法中,lock.lock();处和lock.unlock();处的申请锁和释放锁操作分别以a和b代指,trip.await();处或者nanos = trip.awaitNanos(nanos);处所隐含的释放锁和申请锁操作分别以B和A代指。需要注意的是,在《AQS》中我们知道,调用await()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)后,如果立即中断,则不会进行隐含的释放锁和申请锁操作,但是这里是一般情形,无需考虑该种可能
- N=传入的parties,- N>=1
- 线程数为TN
- 线程Tx调用await()/await(long timeout, TimeUnit unit)方法之前的操作以“Tx-ActionBefore”代指,之后的操作以“Tx-ActionAfter”代指
- 传入了barrierAction实例对象,以P代指
证明如下:
- 当N=1时,- 如果TN<N,即TN=0,无意义
- 如果TN=N,即TN=1,根据“程序顺序”happens-before规则,直接可证
- 如果TN>N,每个调用都是新一轮,每轮根据“程序顺序”happens-before规则,简单可证
 
- 如果
- 当N>1时,- 如果TN<N,如果TN=0,无意义;如果TN>0,则await()/await(long timeout, TimeUnit unit)方法调用恒挂起
- 如果TN=N,假定第一个在a获得锁的线程为T0,执行第N次int index = --count;代码的线程为Tn,则至此的申请锁和释放锁动作过程必然类似于T0a T0B Tpa TpB Tqa TqB ... Tna(如果考虑虚假唤醒,可能还存在形如TjB TkA过程,不影响证明,况且ReentrantLock类关联的Condition实例对象是AQS中的ConditionObject,实现不会出现虚假唤醒[2]),Tn继续执行,执行P,最后直到在Tnb释放锁,后续的申请锁和释放锁动作必然类似于TgA Tgb ThA Thb TiA Tib ...。现在对于任意两个线程Te和Tr,基于3个happens-before规则——“程序顺序”,“监视器锁”和“传递性”,结合上述锁申请和释放动作序列,总能构造出“Te-ActionBefore –hb–> P –hb–> Tr-ActionAfter”和“Tr-ActionBefore –hb–> P –hb–> Te-ActionAfter”关系链,故此得证
- 如果TN>N,参照TN=N叙述,当Tn执行后,除了即将被唤醒的N-1个线程,其他线程执行进入新一轮,递归下去最后必然落于TN<N或者TN=N情形
 
- 如果
2、异常情形
在异常情形下没有必然的happens-before关系,须具体情况具体分析。
比如:
- 当TN=N>1,有线程T1和T2,T1调用await()/await(long timeout, TimeUnit unit)方法时直接抛出异常,T1退出上述方法后,T2才运行,根据3个happens-before规则——“程序顺序”,“监视器锁”和“传递性”,有“T1-ActionBefore –hb–> T2-ActionAfter”,没有“T2-ActionBefore –hb–> T1-ActionAfter”,而且此时P未被调用执行
三、Semaphore
3.1、语义和使用示例
对于Semaphore实例对象,构造时传入一个计数值int permits表征该信号量的许可证数量,构造时还允许传入boolean fair表征采用“非公平/公平”分配策略,默认是“非公平”分配策略:
- 一般用法:调用acquire(),acquire(int permits),acquireUninterruptibly(),acquireUninterruptibly(int permits)方法排队挂起直到获得申请的1/permits个许可证;调用release(),release(int permits)方法释放1/permits个许可证
- 调用acquire()和acquire(int permits)方法除了上述正常的退出机制,还有退出机制:“线程被中断,抛出InterruptedException异常”;调用acquireUninterruptibly()和acquireUninterruptibly(int permits)方法只有上述正常的退出机制,没有“响应中断”退出机制
- 调用tryAcquire()和tryAcquire(int permits)方法尝试获取1/permits个许可证,无论获取成功与否,直接返回,获取时的策略恒为“非公平”策略,而不管生成Semaphore实例对象时选取的分配策略;调用tryAcquire(long timeout, TimeUnit unit)和tryAcquire(int permits, long timeout, TimeUnit unit)方法尝试获取1/permits个许可证,获取时的策略为生成Semaphore实例对象时选取的分配策略,成功则立即返回;否则排队挂起直到获取成功,或者“设定的超时时间到期”,或者“响应中断”
一般用法使用示例:
| 1 | import java.util.concurrent.Semaphore; | 
一种运行结果:
| 1 | 初始化:当前有0个并发 | 
3.2、源码实现
核心基于AQS实现:AQS的state字段被赋值为所传入的permits值:
- 调用acquire()方法,转发调用AQS的acquireSharedInterruptibly(1)方法,然后分为两种情形:- “公平策略”情形。判断是否获取成功的核心方法是Semaphore.FairSync中的tryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:当state字段值大于所需的许可证数量,表示成功;否则失败,挂起排队
- “非公平策略”情形。判断是否获取成功的核心方法是Semaphore.NonfairSync中的tryAcquireShared(int acquires)方法,然后转发调用Semaphore.Sync中的nonfairTryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:当state字段值大于所需的许可证数量,表示成功;否则失败,挂起排队
 
- “公平策略”情形。判断是否获取成功的核心方法是Semaphore.FairSync中的
- 调用release()方法,转发调用AQS的releaseShared(1)方法,没有“公平/非公平”策略之分,最后都转发到Semaphore.Sync中的tryReleaseShared(int releases)方法,可知这里的重载实现逻辑是:state字段值增加归还的许可证数量值
- acquire(int permits),- acquireUninterruptibly(),- acquireUninterruptibly(int permits),- release(int permits),- tryAcquire(),- tryAcquire(int permits),- tryAcquire(long timeout, TimeUnit unit)和- tryAcquire(int permits, long timeout, TimeUnit unit)方法的调用也都是转发到AQS,这里略
3.3、happens-before关系
release和acquire/acquireUninterruptibly/tryAcquire两类之间没有必然的happens-before关系,接下来讨论几种具体情形下的happens-before关系。
在许可证耗尽前提下,此时两类的happens-before关系分为3类:
- 对于release()/release(int permits)和acquire()/acquire(int permits)/acquireUninterruptibly()/acquireUninterruptibly(int permits),- acquire()/acquire(int permits)是正常退出而不是中断退出,- acquireUninterruptibly()/acquireUninterruptibly(int permits)只能是正常退出而不允许是中断退出。此时,调用- release()/release(int permits)方法之前的操作 happens-before 于调用- acquire()/acquire(int permits)/acquireUninterruptibly()/acquireUninterruptibly(int permits)方法之后的操作。JavaDoc中的原叙述- Memory consistency effects: Actions in a thread prior to calling a "release" method such as release() happen-before actions following a successful "acquire" method such as acquire() in another thread其实指的就是这种情形
- acquire()/acquire(int permits)是中断退出。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
 
- 对于release()/release(int permits)和tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit),- tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)是正常退出而不是中断或者超时退出。此时,调用- release()/release(int permits)方法之前的操作 happens-before 于调用- tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)方法之后的操作
- tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)是中断或者超时退出。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
 
- 对于release()/release(int permits)和tryAcquire()/tryAcquire(int permits)。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
一点说明:
- 主要基于volatile变量happens-before规则推导上述happens-before关系,state字段是一个volatile变量。详细证明可参见[1]
四、Exchanger
4.1、语义和使用示例
对于Exchanger实例对象:
- 一般用法:两个线程调用exchange()方法,完成数据交互
- 允许多余2个的线程调用exchange()方法,但最终必然是一对线程完成真正的数据交换
一般用法使用示例:
| 1 | import java.util.concurrent.Exchanger; | 
一种程序运行结果:
| 1 | T1正在把数据 Hello 交换出去 | 
“//T1-1 //T2-1”和“//T1-3 //T2-3”的执行顺序不定,但是“//T1-1 //T2-1”的执行必定先于“//T1-3 //T2-3”。这个必定先于基于后续的“4.3、happens-before关系”小节,而不是根据“在T1-2和T2-2没同时执行完之前,两者阻塞,故T1-1和T2-1先执行,再执行T1-2和T2-2,最后执行T1-3和T2-3”,因为“//1理论上可能重排序到//3后面,最后先执行T1-3和T2-3,再执行T1-1和T2-1”。
4.2、源码实现
核心流程是:
- 并发竞争不激烈时,使用Node slot作为交换数据槽
- 并发竞争激烈时,使用Node[] arena作为交换数据槽集合
对应核心流程的核心方法是slotExchange()和arenaExchange()。
详见Exchanger.java源码文件内对slotExchange()和arenaExchange()方法的注释说明。
4.3、happens-before关系
对于执行exchange方法的两个线程T1和T2,两者之间存在happens-before关系:
- T1和T2的exchange不配对,两者之间没有必然的happens-before关系,须具体情况具体分析
- T1和T2的exchange配对,- 一般使用情形中,T1和T2成功返回,而不是“中断返回”或者“超时返回”,此时有:所有exchange方法之前的操作 happens-before 于所有exchange方法之后的操作。JavaDoc原叙述For each pair of threads that successfully exchange objects via an Exchanger, actions prior to the exchange() in each thread happen-before those subsequent to a return from the corresponding exchange() in the other thread其实指的就是这种情形
- 在异常使用情形中,至少1个线程是非成功返回,而是“中断返回”或者“超时返回”,此时T1和T2的配对关系不再成立,两者之间没有必然的happens-before关系,须具体情况具体分析
 
- 一般使用情形中,T1和T2成功返回,而不是“中断返回”或者“超时返回”,此时有:所有
一点说明:
- 主要基于volatile变量happens-before规则推导上述happens-before关系,对槽元素的CAS操作具有volatile变量内存语义[3],Node.match字段是一个volatile变量,对其的读写当然具有volatile变量语义
参考文献
[1]《Lock接口》博文“2.1.2、源码实现”小节中对于ReentrantLock锁的happens-before规则证明
[2]《AQS》博文“2、‘基于此处AQS实现的高级排他锁’情形”小节内容
[3]《Unsafe类》
