0%

AQS

一、基本含义

AQS,指代java.util.concurrent.locks.AbstractQueuedSynchronizer类,中文名为“队列同步器”,它是实现J.U.C包中几个高级组件(比如“ReentrantLock”,“ReentrantReadWriteLock”,“CountDownLatch”,“Semaphore”等)的核心辅助类。

设计AQS面向的问题是:多线程竞争临界资源(即private volatile int state),竞争失败的线程进入队列排队。

另外有一个类java.util.concurrent.locks.AbstractQueuedLongSynchronizer,它与java.util.concurrent.locks.AbstractQueuedSynchronizer相比,核心差异在于:其临界资源是private volatile long state

二、源码阅读

笔者对于AQS源码的理解程度只是概貌程度,远没有到细节,关于这点有以下几点说明:

  • 看源码时,以看懂大致内容为主,学习其设计的思路,不要陷入所有条件的处理细节中。尤其是在多线程环境中,对与错有时候并不是那么容易看出来的
  • AQS中的临界资源获取方法,比如“acquire(int arg)”,其本质其实就是“乐观锁实现悲观锁”,而“乐观锁”的实现本就较难理解
  • 内存可见性语义确保是通过“volatile变量”,“Unsafe类的CAS操作”,“final变量”等机制获得的

2.1、state

表征临界资源。

2.2、队列

2.2.1、队列节点

队列节点定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

static final class Node {
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile AbstractQueuedSynchronizer.Node prev;

volatile AbstractQueuedSynchronizer.Node next;

volatile Thread thread;

AbstractQueuedSynchronizer.Node nextWaiter;

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, AbstractQueuedSynchronizer.Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}

final boolean isShared() {
return nextWaiter == SHARED;
}

final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
AbstractQueuedSynchronizer.Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}

1、3种类型队列
3种类型队列共用以上队列节点定义。

队列类型 waitStatus prev next thread nextWaiter
排他锁申请等待队列 !=Node.CONDITION 前导节点 后导节点 关联的线程实例对象 Node.EXCLUSIVE
共享锁申请等待队列 !=Node.CONDITION 前导节点 后导节点 关联的线程实例对象 Node.SHARED
AWAIT调用等待队列 ==Node.CONDITION 关联的线程实例对象 后导节点

2、prev和next
用于表征队列节点之间的关系(除了AWAIT调用等待队列),prev指针是准确的,next指针不一定准确,但是next不准确时只可能是以下两种不准确值:

  • NULL。cancelAcquire方法和enq方法可能出现一种并发执行情形:cancelAcquire方法中语句node == tail && compareAndSetTail(node, pred)enq方法中的compareAndSetTail(t, node)语句执行结果都为true,然后compareAndSetNext(pred, predNext, null);又后于t.next = node;执行,导致次尾节点的next字段值被错误地置为NULL
  • 指向的Node.waitStatus > 0,即处于CANCELLED状态。在cancelAcquire方法中,可能不进入unparkSuccessor(node);分支,此时pred节点的next字段值未得到更新,可能指向waitStatus > 0的节点

3、thread
关联的线程实例对象。

2.2.2、head和tail

head和tail分别表示排他锁申请等待队列/共享锁申请等待队列的队列头节点和尾节点(AWAIT调用等待队列的队列头和尾节点不由head/tail表示,而是由AbstractQueuedSynchronizer.ConditionObject类中的firstWaiter/lastWaiter表示):

  • 初始化时,head和tail都为NULL,只有产生线程竞争需要加入队列节点时才进行队列初始化,即惰性生成,此时head=tail=new Node()
  • 当head=tail时(值可以是NULL,也可以是非NULL),表示队列为空
  • 当线程T1持有锁时,head或者为空;或者不为空,此时该head节点即表征T1,虽然thread和prev字段会被置为NULL;反过来,当head为空或者不为空,是否有线程持有锁都是不确定的

2.3、方法

方法分为3类:

  • 辅助方法
  • 核心方法
  • 核心方法变种

2.3.1、核心方法

核心方法涵盖“获取”和“释放”两部分,又根据“排他竞争临界资源/共享竞争临界资源”可分为两类。
1、“排他竞争临界资源”核心方法
获取

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

释放

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;

if (h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
return false;
}
1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

代码理解

  • 在这里,tryAcquire(int arg)tryRelease(int arg)方法直接抛出异常,真正的逻辑在子类实现,故本质上采用模板方法设计模式
  • 获取逻辑流程可描述为:
    1. 调用tryAcquire方法进行尝试,成功即退出;否则添加一个对应的队列节点,进入下述循环
    2. 循环:调用tryAcquire方法进行尝试,成功退出;否则,如果满足挂起条件则挂起等待被唤醒(唤醒后仍然回到循环开头),不满足挂起条件则回退到循环开头
  • 释放逻辑流程可描述为:如果释放成功,并且当前线程有对应的队列节点且节点的waitStatus!=0,则尝试唤醒队列的下一个节点
  • Node节点中waitStatus字段存在SIGNAL值的必要性在于:用于协调“获取方法中的线程挂起”和“释放方法中的线程唤醒”操作,确保挂起会被唤醒

2、“共享竞争临界资源”核心方法
获取:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

释放:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
1
2
3
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

代码理解

  • 在这里,tryAcquireShared(int arg)tryReleaseShared(int arg)方法直接抛出异常,真正的逻辑在子类实现,故本质上采用模板方法设计模式
  • 获取逻辑流程可描述为:
    1. 调用tryAcquireShared方法进行尝试,成功即退出;否则添加一个对应的队列节点,进入下述循环
    2. 循环:调用tryAcquireShared方法进行尝试,成功(这里跟1、“排他竞争临界资源”核心方法小节不一样的是,在以共享模式竞争到临界资源后,会尝试唤醒队列中所有等待进行共享竞争的节点,即调用setHeadAndPropagate方法,这是一个贪心优化)退出;否则,如果满足挂起条件则挂起等待被唤醒(唤醒后仍然回到循环开头),不满足挂起条件则回退到循环开头
  • 释放逻辑流程可描述为:如果释放成功,尝试唤醒队列中后续节点
  • Node节点中waitStatus字段存在SIGNAL值的必要性在于:用于协调“获取方法中的线程挂起”和“释放方法中的线程唤醒”操作,确保挂起会被唤醒

2.3.2、核心方法变种

根据以上内容知,核心方法涵盖“获取”和“释放”两部分,变种存在于“获取”部分。

1、“排他竞争临界资源”核心方法变种
原获取核心方法acquire -> tryAcquire,acquireQueued。在获取锁等待挂起过程中,遇到中断信号,不抛出InterruptedException异常,在获取到锁之后,再自调用中断方法。

获取核心方法变种1acquireInterruptibly -> tryAcquire,doAcquireInterruptibly。在获取锁等待挂起过程中,遇到中断信号,直接抛出InterruptedException异常。

获取核心方法变种2tryAcquireNanos -> tryAcquire,doAcquireNanos。在至多指定时间内尝试获取锁,如果时间超期仍然未成功则直接失败;在整个过程中,遇到中断信号,直接抛出InterruptedException异常。另外,tryAcquireNanos方法的名字取的易混淆。

2、“共享竞争临界资源”核心方法变种
原获取核心方法acquireShared -> tryAcquireShared,doAcquireShared。在获取锁等待挂起过程中,遇到中断信号,不抛出InterruptedException异常,在获取到锁之后,再自调用中断方法。

获取核心方法变种1acquireSharedInterruptibly -> tryAcquireShared,doAcquireSharedInterruptibly。在获取锁等待挂起过程中,遇到中断信号,直接抛出InterruptedException异常。

获取核心方法变种2tryAcquireSharedNanos -> tryAcquireShared,doAcquireSharedNanos。在至多指定时间内尝试获取锁,如果时间超期仍然未成功则直接失败;在整个过程中,遇到中断信号,直接抛出InterruptedException异常。另外,tryAcquireSharedNanos方法的名字取的易混淆。

2.4、“排他竞争临界资源”语境下的ConditionObject

AQS的内部类ConditionObject针对“排他竞争临界资源”语境设计,在“共享竞争临界资源”语境讨论无意义,其父接口为java.util.concurrent.locks.Condition

2.4.1、设计用途

为提供线程协同机制:

  • 与synchronized锁(排他锁)配套的是,Object类提供wait()/wait(long timeout)/wait(long timeout, int nanos)等待方法和notify()/notifyAll()唤醒等待方法
  • 与基于此处AQS实现的高级排他锁(比如“JDK中的ReentrantLock类表征的锁”)配套的是,ConditionObject类提供await()/awaitUninterruptibly()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)等待方法和signal()/signalAll()唤醒等待方法

2.4.2、等待方法和唤醒等待方法

1、“synchronized锁”情形
等待方法有:wait()/wait(long timeout)/wait(long timeout, int nanos);唤醒等待方法有:notify()/notifyAll(),是“一次性唤醒信号”。

假定有线程A和线程B,锁对象lock,线程A调用执行对象lock的等待方法进入等待,该等待被唤醒的条件有[1]:

  • 线程B调用执行对象lock的notify()方法
  • 线程B调用执行对象lock的notifyAll()方法
  • 设定的超时时间到期
  • 虚假唤醒,即“无任何理由直接自唤醒退出,虽然很少发生(JavaDoc的原话是A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup.)”
  • 中断

2、“基于此处AQS实现的高级排他锁”情形
等待方法有:await()/awaitUninterruptibly()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline);唤醒等待方法有:signal()/signalAll(),是“一次性唤醒信号”。

假定有线程A和线程B,锁对象lock,调用lock.newConditionObject()生成的ConditionObject对象为obj,线程A调用执行对象obj的等待方法进入等待,该等待被唤醒的条件有:

  • 线程B调用执行对象obj的signal()方法
  • 线程B调用执行对象obj的signalAll()方法
  • 设定的超时时间到期
  • 设定的截止时间到期
  • 不存在虚假唤醒。但是需要注意的是Condition接口的JavaDoc中声明允许虚假唤醒存在,这里的ConditionObject实现杜绝了这种可能而已
  • 中断(需要注意的是,调用执行obj.awaitUninterruptibly()等待方法进入的等待不会被中断唤醒)

2.4.3、使用形式

两者的使用形式皆为:

1
2
3
4
5
6
7
8
9
10
调用等待方法:
获取排他锁
调用等待惯用法:在一个循环中调用等待
释放排他锁


唤醒等待方法:
获取排他锁
唤醒等待
释放排他锁

synchronized锁示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main {
Object obj = new Object();

boolean flag = false;

public void waitF() throws InterruptedException {
synchronized (obj) {
while (!flag) {
obj.wait();
}
}
}

public void signalF() {
synchronized (obj) {
flag = true;

obj.notify();
}
}
}

ReentrantLock锁示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

ReentrantLock lock = new ReentrantLock();

Condition condition = lock.newCondition();

boolean flag = false;

public void waitF() throws InterruptedException {
lock.lock();

try {
while (!flag) {
condition.await();
}
} finally {
lock.unlock();
}

}

public void signalF() {
lock.lock();

try {
flag = true;
condition.signal();
} finally {
lock.unlock();
}
}
}

备注

  • “synchronized锁机制类”还有synchronized方法使用形式
  • 基于此处AQS实现的高级排他锁,除了这里的ReentrantLock,还可以自定义实现
  • synchronized锁类型只有一个“WAIT调用等待队列”;而基于此处AQS实现的高级排他锁可能就有多个“AWAIT调用等待队列”,因为可以多次调用newCondition()方法,生成多个ConditionObject对象,也即对应着多个“AWAIT调用等待队列”

2.4.4、实现细节

基于ConditionObject实现的等待/唤醒流程图如图1。

图1

几点说明

  • 跟调用wait()/wait(long timeout)/wait(long timeout, int nanos)方法公平加入对应的“WAIT调用等待队列”一样,调用await()/awaitUninterruptibly()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)方法也是公平加入对应的“AWAIT调用等待队列”
  • 关于“释放锁-获取锁-抛出InterruptedException异常”三者关系:
    • 在这里我们可查看await()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)这4个方法的源代码,从而可知:对于await/awaitNano/awaitUntil等待,不管是后序还是前序中断,“释放锁-获取锁-抛出InterruptedException异常”三者关系都较为明确
    • 而我们在《Java并发编程基础》中有提到过一个困惑——对于wait等待,后序中断时,“释放锁-获取锁-抛出InterruptedException异常”三者关系较为明确;前序中断时,“释放锁-获取锁-抛出InterruptedException异常”三者关系不能确定。wait等待的实现可能类似于await/awaitNano/awaitUntil,也可能不是的
    • 特别需要注意的是,上述“前序中断”和“后序中断”是通常范畴的理解,不要较真,比如“极端的后序中断产生前序中断的效果”

三、其他

接下来介绍基于AQS实现高级锁,常见形式为:在高级锁类中继承实现一个AQS的子类Sync,在该高级锁中定义一个该子类的实例成员变量Sync sync,该高级锁的核心方法实现都借助于sync

根据《原子操作与锁》我们知道,常见的锁分类有:

  • 悲观锁 vs 乐观锁
  • 阻塞锁 vs 非阻塞锁
  • 公平锁 vs 非公平锁
  • 可重入锁 vs 非可重入锁
  • 共享锁 vs 排他锁

结合上述锁分类,关于基于AQS实现高级锁有以下几点:

  • 根据AQS中的临界资源获取方法,比如“acquire(int arg)”,其本质其实就是“乐观锁实现悲观锁”,且获取不到时需要排队阻塞,故基于AQS实现的高级锁必为“悲观锁”和“阻塞锁”
  • 根据Sync具体实现,决定是“公平锁”还是“非公平锁”:
    • “公平锁”例子:sync实例成员变量指向ReentrantLock.FairSync实例的ReentrantLock锁,sync实例成员变量指向ReentrantReadWriteLock.FairSync实例的ReentrantReadWriteLock.WriteLock锁,sync实例成员变量指向ReentrantReadWriteLock.FairSync实例的ReentrantReadWriteLock.ReadLock锁
    • “非公平锁”例子:sync实例成员变量指向ReentrantLock.NonfairSync实例的ReentrantLock锁,sync实例成员变量指向ReentrantReadWriteLock.NonfairSync实例的ReentrantReadWriteLock.WriteLock锁,sync实例成员变量指向ReentrantReadWriteLock.NonfairSync实例的ReentrantReadWriteLock.ReadLock锁
  • 根据Sync具体实现,决定是“可重入锁”还是“非可重入锁”:
    • “可重入锁”例子:ReentrantLock锁,ReentrantReadWriteLock.WriteLock锁,ReentrantReadWriteLock.ReadLock锁
    • “非可重入锁”例子:如下源码中的NonReentrantMutex锁
  • 根据Sync具体实现,决定是“共享锁”还是“排他锁”:
    • “共享锁”例子:ReentrantReadWriteLock.ReadLock锁
    • “排他锁”例子:ReentrantLock锁,ReentrantReadWriteLock.WriteLock锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class NonReentrantMutex implements Lock, java.io.Serializable {

// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Report whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}

// Acquire the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// Release the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// Provide a Condition
Condition newCondition() { return new ConditionObject(); }

// Deserialize properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();

public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

参考文献

[1]《Java并发编程基础》
[2]http://www.docjar.com/docs/api/java/util/concurrent/locks/AbstractQueuedSynchronizer$Node.html
[3]https://mp.weixin.qq.com/s/ut4hRJSDOls6UTUdlpu9_g
[4]https://javadoop.com/post/AbstractQueuedSynchronizer
[5]https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

您的支持将鼓励我继续分享!