一、CountDownLatch
1.1、语义和使用示例
对于CountDownLatch实例对象,构造时传入一个计数值int count
:
- 一般用法:调用
await()
和await(long timeout, TimeUnit unit)
方法排队挂起,直到调用count
次countDown()
方法(每次调用countDown()
方法,都会对计数值减1) - 调用
await()
方法排队挂起,除了上述的正常退出机制,退出机制还有:“线程被中断,抛出InterruptedException异常”;调用await(long timeout, TimeUnit unit)
方法排队挂起,除了上述的正常退出机制,退出机制还有:“设定的超时时间到期,返回false”或者“线程被中断,抛出InterruptedException异常” - 当计数值已减为0,调用
countDown()
方法没有任何效果,调用await()/await(long timeout, TimeUnit unit)
方法立即成功返回 - 不支持复位计数值进行新一轮使用
一般用法使用示例:
1 | import java.util.concurrent.CountDownLatch; |
一种运行结果:
1 | 3 |
“1 2 3”和“4 5”的打印顺序不定,但是“1 2 3”的打印必定先于“4 5”。这个必定先于基于后续的“1.3、happens-before关系”小节,而不是根据“3个countDown()没执行之前,await()挂起,所以1 2 3在4 5之前”,因为“1 2 3理论上都可能重排序到countDown()后面,4 5先执行再执行1 2 3”。
1.2、源码实现
核心基于AQS实现,AQS的state
字段被赋值为所传入的count
值:
- 调用
countDown()
方法,转发调用AQS的releaseShared(1)
方法,见CountDownLatch.Sync中的tryReleaseShared(int releases)
方法,可知这里的重载实现逻辑是:字段值减1,当减为0时,唤醒等待队列中的所有线程 - 调用
await()
方法,转发调用AQS的acquireSharedInterruptibly(1)
方法,见CountDownLatch.Sync中的tryAcquireShared(int acquires)
方法,可知这里的重载实现逻辑是:只有当state
字段值为0时,表示成功;否则失败,挂起排队 - 调用
await(long timeout, TimeUnit unit)
方法,转发调用AQS的tryAcquireSharedNanos(1, unit.toNanos(timeout))
方法,见CountDownLatch.Sync中的tryAcquireShared(int acquires)
方法,可知这里的重载实现逻辑是:只有当state
字段值为0时,表示成功;否则失败,挂起排队
1.3、happens-before关系
存在一个happens-before关系:对于前count
次对state
字段成功减1的countDown()
方法调用,其前面的操作 happens-before 于任何await()/await(long timeout, TimeUnit unit)
方法调用成功返回后的操作。
JavaDoc原叙述是:Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.
几点说明:
- 当
state
字段值为1时,并发调用两个countDown()
方法,分别以A和B代指,假如A成功减1,那么此时B必然不能成功减1,此时A属于上述叙述中的countDown()
方法调用,B不属于上述叙述中的countDown()
方法调用 - 上述叙述中的“成功返回”指的是:
state
字段值为0返回,而不是设定的超时时间到期返回或者中断返回 - 除了上述情形,“调用
countDown()
方法之前的操作”和“调用await()/await(long timeout, TimeUnit unit)
方法之后的操作”两者之间的happens-before关系不必然,须具体情况具体分析 - 主要基于
volatile变量
happens-before规则推导上述happens-before关系,state
字段是一个volatile变量。详细证明可参见[1]
二、CyclicBarrier
2.1、语义和使用示例
对于CyclicBarrier实例对象,构造时传入一个计数值int parties
,其使用有一个“轮”概念:
- 一般用法,调用
await()/await(long timeout, TimeUnit unit)
方法,对本轮计数值减1,挂起直到“本轮计数值减为0”。本轮结束后会进入新一轮 - 在调用
await()/await(long timeout, TimeUnit unit)
方法时,除了上述正常退出机制,异常退出机制有:- 线程被中断,抛出InterruptedException异常,本轮被置为非法态
- 设定的超时时间到期,抛出TimeoutException异常,本轮被置为非法态
- 本轮已被置为非法态,则抛出BrokenBarrierException异常
- 支持复位计数值进行新一轮使用
- 构造方法还支持传入一个
Runnable barrierAction
,在一轮中它只被调用执行1次,它的调用执行时机为:假定将本轮计数值减为0的执行await()/await(long timeout, TimeUnit unit)
方法的线程为T,在T中,将本轮计数值减为0后,调用执行barrierAction
的run()
方法
一般用法使用示例:
1 | import java.util.ArrayList; |
一种运行结果:
1 | ID-[0] before |
“ID-[0] before,ID-[1] before,ID-[2] before,ID-[3] before,ID-[4] before”的打印顺序不定,“ID-[0] after,ID-[1] after,ID-[2] after,ID-[3] after,ID-[4] after”的打印顺序也不定,但是有happens-before关系链:“ID-[0~4] before” –hb–> “hello world” –hb–> “ID-[0~4] after”。这个关系链基于后续的“2.3、happens-before关系”小节。
2.2、源码实现
基于ReentrantLock锁和其关联的Condition实例对象实现,核心方法是dowait(boolean timed, long nanos)
。
详见CyclicBarrier.java源码文件内对dowait(boolean timed, long nanos)
方法的注释。
2.3、happens-before关系
存在happens-before关系:
- 一般使用情形中,即不抛出异常,包括“中断”和“超时”。当执行
await()/await(long timeout, TimeUnit unit)
方法的线程数量等于所传入的parties
时,有一个happens-before关系链:所有await()/await(long timeout, TimeUnit unit)
方法之前的操作 –hb–> barrierAction实例对象的run()
方法(如果barrierAction实例对象存在) –hb–> 所有await()/await(long timeout, TimeUnit unit)
方法之后的操作 - 在异常使用情形中,即会抛出异常,包括“中断”和“超时”。上述三者的happens-before关系不必然,须具体情况具体分析
JavaDoc原叙述是:Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads
。
证明过程如下。
1、一般情形
先做几个假定:
- 在
dowait(boolean timed, long nanos)
核心方法中,lock.lock();
处和lock.unlock();
处的申请锁和释放锁操作分别以a和b代指,trip.await();
处或者nanos = trip.awaitNanos(nanos);
处所隐含的释放锁和申请锁操作分别以B和A代指。需要注意的是,在《AQS》中我们知道,调用await()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)
后,如果立即中断,则不会进行隐含的释放锁和申请锁操作,但是这里是一般情形,无需考虑该种可能 N=传入的parties
,N>=1
- 线程数为TN
- 线程Tx调用
await()/await(long timeout, TimeUnit unit)
方法之前的操作以“Tx-ActionBefore”代指,之后的操作以“Tx-ActionAfter”代指 - 传入了
barrierAction
实例对象,以P代指
证明如下:
- 当
N=1
时,- 如果
TN<N
,即TN=0
,无意义 - 如果
TN=N
,即TN=1
,根据“程序顺序”happens-before规则,直接可证 - 如果
TN>N
,每个调用都是新一轮,每轮根据“程序顺序”happens-before规则,简单可证
- 如果
- 当
N>1
时,- 如果
TN<N
,如果TN=0
,无意义;如果TN>0
,则await()/await(long timeout, TimeUnit unit)
方法调用恒挂起 - 如果
TN=N
,假定第一个在a获得锁的线程为T0,执行第N次int index = --count;
代码的线程为Tn,则至此的申请锁和释放锁动作过程必然类似于T0a T0B Tpa TpB Tqa TqB ... Tna
(如果考虑虚假唤醒,可能还存在形如TjB TkA
过程,不影响证明,况且ReentrantLock类关联的Condition实例对象是AQS中的ConditionObject,实现不会出现虚假唤醒[2]),Tn继续执行,执行P,最后直到在Tnb
释放锁,后续的申请锁和释放锁动作必然类似于TgA Tgb ThA Thb TiA Tib ...
。现在对于任意两个线程Te和Tr,基于3个happens-before规则——“程序顺序”,“监视器锁”和“传递性”,结合上述锁申请和释放动作序列,总能构造出“Te-ActionBefore –hb–> P –hb–> Tr-ActionAfter”和“Tr-ActionBefore –hb–> P –hb–> Te-ActionAfter”关系链,故此得证 - 如果
TN>N
,参照TN=N
叙述,当Tn执行后,除了即将被唤醒的N-1个线程,其他线程执行进入新一轮,递归下去最后必然落于TN<N
或者TN=N
情形
- 如果
2、异常情形
在异常情形下没有必然的happens-before关系,须具体情况具体分析。
比如:
- 当
TN=N>1
,有线程T1和T2,T1调用await()/await(long timeout, TimeUnit unit)
方法时直接抛出异常,T1退出上述方法后,T2才运行,根据3个happens-before规则——“程序顺序”,“监视器锁”和“传递性”,有“T1-ActionBefore –hb–> T2-ActionAfter”,没有“T2-ActionBefore –hb–> T1-ActionAfter”,而且此时P未被调用执行
三、Semaphore
3.1、语义和使用示例
对于Semaphore实例对象,构造时传入一个计数值int permits
表征该信号量的许可证数量,构造时还允许传入boolean fair
表征采用“非公平/公平”分配策略,默认是“非公平”分配策略:
- 一般用法:调用
acquire()
,acquire(int permits)
,acquireUninterruptibly()
,acquireUninterruptibly(int permits)
方法排队挂起直到获得申请的1/permits
个许可证;调用release()
,release(int permits)
方法释放1/permits
个许可证 - 调用
acquire()
和acquire(int permits)
方法除了上述正常的退出机制,还有退出机制:“线程被中断,抛出InterruptedException异常”;调用acquireUninterruptibly()
和acquireUninterruptibly(int permits)
方法只有上述正常的退出机制,没有“响应中断”退出机制 - 调用
tryAcquire()
和tryAcquire(int permits)
方法尝试获取1/permits
个许可证,无论获取成功与否,直接返回,获取时的策略恒为“非公平”策略,而不管生成Semaphore实例对象时选取的分配策略;调用tryAcquire(long timeout, TimeUnit unit)
和tryAcquire(int permits, long timeout, TimeUnit unit)
方法尝试获取1/permits
个许可证,获取时的策略为生成Semaphore实例对象时选取的分配策略,成功则立即返回;否则排队挂起直到获取成功,或者“设定的超时时间到期”,或者“响应中断”
一般用法使用示例:
1 | import java.util.concurrent.Semaphore; |
一种运行结果:
1 | 初始化:当前有0个并发 |
3.2、源码实现
核心基于AQS实现:AQS的state
字段被赋值为所传入的permits
值:
- 调用
acquire()
方法,转发调用AQS的acquireSharedInterruptibly(1)
方法,然后分为两种情形:- “公平策略”情形。判断是否获取成功的核心方法是Semaphore.FairSync中的
tryAcquireShared(int acquires)
方法,可知这里的重载实现逻辑是:当state
字段值大于所需的许可证数量,表示成功;否则失败,挂起排队 - “非公平策略”情形。判断是否获取成功的核心方法是Semaphore.NonfairSync中的
tryAcquireShared(int acquires)
方法,然后转发调用Semaphore.Sync中的nonfairTryAcquireShared(int acquires)
方法,可知这里的重载实现逻辑是:当state
字段值大于所需的许可证数量,表示成功;否则失败,挂起排队
- “公平策略”情形。判断是否获取成功的核心方法是Semaphore.FairSync中的
- 调用
release()
方法,转发调用AQS的releaseShared(1)
方法,没有“公平/非公平”策略之分,最后都转发到Semaphore.Sync中的tryReleaseShared(int releases)
方法,可知这里的重载实现逻辑是:state
字段值增加归还的许可证数量值 acquire(int permits)
,acquireUninterruptibly()
,acquireUninterruptibly(int permits)
,release(int permits)
,tryAcquire()
,tryAcquire(int permits)
,tryAcquire(long timeout, TimeUnit unit)
和tryAcquire(int permits, long timeout, TimeUnit unit)
方法的调用也都是转发到AQS,这里略
3.3、happens-before关系
release
和acquire/acquireUninterruptibly/tryAcquire
两类之间没有必然的happens-before关系,接下来讨论几种具体情形下的happens-before关系。
在许可证耗尽前提下,此时两类的happens-before关系分为3类:
- 对于
release()/release(int permits)
和acquire()/acquire(int permits)/acquireUninterruptibly()/acquireUninterruptibly(int permits)
,acquire()/acquire(int permits)
是正常退出而不是中断退出,acquireUninterruptibly()/acquireUninterruptibly(int permits)
只能是正常退出而不允许是中断退出。此时,调用release()/release(int permits)
方法之前的操作 happens-before 于调用acquire()/acquire(int permits)/acquireUninterruptibly()/acquireUninterruptibly(int permits)
方法之后的操作。JavaDoc中的原叙述Memory consistency effects: Actions in a thread prior to calling a "release" method such as release() happen-before actions following a successful "acquire" method such as acquire() in another thread
其实指的就是这种情形acquire()/acquire(int permits)
是中断退出。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
- 对于
release()/release(int permits)
和tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)
,tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)
是正常退出而不是中断或者超时退出。此时,调用release()/release(int permits)
方法之前的操作 happens-before 于调用tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)
方法之后的操作tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)
是中断或者超时退出。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
- 对于
release()/release(int permits)
和tryAcquire()/tryAcquire(int permits)
。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
一点说明:
- 主要基于
volatile变量
happens-before规则推导上述happens-before关系,state
字段是一个volatile变量。详细证明可参见[1]
四、Exchanger
4.1、语义和使用示例
对于Exchanger实例对象:
- 一般用法:两个线程调用
exchange()
方法,完成数据交互 - 允许多余2个的线程调用
exchange()
方法,但最终必然是一对线程完成真正的数据交换
一般用法使用示例:
1 | import java.util.concurrent.Exchanger; |
一种程序运行结果:
1 | T1正在把数据 Hello 交换出去 |
“//T1-1 //T2-1”和“//T1-3 //T2-3”的执行顺序不定,但是“//T1-1 //T2-1”的执行必定先于“//T1-3 //T2-3”。这个必定先于基于后续的“4.3、happens-before关系”小节,而不是根据“在T1-2和T2-2没同时执行完之前,两者阻塞,故T1-1和T2-1先执行,再执行T1-2和T2-2,最后执行T1-3和T2-3”,因为“//1理论上可能重排序到//3后面,最后先执行T1-3和T2-3,再执行T1-1和T2-1”。
4.2、源码实现
核心流程是:
- 并发竞争不激烈时,使用
Node slot
作为交换数据槽 - 并发竞争激烈时,使用
Node[] arena
作为交换数据槽集合
对应核心流程的核心方法是slotExchange()
和arenaExchange()
。
详见Exchanger.java源码文件内对slotExchange()
和arenaExchange()
方法的注释说明。
4.3、happens-before关系
对于执行exchange
方法的两个线程T1和T2,两者之间存在happens-before关系:
- T1和T2的
exchange
不配对,两者之间没有必然的happens-before关系,须具体情况具体分析 - T1和T2的
exchange
配对,- 一般使用情形中,T1和T2成功返回,而不是“中断返回”或者“超时返回”,此时有:所有
exchange
方法之前的操作 happens-before 于所有exchange
方法之后的操作。JavaDoc原叙述For each pair of threads that successfully exchange objects via an Exchanger, actions prior to the exchange() in each thread happen-before those subsequent to a return from the corresponding exchange() in the other thread
其实指的就是这种情形 - 在异常使用情形中,至少1个线程是非成功返回,而是“中断返回”或者“超时返回”,此时T1和T2的配对关系不再成立,两者之间没有必然的happens-before关系,须具体情况具体分析
- 一般使用情形中,T1和T2成功返回,而不是“中断返回”或者“超时返回”,此时有:所有
一点说明:
- 主要基于
volatile变量
happens-before规则推导上述happens-before关系,对槽元素的CAS操作具有volatile变量内存语义[3],Node.match
字段是一个volatile变量,对其的读写当然具有volatile变量语义
参考文献
[1]《Lock接口》博文“2.1.2、源码实现”小节中对于ReentrantLock锁的happens-before规则证明
[2]《AQS》博文“2、‘基于此处AQS实现的高级排他锁’情形”小节内容
[3]《Unsafe类》