一、了解 AbstractQueuedSynchronizer(AQS)
1、AQS 简介
AbstractQueuedSynchronizer
是大师 Doug Lea 编写的一个并发编程类,位于 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的组成部分,他们中关于 “锁” 的部分与 AQS
息息相关。
借用一下源码中的说法,AbstractQueuedSynchronizer
基于一个 FIFO 队列
提供了一套阻塞锁和同步相关的实现。该类被设计成为很多同步容器 synchronizers
的底层实现,它使用了一个原子int private volatile int state;
来表示当前状态。当在 AQS
被 acquired
(获取资源) 或被 release
(释放资源)时,需要依据这个 state
来进行判断。所以子类需要定义方法来修改这个状态,该状态的含义由我们自由定制。(翻译的不好...)

2、实现最简单的 AQS
我们来看一个最简单的例子,我们有一个类 Sync
继承了 AbstractQueuedSynchronizer
,并重写了其 tryAcquire
和 tryRelease
方法。实现非常简单,我们通过调用父类的 compareAndSetState()
以及 setState()
来完成,简单来说(不是特别准确),就是 tryAcquire
返回 true
,代表获取锁成功,否则就会阻塞。而 tryRelease
则负责锁的释放。
在例子中:将 state
设置为 100
代表当前状态为无锁,1
则代表已经有某个线程获取了该锁。当然这个 state
表达的含义是怎么样的,完全是我们定义的,实际上锁定或者无锁是 100
还是 200
还是 -100
,都没有什么关系。
/**
* Created by Anur IjuoKaruKas on 2019/5/7
*/
public class Mutex extends AbstractQueuedSynchronizer {
public static class Sync extends AbstractQueuedSynchronizer {
public Sync() {
setState(100); // set the initial state, being unlocked.
}
@Override
protected boolean tryAcquire(int ignore) {
boolean result = compareAndSetState(100, 1);
print("尝试获取锁" + (result ? "成功" : "失败"));
return result;
}
@Override
protected boolean tryRelease(int ignore) {
setState(100);
return true;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(0);
}
public void unLock() {
sync.release(0);
}
public static void main(String[] args) throws InterruptedException {
Mutex mutex = new Mutex();
mutex.lock();
Thread thread = new Thread(() -> {
print("调用 mutex.lock() 之前");
mutex.lock();
print("调用 mutex.lock() 之后");
});
thread.start();
print("main 线程 Sleep 之前");
Thread.sleep(5000);
print("main 线程 Sleep 之后");
mutex.unLock();
}
public static void print(String print) {
System.out.println(String.format("时间 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print));
}
}
========================================= 输出
时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] 尝试获取锁成功
时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] main 线程 Sleep 之前
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 15:44:24 CST 2019 Thread[main,5,main] main 线程 Sleep 之后
时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功
时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
我们可以看到,代码符合我们的预期:在 main 函数所在线程调用 mutex.unLock();
释放锁之前,子线程是一直阻塞的,调用 mutex.lock() 之后
的日志输出发生在 main 线程 Sleep 之后
之后。
通过重写 tryAcquire
、tryRelease
方法,以及调用 acquire
和 release
方法,我们很容易就实现了一个锁,当然这个锁有一堆问题... 我们只是通过这个小例子,来建立对 AQS
一个简单的了解。
看到这里,有些细心的小伙伴可能会想了,既然锁是由 tryAcquire
控制的,那和 state
又有什么关系呢? 我们完全可以定义一个自定义变量,比如 sign
,false
代表无锁,true
代表锁定,好像也可以实现这段逻辑啊?这个时候就需要引出我们神奇的 compareAndSet
,CAS
操作了。
3、AQS 绕不过的话题: CAS Compare And Swap
前面说到,我们暂时认为 :tryAcquire
返回 true
,代表获取到锁,反之只要 tryAcquire
返回 flase
,线程就会被阻塞(不准确,后面会细说)。实际上这里有一个 隐含条件,我们必须做到:
- ※ 无论何时,都只能有一个线程
tryAcquire
成功,且在某个线程 tryAcquire
成功之后,并在其 release
释放锁之前,任何线程进行 tryAcquire
都将返回 false
。
是的,就是并发问题!
下面这个例子我们简单使用一个自定义变量 sign
来实现 tryAcquire
,看看会发生什么:
private boolean sign;
@Override
protected boolean tryAcquire(int ignore) {
boolean result = false;
if (!sign) {
sign = true;
result = true;
}
print("尝试获取锁" + (result ? "成功" : "失败"));
return result;
}
@Override
protected boolean tryRelease(int ignore) {
sign = false;
return true;
}
========================================= 输出
时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] 尝试获取锁成功
时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] main 线程 Sleep 之前
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 18:03:17 CST 2019 Thread[main,5,main] main 线程 Sleep 之后
时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功
时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
看起来好像没问题,在这个 demo 中也得到了和第一个 DEMO 一样的预期的结果。然而事情并没有那么简单,新写的这个 tryAcquire
实现是一个 "CompareThenSet"
操作,在并发的情况下,会出现不可预期的情况
- 线程A 进来,发现
sign
为 false
- 线程B 同时进来,也发现
sign
为 false
- 两者同时将
sign
修改为 true
,问题就来了。
到底是 线程A 获取到了锁,还是 线程B 呢?(实际上都获取到了)
我们改一下 Main 方法,我们使用 100 个线程并发执行 mutex.lock();
获取锁成功则会输出语句 print("获取锁成功");
,执行,发现,竟然有两个线程同时获取到了锁。有两个线程同时将 sign
修改为了 true
。
public static void main(String[] args) throws InterruptedException {
Mutex mutex = new Mutex();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
threads.add(new Thread(() -> {
mutex.lock();
print("获取锁成功");
}));
}
ExecutorService executorService = Executors.newFixedThreadPool(100);
threads.forEach(executorService::submit);
Thread.sleep(1000);
}

如果我们使用 AQS
帮我们写好的 compareAndSetState
则没有这个问题。
在 Java9
之前,底层实现是调用 unsafe
包的 compareAndSwapInt
来实现的:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
而在 Java9
之后,则是使用 VarHandle
来实现 VarHandle
是 unSafe
的一个替代方案,本文不多赘述,后面会有文章讲到这个 ~ 。
// VarHandle mechanics
private static final VarHandle STATE;
---------------------------------------------
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
这里简单的说一下 CAS
,即 CompareAndSwap
。CAS
可原子性地比较并替换一个值,乐观锁中一个典型的实现便是使用 CAS
来完成的。对并发编程有所了解的小伙伴应该都知道 CAS
,一般情况下,Compare(比较)
和 Swap(交换)
至少是两个原子操作(实际上是更多个原子操作,主要看编译成多少条机器码)。而 CAS
则保证了 Compare
和 Swap
为一个原子操作。
二、深入理解 AbstractQueuedSynchronizer(AQS) 资源锁定与解锁正向流程
上文说到,我们暂时认为 :tryAcquire
返回 true
,代表获取到锁,反之只要 tryAcquire
返回 flase
,线程就会被阻塞。
AQS
当然没有这么简单,但我们可以先看看加锁时调用的 acquire
方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们发现,tryAcquire
只是第一重判断,如果 tryAcquire
失败,紧接着还有另一个核心逻辑 acquireQueued
。在简介里,我们说,AQS
除了使用一个 原子state
来作为状态判断以外,还有一个 FIFO 队列
,此队列就和 acquireQueued
方法息息相关。另外,AQS
所控制的资源访问,还可以是共享的,或者独占的(addWaiter
参数 Node.EXCLUSIVE
)。
以下的分析我们以一个简单的 独占式非公平 AQS
实现: java.util.concurrent.locks.ReentrantLock.NonfairSync
来深入解析。独占式很好理解,大部分的锁实现都只允许一个线程在同一时间获取到锁定的资源。
1、TryAcquire 与 TryRelease 的标准写法及其优化
先看看 NonfairSync
的 tryAcuire
是怎么实现及优化的。首先,NonfairSync
中将 state == 0
定义为无锁状态。
- 竞争优化: 如果当前无锁(
state == 0
),再调用 CAS
。这实际上对性能是一个很好的优化,假设当前取 state
不为 0
,实际上 CompareAndSetState
成功的概率也很小,这也可以避免同一时间内,过多的线程去并发修改 state
这个状态。
- 重入设计: 试想如果我们不判断当前线程是否持有锁,就去进行
CAS
操作,会发生什么?毫无疑问是 CAS
失败,这会间接导致死锁。这里我们可以看到,重入以后,有一个 int nextc = c + acquires;
操作,这是方便我们记录到底套了几层锁用的,如果没有这个机制,我们将无法精确的控制加锁和解锁的层级,难免会出现一些意料之外的情况。简单来说:lock
几次,就要 unLock
几次。当然我们也可以做到 aquire
多次,一次性 release
掉,或者反过来,取决于怎么我们实现 tryAquire
和 tryRlease
方法。
- 偏向优化: 这个优化实际上很简单,如果说要获取锁的线程就是锁的持有线程,我们无需去进行任何
CAS
操作,返回 true
即可。
@ReservedStackAccess
final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 避免过多的线程竞争 CAS 操作
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);// 如果 CAS 操作成功,则将当前线程保存起来,重入和解锁时用于判断。
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 重入优化,每次加锁相当于 `state++`
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true; // 偏向优化
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 每次解锁相当于 `state--` 直到 state == 0 ,代表可释放锁了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2、AcquireQueued 解析
①、addWaiter阶段
如果 tryAquire
失败,就会进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
,addWaiter
方法创建了一个新的 Node
实例,Node
实例中主要保存了当前线程信息,并将 nextWaiter
赋值为 Node.EXCLUSIVE
, 这个 nextWaiter
后面再谈,它主要用于线程调度、以及独占模式、共享模式的区分,我们可以先不管它。

操作比较简单,原理是将 node
塞入双向链表尾端,也就是前面提到的 FIFO队列
。就是利用 CAS
操作将新创建的、带有本线程信息的 node
设置为双向链表新的 tail
,并且修改两者的 ‘指针’ prev
和 next
。

/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue(); // 初始化双向链表,就是创建一个新的空 node,并且头尾都是此 node。
// 这个 node 除了拿来标记链表从哪里开始,没有什么别的意义。
}
}
②、acquireQueued阶段(自旋)
入队成功后,进入 acquireQueued
方法,抛开线程被 interrupt
的情况,acquireQueued
的代码其实也很简单,我们不看 interrupt
相关逻辑,其实逻辑还是很简单的。这是一个无限循环(或者说自旋),只要没有 tryAcquire
成功,就会一直循环下去,逻辑如下:
- 如果上一个节点是
FIFO
队列头,则进行一次 tryAquire
,如果成功,则跳出循环。
- 检测是否需要阻塞,如果需要阻塞,则阻塞等待唤醒,
parkAndCheckInterrupt
便是阻塞直到被唤醒(或者被 interrupt
,暂时先不考虑这个情况)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquire()
和 parkAndCheckInterrupt()
都很好理解,前者就是去尝试一下获取锁定资源,看能否成功。后者则是阻塞直到被唤醒。
③、阻塞阶段
我们先说说 shouldParkAfterFailedAcquire
,这个判断是一个挺有意思的设计,后续文章会细说,它和线程调度、取消获取锁等相关。因为在获取锁定资源和释放锁定资源的过程中,实际上我们只需要用到两个状态,一个是初始状态 pred.waitStatus == 0
,另一个是 pred.waitStatus == SIGNAL == -1
。
代码中我们可以很容易看出,在 CAS
将 prev
节点的 waitStatus
设置为 SIGNAL : -1
之前,都将返回 false
,如果设置成功,下一次自旋进入该方法就是 true
了,也就是说,会进入 parkAndCheckInterrupt()
方法,阻塞直到被唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
自旋阶段图解:

阻塞阶段图解:

3、release 解析
①、唤醒 FIFO 的下一个节点
阻塞直到唤醒这个逻辑在锁定资源、释放资源 这两个阶段来看十分简单,最后我们来看看 release
做了什么,release
除了调用了我们自己实现的 tryRelease
之外,其实关键的就是这个 unparkSuccessor
。
tryRelease
上面也说过了,就是改改原子 state
,这里不多赘述。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
代码中可以看出,当 FIFO
队列不为空且头结点的 waitStatu
被修改过,就会进入 unparkSuccessor
,unparkSuccessor
传入了当前 FIFO
的队列头,逻辑如下:
- 如果当前节点
waitStatus
为负(可能为 SIGNAL
、CONDITION
或者 PROPAGATE
),我们这里简单先看成只有 SIGNAL
状态,则 CAS
将其设置为 0
。其他几个状态我们后面会说到。
- 如果
!(s == null || s.waitStatus > 0)
,也就是说 node.next
的 waitStatus <= 0
,则简单的直接将其唤醒:LockSupport.unpark(s.thread);
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

②、被唤醒后
被唤醒的线程当然不是直接获得了锁,它还是会继续 acquireQueue
进行自旋,逻辑还是和之前一样,避免小伙伴往上翻代码,这里贴了一份如果 prev
是头结点,如果 tryAcquire
成功,我们看到其实很简单,只是将自己设为头部即可。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

这篇文章只是简单的说说 AQS
的正向获取资源,释放资源流程,后续会继续解析 wait
、notify
、condition
等基于 AQS
的线程调度解析 ~~ 以及各个锁是如何实现 AQS
的 ~~
文章皆是基于源码一步步分析,没有参考过多资料,如有错误,请指出!!
另外欢迎来 Q 群讨论技术相关(目前基本没人)[左二维码]~
如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码]~

参考资料:
JDK12 源码
另外小伙伴可以思考一下:
- 如果说阶段7:
ThreadB
被唤醒后,继续自旋时,另一个线程ThreadC
tryAcquire
成功了会发生什么。
- 如果说第一个问题了解了,那应该就很清楚为什么说本文解析的这个锁叫做:非公平锁了
- 众所周知,只要是
CAS
操作,都有 ABA
问题,如果说修改 waitStatus
发生了 ABA
问题,会发生什么?