AQS简介
AbstractQueuedSynchronizer是一个基于first-in-first-out (FIFO)队列实现阻塞锁和同步器功能的框架。简称AQS
,此类的设计目标是成为依靠单个原子 int 值来表示状态。子类必须定义更改此状态的方法(如tryReleaseShared),并定义哪种状态对于此对象意味着被获取或被释放。
我们首先看一下AQS类中FIFO队列的真实面貌
static final class Node { static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node(); static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile AbstractQueuedSynchronizer.Node prev; volatile AbstractQueuedSynchronizer.Node next; volatile Thread thread; AbstractQueuedSynchronizer.Node nextWaiter; }
可以看出其实就是一个双向队列,应该是非常容易理解的。在这个对象里面设置了好几种状态值,这主要用于设置锁的状态值。
CANCELLED => 取消状态
SIGNAL => 等待触发状态
CONDITION => 等待条件状态
PROPAGATE => 状态需要向后传播
后面具体在介绍其中的含义,不过我倒是觉得这几个状态值应该自己用个枚举维护会更好。。。
而SHARED对象是用于共享锁如(CountDownLatch),EXCLUSIVE则是用于独占锁(ReentrantLock)
方法原理
本文将从独占式和非独占锁两部分分别讲述一下AQS的工作原理。
独占式锁
首先介绍一下acquire
方法,此方法是以独占模式获取对象,忽略中断。tryAcquire
方法尝试获取锁,需要子类自己实现。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这里主要是三个步骤:
1.首先直接先尝试获取锁(tryAcquire),如果获取到就结束了,如果获取不到; 2.则会将这个线程加入到等待队列(addWaiter(Node.EXCLUSIVE), arg)); 3.进行自旋,尝试获取锁(acquireQueued)。这也是和synchronized方法不同的地方。
当获取失败的时候就需要将线程加入到等待的队列中,看一下addWaiter
方法的具体实现。
private Node addWaiter(Node mode) { //将当前线程包装成一个节点 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 如果存在等待的队列,直接加入到队尾 if (pred != null) { node.prev = pred; //使用CAS方式设置tail节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //不存在的时候创建队列 enq(node); return node; }
这里总共做了三件事情
1.首先把当前对象包装成一个node
2.判断等待队列是否为空,如果等待队列已经存在,直接添加到队列末尾
3.如果为空的话,需要尝试新建一个等待队列
为什么说是尝试新建队列,下面看一下具体enq
方法的实现就能知道
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //初始化队列头 if (compareAndSetHead(new Node())) tail = head; } else { //如果已经初始化了就添加到队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
此处为什么要这么设计,因为AQS的方法都是没有加锁的,所以很有可能进入enq方法的时候别的线程初始化过等待队列了。所以此处也是用CAS操作,都是为了防止发生错误的可能性。关于CAS的详细介绍,可以自己查阅相关的资料。
上面介绍完加入队列后,下面就要看一下acquireQueued
方法的实现了,大概也能猜到是怎么进行自旋的。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取node的前一个元素,满足条件就直接设置为新的队头,p就要被干掉 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果不满足条件,就需要把线程挂起并检查中断状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果最终都没有找到合适的,取消等待的节点 if (failed) cancelAcquire(node); } }
上面的方法是线程自旋获取锁的方法,主要做了一下几件事情
1.首先检查当前节点是否为head的下一个节点,如果是=>第二步,否则=>第三步 2.尝试获取锁,如果获取失败则可能被别人插队了...则进行第三步,如果成功将当前线程设置为头节点,返回 3.如果没有获得锁,则挂起当前线程并检查中断状态,此状态主要用于wait方法
此处节点会一直检查当前是否为下一个被唤醒的位置。如果没有到就会检测中间元素是否已经被取消,如果取消了就去除中间这个节点。
具体看一下实现shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
方法的实现
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前面节点的状态值 int ws = pred.waitStatus; //表示等待被唤醒,直接返回 if (ws == Node.SIGNAL) return true; //大于0代表CANCELLED状态,将会被取消,此处就会移除节点, if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //否则设置为SIGNAL状态,表示等待被唤醒,获取锁 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //真正的挂起锁,并检测是否被中断了 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
此处就是使用到了文章开始所说的那几个状态值,
1.当node前一个节点的状态值为SIGNAL,表示这个节点是可以等待锁的。
2.如果不满足ws>0,表示前一个节点已经被取消,则干掉取消的节点。(他只管在他前面的那个节点,至于更前面的,就交给前面节点的线程去检测了)
3.否则就把pred的状态设置为SIGNAL。
最后如果失败了,会执行finally中的cancelAquire
方法,先用一张图来看一下(来自百度)
private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; //清除节点前面的已经取消的节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; //如果节点在末尾,直接将前一个不为cancel的节点设置为末尾节点 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; //前节点不是队头,并且是等待的状态,设置连接的状态 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; //pred.next的pred => node.pred if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //否则就是获得锁 unparkSuccessor(node); } node.next = node; // help GC } }
在自旋中发生意外进入到finally的取消方法,其中的主要逻辑再次梳理一下:
1.如果当前的节点是空的,直接返回
2.如果上一个节点被取消,改变节点的连接状态,如果当前元素是tail,尝试将pred的下一个节点设置为null
3.如果不是tail元素,当前元素也不是head,将pred.next设置为node.next
4.否则帮助一下个元素获取锁(处于队列头了)。
具体看一下我们的unparkSuccessor方法
private void unparkSuccessor(Node node) { int ws = node.waitStatus; //如果node小于0 设置为0,为什么有这一步? if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //如果当前节点的下一个节点也是被取消的 if (s == null || s.waitStatus > 0) { s = null; //从后往前找到node后的第一个没取消的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //解锁这个节点 if (s != null) LockSupport.unpark(s.thread); }
自己到了下一个被唤醒的线程时,需要分情况的进行解锁,因为unparkSuccessor
方法是多个地方都要被使用的,所以虽然在我们的cancelAquire方法中,node已经设置为CANCEL状态,但是其他方法调用的时候node的状态就不确认了,所以这也是为什么还要继续判断状态的理由。
并且找到自己的下一个元素没被取消的节点,让他获得锁。需要注意的是如果下一个节点取消的话,就会采取从后往前找的方式,知道排列在最前面的那个节点. 为什么采用这种方式,因为我们在之前的代码也可以看到,在进行设置next节点的时候都上用CAS的方式,所以next节点设置成不成功也都是不确定的,而且没有一直进行尝试.从后往前保证了不会出现问题.
释放锁
看完获得锁之后,我们看一下如何进行锁释放的。
public final boolean release(int arg) { //尝试释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
和获得锁一样tryRelease
方法也是需要子类自己进行实现的。如果释放锁成功的话,就开始unparkSuccessor
方法为head的下一个节点获得锁。
这里就也用到了unparkSuccessor
方法。同时我们再次看unparkSuccessor
方法就会清晰很多,明白为什么要那么多判断了。
非独占式锁
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
和上面独占式锁的流程非常类似,我们再看一下doAcquireShared方法所做的事情。
private void doAcquireShared(int arg) { //添加一个非独占式的节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //判断是否到了一下个解锁的位置了 if (p == head) { int r = tryAcquireShared(arg); //如果成功获取锁,就设置head并传播 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } ...省略 }
此处和独占式不同的地方主要在setHeadAndPropagate方法,因为独占式获取了只会更改自己的状态。共享式则会传播到其他的节点。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); //满足条件调用doReleaseShared方法 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
哈哈看到这里,就直接到释放锁的方法了
private void doReleaseShared() { //循环释放锁 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
看懂了独占式,看非独占式就很简单了。
结语
为什么我们需要了解AQS,这是因为Lock锁,线程池,信号量(Semaphore)就是基于AQS实现的。
所以AQS是基础,当时我看ReentrantLock的时候就是一脸懵逼,当看完AQS之后再次看的时候就清晰很多了。同时不得不佩服Doug Lea大神。
本文主要的是基于自己的一些观点写的,如有表达不当或者描述错误的地方,欢迎给我反馈。
与君共勉!!
参考文献
占小狼-深入浅出java同步器AQS
java8-AbstractQueuedSynchronizer文档