0%

AQS

一、基本含义

AQS,指代java.util.concurrent.locks.AbstractQueuedSynchronizer类,中文名为“队列同步器”,它是实现J.U.C包中几个高级组件(比如“ReentrantLock”,“ReentrantReadWriteLock”,“CountDownLatch”,“Semaphore”等)的核心辅助类。

设计AQS面向的问题是:多线程竞争临界资源(即private volatile int state),竞争失败的线程进入队列排队。

另外有一个类java.util.concurrent.locks.AbstractQueuedLongSynchronizer,它与java.util.concurrent.locks.AbstractQueuedSynchronizer相比,核心差异在于:其临界资源是private volatile long state

二、源码阅读

笔者对于AQS源码的理解程度只是概貌程度,远没有到细节,关于这点有以下几点说明:

  • 看源码时,以看懂大致内容为主,学习其设计的思路,不要陷入所有条件的处理细节中。尤其是在多线程环境中,对与错有时候并不是那么容易看出来的
  • AQS中的临界资源获取方法,比如“acquire(int arg)”,其本质其实就是“乐观锁实现悲观锁”,而“乐观锁”的实现本就较难理解
  • 内存可见性语义确保是通过“volatile变量”,“Unsafe类的CAS操作”,“final变量”等机制获得的

2.1、state

表征临界资源。

2.2、队列

2.2.1、队列节点

队列节点定义如下:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

static final class Node {
    static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
    static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile AbstractQueuedSynchronizer.Node prev;

    volatile AbstractQueuedSynchronizer.Node next;

    volatile Thread thread;

    AbstractQueuedSynchronizer.Node nextWaiter;

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, AbstractQueuedSynchronizer.Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
        AbstractQueuedSynchronizer.Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
}

1、3种类型队列
3种类型队列共用以上队列节点定义。

队列类型 waitStatus prev next thread nextWaiter
排他锁申请等待队列 !=Node.CONDITION 前导节点 后导节点 关联的线程实例对象 Node.EXCLUSIVE
共享锁申请等待队列 !=Node.CONDITION 前导节点 后导节点 关联的线程实例对象 Node.SHARED
AWAIT调用等待队列 ==Node.CONDITION 关联的线程实例对象 后导节点

2、prev和next
用于表征队列节点之间的关系(除了AWAIT调用等待队列),prev指针是准确的,next指针不一定准确,但是next不准确时只可能是以下两种不准确值:

  • NULL。cancelAcquire方法和enq方法可能出现一种并发执行情形:cancelAcquire方法中语句node == tail && compareAndSetTail(node, pred)enq方法中的compareAndSetTail(t, node)语句执行结果都为true,然后compareAndSetNext(pred, predNext, null);又后于t.next = node;执行,导致次尾节点的next字段值被错误地置为NULL
  • 指向的Node.waitStatus > 0,即处于CANCELLED状态。在cancelAcquire方法中,可能不进入unparkSuccessor(node);分支,此时pred节点的next字段值未得到更新,可能指向waitStatus > 0的节点

3、thread
关联的线程实例对象。

2.2.2、head和tail

head和tail分别表示排他锁申请等待队列/共享锁申请等待队列的队列头节点和尾节点(AWAIT调用等待队列的队列头和尾节点不由head/tail表示,而是由AbstractQueuedSynchronizer.ConditionObject类中的firstWaiter/lastWaiter表示):

  • 初始化时,head和tail都为NULL,只有产生线程竞争需要加入队列节点时才进行队列初始化,即惰性生成,此时head=tail=new Node()
  • 当head=tail时(值可以是NULL,也可以是非NULL),表示队列为空
  • 当线程T1持有锁时,head或者为空;或者不为空,此时该head节点即表征T1,虽然thread和prev字段会被置为NULL;反过来,当head为空或者不为空,是否有线程持有锁都是不确定的

2.3、方法

方法分为3类:

  • 辅助方法
  • 核心方法
  • 核心方法变种

2.3.1、核心方法

核心方法涵盖“获取”和“释放”两部分,又根据“排他竞争临界资源/共享竞争临界资源”可分为两类。
1、“排他竞争临界资源”核心方法
获取

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

释放

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;

        if (h != null && h.waitStatus != 0) {
            unparkSuccessor(h);
        }
        return true;
    }
    return false;
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

代码理解

  • 在这里,tryAcquire(int arg)tryRelease(int arg)方法直接抛出异常,真正的逻辑在子类实现,故本质上采用模板方法设计模式
  • 获取逻辑流程可描述为:
    1. 调用tryAcquire方法进行尝试,成功即退出;否则添加一个对应的队列节点,进入下述循环
    2. 循环:调用tryAcquire方法进行尝试,成功退出;否则,如果满足挂起条件则挂起等待被唤醒(唤醒后仍然回到循环开头),不满足挂起条件则回退到循环开头
  • 释放逻辑流程可描述为:如果释放成功,并且当前线程有对应的队列节点且节点的waitStatus!=0,则尝试唤醒队列的下一个节点
  • Node节点中waitStatus字段存在SIGNAL值的必要性在于:用于协调“获取方法中的线程挂起”和“释放方法中的线程唤醒”操作,确保挂起会被唤醒

2、“共享竞争临界资源”核心方法
获取:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

释放:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

代码理解

  • 在这里,tryAcquireShared(int arg)tryReleaseShared(int arg)方法直接抛出异常,真正的逻辑在子类实现,故本质上采用模板方法设计模式
  • 获取逻辑流程可描述为:
    1. 调用tryAcquireShared方法进行尝试,成功即退出;否则添加一个对应的队列节点,进入下述循环
    2. 循环:调用tryAcquireShared方法进行尝试,成功(这里跟1、“排他竞争临界资源”核心方法小节不一样的是,在以共享模式竞争到临界资源后,会尝试唤醒队列中所有等待进行共享竞争的节点,即调用setHeadAndPropagate方法,这是一个贪心优化)退出;否则,如果满足挂起条件则挂起等待被唤醒(唤醒后仍然回到循环开头),不满足挂起条件则回退到循环开头
  • 释放逻辑流程可描述为:如果释放成功,尝试唤醒队列中后续节点
  • Node节点中waitStatus字段存在SIGNAL值的必要性在于:用于协调“获取方法中的线程挂起”和“释放方法中的线程唤醒”操作,确保挂起会被唤醒

2.3.2、核心方法变种

根据以上内容知,核心方法涵盖“获取”和“释放”两部分,变种存在于“获取”部分。

1、“排他竞争临界资源”核心方法变种
原获取核心方法acquire -> tryAcquire,acquireQueued。在获取锁等待挂起过程中,遇到中断信号,不抛出InterruptedException异常,在获取到锁之后,再自调用中断方法。

获取核心方法变种1acquireInterruptibly -> tryAcquire,doAcquireInterruptibly。在获取锁等待挂起过程中,遇到中断信号,直接抛出InterruptedException异常。

获取核心方法变种2tryAcquireNanos -> tryAcquire,doAcquireNanos。在至多指定时间内尝试获取锁,如果时间超期仍然未成功则直接失败;在整个过程中,遇到中断信号,直接抛出InterruptedException异常。另外,tryAcquireNanos方法的名字取的易混淆。

2、“共享竞争临界资源”核心方法变种
原获取核心方法acquireShared -> tryAcquireShared,doAcquireShared。在获取锁等待挂起过程中,遇到中断信号,不抛出InterruptedException异常,在获取到锁之后,再自调用中断方法。

获取核心方法变种1acquireSharedInterruptibly -> tryAcquireShared,doAcquireSharedInterruptibly。在获取锁等待挂起过程中,遇到中断信号,直接抛出InterruptedException异常。

获取核心方法变种2tryAcquireSharedNanos -> tryAcquireShared,doAcquireSharedNanos。在至多指定时间内尝试获取锁,如果时间超期仍然未成功则直接失败;在整个过程中,遇到中断信号,直接抛出InterruptedException异常。另外,tryAcquireSharedNanos方法的名字取的易混淆。

2.4、“排他竞争临界资源”语境下的ConditionObject

AQS的内部类ConditionObject针对“排他竞争临界资源”语境设计,在“共享竞争临界资源”语境讨论无意义,其父接口为java.util.concurrent.locks.Condition

2.4.1、设计用途

为提供线程协同机制:

  • 与synchronized锁(排他锁)配套的是,Object类提供wait()/wait(long timeout)/wait(long timeout, int nanos)等待方法和notify()/notifyAll()唤醒等待方法
  • 与基于此处AQS实现的高级排他锁(比如“JDK中的ReentrantLock类表征的锁”)配套的是,ConditionObject类提供await()/awaitUninterruptibly()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)等待方法和signal()/signalAll()唤醒等待方法

2.4.2、等待方法和唤醒等待方法

1、“synchronized锁”情形
等待方法有:wait()/wait(long timeout)/wait(long timeout, int nanos);唤醒等待方法有:notify()/notifyAll(),是“一次性唤醒信号”。

假定有线程A和线程B,锁对象lock,线程A调用执行对象lock的等待方法进入等待,该等待被唤醒的条件有[1]:

  • 线程B调用执行对象lock的notify()方法
  • 线程B调用执行对象lock的notifyAll()方法
  • 设定的超时时间到期
  • 虚假唤醒,即“无任何理由直接自唤醒退出,虽然很少发生(JavaDoc的原话是A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup.)”
  • 中断

2、“基于此处AQS实现的高级排他锁”情形
等待方法有:await()/awaitUninterruptibly()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline);唤醒等待方法有:signal()/signalAll(),是“一次性唤醒信号”。

假定有线程A和线程B,锁对象lock,调用lock.newConditionObject()生成的ConditionObject对象为obj,线程A调用执行对象obj的等待方法进入等待,该等待被唤醒的条件有:

  • 线程B调用执行对象obj的signal()方法
  • 线程B调用执行对象obj的signalAll()方法
  • 设定的超时时间到期
  • 设定的截止时间到期
  • 不存在虚假唤醒。但是需要注意的是Condition接口的JavaDoc中声明允许虚假唤醒存在,这里的ConditionObject实现杜绝了这种可能而已
  • 中断(需要注意的是,调用执行obj.awaitUninterruptibly()等待方法进入的等待不会被中断唤醒)

2.4.3、使用形式

两者的使用形式皆为:

调用等待方法:
    获取排他锁
    调用等待惯用法:在一个循环中调用等待
    释放排他锁


唤醒等待方法:
    获取排他锁
    唤醒等待
    释放排他锁

synchronized锁示例:

public class Main {
    Object obj = new Object();

    boolean flag = false;

    public void waitF() throws InterruptedException {
        synchronized (obj) {
            while (!flag) {
                obj.wait();
            }
        }
    }

    public void signalF() {
        synchronized (obj) {
            flag = true;

            obj.notify();
        }
    }
}

ReentrantLock锁示例:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    ReentrantLock lock = new ReentrantLock();

    Condition condition = lock.newCondition();

    boolean flag = false;

    public void waitF() throws InterruptedException {
        lock.lock();

        try {
            while (!flag) {
                condition.await();
            }
        } finally {
            lock.unlock();
        }

    }

    public void signalF() {
        lock.lock();

        try {
            flag = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

备注

  • “synchronized锁机制类”还有synchronized方法使用形式
  • 基于此处AQS实现的高级排他锁,除了这里的ReentrantLock,还可以自定义实现
  • synchronized锁类型只有一个“WAIT调用等待队列”;而基于此处AQS实现的高级排他锁可能就有多个“AWAIT调用等待队列”,因为可以多次调用newCondition()方法,生成多个ConditionObject对象,也即对应着多个“AWAIT调用等待队列”

2.4.4、实现细节

基于ConditionObject实现的等待/唤醒流程图如图1。

图1

几点说明

  • 跟调用wait()/wait(long timeout)/wait(long timeout, int nanos)方法公平加入对应的“WAIT调用等待队列”一样,调用await()/awaitUninterruptibly()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)方法也是公平加入对应的“AWAIT调用等待队列”
  • 关于“释放锁-获取锁-抛出InterruptedException异常”三者关系:
    • 在这里我们可查看await()/await(long time, TimeUnit unit)/awaitNano(long nanosTimeout)/awaitUntil(Date deadline)这4个方法的源代码,从而可知:对于await/awaitNano/awaitUntil等待,不管是后序还是前序中断,“释放锁-获取锁-抛出InterruptedException异常”三者关系都较为明确
    • 而我们在《Java并发编程基础》中有提到过一个困惑——对于wait等待,后序中断时,“释放锁-获取锁-抛出InterruptedException异常”三者关系较为明确;前序中断时,“释放锁-获取锁-抛出InterruptedException异常”三者关系不能确定。wait等待的实现可能类似于await/awaitNano/awaitUntil,也可能不是的
    • 特别需要注意的是,上述“前序中断”和“后序中断”是通常范畴的理解,不要较真,比如“极端的后序中断产生前序中断的效果”

三、其他

接下来介绍基于AQS实现高级锁,常见形式为:在高级锁类中继承实现一个AQS的子类Sync,在该高级锁中定义一个该子类的实例成员变量Sync sync,该高级锁的核心方法实现都借助于sync

根据《原子操作与锁》我们知道,常见的锁分类有:

  • 悲观锁 vs 乐观锁
  • 阻塞锁 vs 非阻塞锁
  • 公平锁 vs 非公平锁
  • 可重入锁 vs 非可重入锁
  • 共享锁 vs 排他锁

结合上述锁分类,关于基于AQS实现高级锁有以下几点:

  • 根据AQS中的临界资源获取方法,比如“acquire(int arg)”,其本质其实就是“乐观锁实现悲观锁”,且获取不到时需要排队阻塞,故基于AQS实现的高级锁必为“悲观锁”和“阻塞锁”
  • 根据Sync具体实现,决定是“公平锁”还是“非公平锁”:
    • “公平锁”例子:sync实例成员变量指向ReentrantLock.FairSync实例的ReentrantLock锁,sync实例成员变量指向ReentrantReadWriteLock.FairSync实例的ReentrantReadWriteLock.WriteLock锁,sync实例成员变量指向ReentrantReadWriteLock.FairSync实例的ReentrantReadWriteLock.ReadLock锁
    • “非公平锁”例子:sync实例成员变量指向ReentrantLock.NonfairSync实例的ReentrantLock锁,sync实例成员变量指向ReentrantReadWriteLock.NonfairSync实例的ReentrantReadWriteLock.WriteLock锁,sync实例成员变量指向ReentrantReadWriteLock.NonfairSync实例的ReentrantReadWriteLock.ReadLock锁
  • 根据Sync具体实现,决定是“可重入锁”还是“非可重入锁”:
    • “可重入锁”例子:ReentrantLock锁,ReentrantReadWriteLock.WriteLock锁,ReentrantReadWriteLock.ReadLock锁
    • “非可重入锁”例子:如下源码中的NonReentrantMutex锁
  • 根据Sync具体实现,决定是“共享锁”还是“排他锁”:
    • “共享锁”例子:ReentrantReadWriteLock.ReadLock锁
    • “排他锁”例子:ReentrantLock锁,ReentrantReadWriteLock.WriteLock锁
public class NonReentrantMutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        // Report whether in locked state
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquire the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Release the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provide a Condition
        Condition newCondition() { return new ConditionObject(); }

        // Deserialize properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

参考文献

[1]《Java并发编程基础》
[2]http://www.docjar.com/docs/api/java/util/concurrent/locks/AbstractQueuedSynchronizer$Node.html
[3]https://mp.weixin.qq.com/s/ut4hRJSDOls6UTUdlpu9_g
[4]https://javadoop.com/post/AbstractQueuedSynchronizer
[5]https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

您的支持将鼓励我继续分享!