0%

并发工具类

一、CountDownLatch

1.1、语义和使用示例

对于CountDownLatch实例对象,构造时传入一个计数值int count

  • 一般用法:调用await()await(long timeout, TimeUnit unit)方法排队挂起,直到调用countcountDown()方法(每次调用countDown()方法,都会对计数值减1)
  • 调用await()方法排队挂起,除了上述的正常退出机制,退出机制还有:“线程被中断,抛出InterruptedException异常”;调用await(long timeout, TimeUnit unit)方法排队挂起,除了上述的正常退出机制,退出机制还有:“设定的超时时间到期,返回false”或者“线程被中断,抛出InterruptedException异常”
  • 当计数值已减为0,调用countDown()方法没有任何效果,调用await()/await(long timeout, TimeUnit unit)方法立即成功返回
  • 不支持复位计数值进行新一轮使用

一般用法使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
static CountDownLatch c = new CountDownLatch(3);

public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("4");
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("5");
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();

System.out.println(2);
c.countDown();
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
System.out.println(3);
c.countDown();
}
}).start();
}
}

一种运行结果:

1
2
3
4
5
3
1
2
4
5

“1 2 3”和“4 5”的打印顺序不定,但是“1 2 3”的打印必定先于“4 5”。这个必定先于基于后续的“1.3、happens-before关系”小节,而不是根据“3个countDown()没执行之前,await()挂起,所以1 2 3在4 5之前”,因为“1 2 3理论上都可能重排序到countDown()后面,4 5先执行再执行1 2 3”。

1.2、源码实现

核心基于AQS实现,AQS的state字段被赋值为所传入的count值:

  • 调用countDown()方法,转发调用AQS的releaseShared(1)方法,见CountDownLatch.Sync中的tryReleaseShared(int releases)方法,可知这里的重载实现逻辑是:字段值减1,当减为0时,唤醒等待队列中的所有线程
  • 调用await()方法,转发调用AQS的acquireSharedInterruptibly(1)方法,见CountDownLatch.Sync中的tryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:只有当state字段值为0时,表示成功;否则失败,挂起排队
  • 调用await(long timeout, TimeUnit unit)方法,转发调用AQS的tryAcquireSharedNanos(1, unit.toNanos(timeout))方法,见CountDownLatch.Sync中的tryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:只有当state字段值为0时,表示成功;否则失败,挂起排队

1.3、happens-before关系

存在一个happens-before关系:对于前count次对state字段成功减1的countDown()方法调用,其前面的操作 happens-before 于任何await()/await(long timeout, TimeUnit unit)方法调用成功返回后的操作。

JavaDoc原叙述是:Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.

几点说明:

  • state字段值为1时,并发调用两个countDown()方法,分别以A和B代指,假如A成功减1,那么此时B必然不能成功减1,此时A属于上述叙述中的countDown()方法调用,B不属于上述叙述中的countDown()方法调用
  • 上述叙述中的“成功返回”指的是:state字段值为0返回,而不是设定的超时时间到期返回或者中断返回
  • 除了上述情形,“调用countDown()方法之前的操作”和“调用await()/await(long timeout, TimeUnit unit)方法之后的操作”两者之间的happens-before关系不必然,须具体情况具体分析
  • 主要基于volatile变量happens-before规则推导上述happens-before关系,state字段是一个volatile变量。详细证明可参见[1]

二、CyclicBarrier

2.1、语义和使用示例

对于CyclicBarrier实例对象,构造时传入一个计数值int parties,其使用有一个“轮”概念:

  • 一般用法,调用await()/await(long timeout, TimeUnit unit)方法,对本轮计数值减1,挂起直到“本轮计数值减为0”。本轮结束后会进入新一轮
  • 在调用await()/await(long timeout, TimeUnit unit)方法时,除了上述正常退出机制,异常退出机制有:
    • 线程被中断,抛出InterruptedException异常,本轮被置为非法态
    • 设定的超时时间到期,抛出TimeoutException异常,本轮被置为非法态
    • 本轮已被置为非法态,则抛出BrokenBarrierException异常
  • 支持复位计数值进行新一轮使用
  • 构造方法还支持传入一个Runnable barrierAction,在一轮中它只被调用执行1次,它的调用执行时机为:假定将本轮计数值减为0的执行await()/await(long timeout, TimeUnit unit)方法的线程为T,在T中,将本轮计数值减为0后,调用执行barrierActionrun()方法

一般用法使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
static CyclicBarrier barrier;

public static void main(String[] args) {
int N = 5;

barrier = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("hello world");
}
});

List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
}

static class Worker implements Runnable {

int id;

public Worker(int id) {
this.id = id;
}

public void run() {
System.out.println("ID-[" + id + "] before");

try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

System.out.println("ID-[" + id + "] after");
}
}
}

一种运行结果:

1
2
3
4
5
6
7
8
9
10
11
ID-[0] before
ID-[1] before
ID-[3] before
ID-[2] before
ID-[4] before
hello world
ID-[4] after
ID-[0] after
ID-[1] after
ID-[3] after
ID-[2] after

“ID-[0] before,ID-[1] before,ID-[2] before,ID-[3] before,ID-[4] before”的打印顺序不定,“ID-[0] after,ID-[1] after,ID-[2] after,ID-[3] after,ID-[4] after”的打印顺序也不定,但是有happens-before关系链:“ID-[0~4] before” –hb–> “hello world” –hb–> “ID-[0~4] after”。这个关系链基于后续的“2.3、happens-before关系”小节

2.2、源码实现

基于ReentrantLock锁和其关联的Condition实例对象实现,核心方法是dowait(boolean timed, long nanos)

详见CyclicBarrier.java源码文件内对dowait(boolean timed, long nanos)方法的注释。

2.3、happens-before关系

存在happens-before关系:

  • 一般使用情形中,即不抛出异常,包括“中断”和“超时”。当执行await()/await(long timeout, TimeUnit unit)方法的线程数量等于所传入的parties时,有一个happens-before关系链:所有await()/await(long timeout, TimeUnit unit)方法之前的操作 –hb–> barrierAction实例对象的run()方法(如果barrierAction实例对象存在) –hb–> 所有await()/await(long timeout, TimeUnit unit)方法之后的操作
  • 在异常使用情形中,即会抛出异常,包括“中断”和“超时”。上述三者的happens-before关系不必然,须具体情况具体分析

JavaDoc原叙述是:Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads

证明过程如下。
1、一般情形
先做几个假定:

  • dowait(boolean timed, long nanos)核心方法中,lock.lock();处和lock.unlock();处的申请锁和释放锁操作分别以a和b代指,trip.await();处或者nanos = trip.awaitNanos(nanos);处所隐含的释放锁和申请锁操作分别以B和A代指。需要注意的是,在《AQS》中我们知道,调用await()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)后,如果立即中断,则不会进行隐含的释放锁和申请锁操作,但是这里是一般情形,无需考虑该种可能
  • N=传入的partiesN>=1
  • 线程数为TN
  • 线程Tx调用await()/await(long timeout, TimeUnit unit)方法之前的操作以“Tx-ActionBefore”代指,之后的操作以“Tx-ActionAfter”代指
  • 传入了barrierAction实例对象,以P代指

证明如下:

  1. N=1时,
    • 如果TN<N,即TN=0,无意义
    • 如果TN=N,即TN=1,根据“程序顺序”happens-before规则,直接可证
    • 如果TN>N,每个调用都是新一轮,每轮根据“程序顺序”happens-before规则,简单可证
  2. N>1时,
    • 如果TN<N,如果TN=0,无意义;如果TN>0,则await()/await(long timeout, TimeUnit unit)方法调用恒挂起
    • 如果TN=N,假定第一个在a获得锁的线程为T0,执行第N次int index = --count;代码的线程为Tn,则至此的申请锁和释放锁动作过程必然类似于T0a T0B Tpa TpB Tqa TqB ... Tna(如果考虑虚假唤醒,可能还存在形如TjB TkA过程,不影响证明,况且ReentrantLock类关联的Condition实例对象是AQS中的ConditionObject,实现不会出现虚假唤醒[2]),Tn继续执行,执行P,最后直到在Tnb释放锁,后续的申请锁和释放锁动作必然类似于TgA Tgb ThA Thb TiA Tib ...。现在对于任意两个线程Te和Tr,基于3个happens-before规则——“程序顺序”,“监视器锁”和“传递性”,结合上述锁申请和释放动作序列,总能构造出“Te-ActionBefore –hb–> P –hb–> Tr-ActionAfter”和“Tr-ActionBefore –hb–> P –hb–> Te-ActionAfter”关系链,故此得证
    • 如果TN>N,参照TN=N叙述,当Tn执行后,除了即将被唤醒的N-1个线程,其他线程执行进入新一轮,递归下去最后必然落于TN<N或者TN=N情形

2、异常情形
在异常情形下没有必然的happens-before关系,须具体情况具体分析。
比如:

  • TN=N>1,有线程T1和T2,T1调用await()/await(long timeout, TimeUnit unit)方法时直接抛出异常,T1退出上述方法后,T2才运行,根据3个happens-before规则——“程序顺序”,“监视器锁”和“传递性”,有“T1-ActionBefore –hb–> T2-ActionAfter”,没有“T2-ActionBefore –hb–> T1-ActionAfter”,而且此时P未被调用执行

三、Semaphore

3.1、语义和使用示例

对于Semaphore实例对象,构造时传入一个计数值int permits表征该信号量的许可证数量,构造时还允许传入boolean fair表征采用“非公平/公平”分配策略,默认是“非公平”分配策略:

  • 一般用法:调用acquire()acquire(int permits)acquireUninterruptibly()acquireUninterruptibly(int permits)方法排队挂起直到获得申请的1/permits个许可证;调用release()release(int permits)方法释放1/permits个许可证
  • 调用acquire()acquire(int permits)方法除了上述正常的退出机制,还有退出机制:“线程被中断,抛出InterruptedException异常”;调用acquireUninterruptibly()acquireUninterruptibly(int permits)方法只有上述正常的退出机制,没有“响应中断”退出机制
  • 调用tryAcquire()tryAcquire(int permits)方法尝试获取1/permits个许可证,无论获取成功与否,直接返回,获取时的策略恒为“非公平”策略,而不管生成Semaphore实例对象时选取的分配策略;调用tryAcquire(long timeout, TimeUnit unit)tryAcquire(int permits, long timeout, TimeUnit unit)方法尝试获取1/permits个许可证,获取时的策略为生成Semaphore实例对象时选取的分配策略,成功则立即返回;否则排队挂起直到获取成功,或者“设定的超时时间到期”,或者“响应中断”

一般用法使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

public static void main(String[] args) {

final Semaphore sp = new Semaphore(3);

System.out.println("初始化:当前有" + (3 - sp.availablePermits() + "个并发"));

for (int index = 0; index < 10; index++) {
final int NO = index;

Runnable run = new Runnable() {
public void run() {
try {
// 获取1个许可证
sp.acquire();

System.out.println(Thread.currentThread().getName() + "获取许可证" + ",剩余:" + sp.availablePermits());

Thread.sleep(1000);

// 释放1个许可证
sp.release();

System.out.println(Thread.currentThread().getName() + "释放许可证" + ",剩余:" + sp.availablePermits());
} catch (InterruptedException e) {
}
}
};

new Thread(run, "thread-" + index).start();
}
}
}

一种运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
初始化:当前有0个并发
thread-0获取许可证,剩余:2
thread-1获取许可证,剩余:1
thread-2获取许可证,剩余:0
thread-0释放许可证,剩余:1
thread-3获取许可证,剩余:0
thread-1释放许可证,剩余:1
thread-4获取许可证,剩余:0
thread-2释放许可证,剩余:1
thread-6获取许可证,剩余:0
thread-3释放许可证,剩余:1
thread-7获取许可证,剩余:0
thread-4释放许可证,剩余:1
thread-8获取许可证,剩余:0
thread-6释放许可证,剩余:1
thread-9获取许可证,剩余:0
thread-7释放许可证,剩余:1
thread-8释放许可证,剩余:2
thread-5获取许可证,剩余:1
thread-9释放许可证,剩余:2
thread-5释放许可证,剩余:3

3.2、源码实现

核心基于AQS实现:AQS的state字段被赋值为所传入的permits值:

  • 调用acquire()方法,转发调用AQS的acquireSharedInterruptibly(1)方法,然后分为两种情形:
    • “公平策略”情形。判断是否获取成功的核心方法是Semaphore.FairSync中的tryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:当state字段值大于所需的许可证数量,表示成功;否则失败,挂起排队
    • “非公平策略”情形。判断是否获取成功的核心方法是Semaphore.NonfairSync中的tryAcquireShared(int acquires)方法,然后转发调用Semaphore.Sync中的nonfairTryAcquireShared(int acquires)方法,可知这里的重载实现逻辑是:当state字段值大于所需的许可证数量,表示成功;否则失败,挂起排队
  • 调用release()方法,转发调用AQS的releaseShared(1)方法,没有“公平/非公平”策略之分,最后都转发到Semaphore.Sync中的tryReleaseShared(int releases)方法,可知这里的重载实现逻辑是:state字段值增加归还的许可证数量值
  • acquire(int permits)acquireUninterruptibly()acquireUninterruptibly(int permits)release(int permits)tryAcquire()tryAcquire(int permits)tryAcquire(long timeout, TimeUnit unit)tryAcquire(int permits, long timeout, TimeUnit unit)方法的调用也都是转发到AQS,这里略

3.3、happens-before关系

releaseacquire/acquireUninterruptibly/tryAcquire两类之间没有必然的happens-before关系,接下来讨论几种具体情形下的happens-before关系。

在许可证耗尽前提下,此时两类的happens-before关系分为3类:

  1. 对于release()/release(int permits)acquire()/acquire(int permits)/acquireUninterruptibly()/acquireUninterruptibly(int permits)
    • acquire()/acquire(int permits)是正常退出而不是中断退出,acquireUninterruptibly()/acquireUninterruptibly(int permits)只能是正常退出而不允许是中断退出。此时,调用release()/release(int permits)方法之前的操作 happens-before 于调用acquire()/acquire(int permits)/acquireUninterruptibly()/acquireUninterruptibly(int permits)方法之后的操作。JavaDoc中的原叙述Memory consistency effects: Actions in a thread prior to calling a "release" method such as release() happen-before actions following a successful "acquire" method such as acquire() in another thread其实指的就是这种情形
    • acquire()/acquire(int permits)是中断退出。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
  2. 对于release()/release(int permits)tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)
    • tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)是正常退出而不是中断或者超时退出。此时,调用release()/release(int permits)方法之前的操作 happens-before 于调用tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)方法之后的操作
    • tryAcquire(long timeout, TimeUnit unit)/tryAcquire(int permits, long timeout, TimeUnit unit)是中断或者超时退出。此时,两类之间没有必然的happens-before关系,须具体情况具体分析
  3. 对于release()/release(int permits)tryAcquire()/tryAcquire(int permits)。此时,两类之间没有必然的happens-before关系,须具体情况具体分析

一点说明:

  • 主要基于volatile变量happens-before规则推导上述happens-before关系,state字段是一个volatile变量。详细证明可参见[1]

四、Exchanger

4.1、语义和使用示例

对于Exchanger实例对象:

  • 一般用法:两个线程调用exchange()方法,完成数据交互
  • 允许多余2个的线程调用exchange()方法,但最终必然是一对线程完成真正的数据交换

一般用法使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.Exchanger;

public class ExchangerExample {
public static void main(String[] args) {

final Exchanger exchanger = new Exchanger();

new Thread(new Runnable() {
String data1 = "Hello";

@Override
public void run() {
doExchangeWork(data1, exchanger);
}
}, "T1").start();

new Thread(new Runnable() {
String data1 = "World";

@Override
public void run() {
doExchangeWork(data1, exchanger);
}
}, "T2").start();
}

private static void doExchangeWork(String data1, Exchanger exchanger) {
try {
System.out.println(Thread.currentThread().getName() + "正在把数据 " + data1 + " 交换出去"); // 1
Thread.sleep((long)(Math.random() * 1000));

String data2 = (String)exchanger.exchange(data1); // 2

System.out.println(Thread.currentThread().getName() + "交换到数据 " + data2); // 3
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

一种程序运行结果:

1
2
3
4
T1正在把数据 Hello 交换出去
T2正在把数据 World 交换出去
T1交换到数据 World
T2交换到数据 Hello

“//T1-1 //T2-1”和“//T1-3 //T2-3”的执行顺序不定,但是“//T1-1 //T2-1”的执行必定先于“//T1-3 //T2-3”。这个必定先于基于后续的“4.3、happens-before关系”小节,而不是根据“在T1-2和T2-2没同时执行完之前,两者阻塞,故T1-1和T2-1先执行,再执行T1-2和T2-2,最后执行T1-3和T2-3”,因为“//1理论上可能重排序到//3后面,最后先执行T1-3和T2-3,再执行T1-1和T2-1”。

4.2、源码实现

核心流程是:

  • 并发竞争不激烈时,使用Node slot作为交换数据槽
  • 并发竞争激烈时,使用Node[] arena作为交换数据槽集合

对应核心流程的核心方法是slotExchange()arenaExchange()

详见Exchanger.java源码文件内对slotExchange()arenaExchange()方法的注释说明。

4.3、happens-before关系

对于执行exchange方法的两个线程T1和T2,两者之间存在happens-before关系:

  • T1和T2的exchange不配对,两者之间没有必然的happens-before关系,须具体情况具体分析
  • T1和T2的exchange配对,
    • 一般使用情形中,T1和T2成功返回,而不是“中断返回”或者“超时返回”,此时有:所有exchange方法之前的操作 happens-before 于所有exchange方法之后的操作。JavaDoc原叙述For each pair of threads that successfully exchange objects via an Exchanger, actions prior to the exchange() in each thread happen-before those subsequent to a return from the corresponding exchange() in the other thread其实指的就是这种情形
    • 在异常使用情形中,至少1个线程是非成功返回,而是“中断返回”或者“超时返回”,此时T1和T2的配对关系不再成立,两者之间没有必然的happens-before关系,须具体情况具体分析

一点说明:

  • 主要基于volatile变量happens-before规则推导上述happens-before关系,对槽元素的CAS操作具有volatile变量内存语义[3],Node.match字段是一个volatile变量,对其的读写当然具有volatile变量语义

参考文献

[1]《Lock接口》博文“2.1.2、源码实现”小节中对于ReentrantLock锁的happens-before规则证明
[2]《AQS》博文“2、‘基于此处AQS实现的高级排他锁’情形”小节内容
[3]《Unsafe类》

您的支持将鼓励我继续分享!