简单聊一聊AbstractQueuedSynchronizer


声明:本文转载自https://my.oschina.net/eumji025/blog/1614129,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

AQS简介

AbstractQueuedSynchronizer是一个基于first-in-first-out (FIFO)队列实现阻塞锁和同步器功能的框架。简称AQS,此类的设计目标是成为依靠单个原子 int 值来表示状态。子类必须定义更改此状态的方法(如tryReleaseShared),并定义哪种状态对于此对象意味着被获取或被释放。

我们首先看一下AQS类中FIFO队列的真实面貌

static final class Node {     static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();     static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;     static final int CANCELLED = 1;     static final int SIGNAL = -1;     static final int CONDITION = -2;     static final int PROPAGATE = -3;     volatile int waitStatus;     volatile AbstractQueuedSynchronizer.Node prev;     volatile AbstractQueuedSynchronizer.Node next;     volatile Thread thread;     AbstractQueuedSynchronizer.Node nextWaiter; } 

可以看出其实就是一个双向队列,应该是非常容易理解的。在这个对象里面设置了好几种状态值,这主要用于设置锁的状态值。

CANCELLED => 取消状态

SIGNAL => 等待触发状态

CONDITION => 等待条件状态

PROPAGATE => 状态需要向后传播

后面具体在介绍其中的含义,不过我倒是觉得这几个状态值应该自己用个枚举维护会更好。。。

而SHARED对象是用于共享锁如(CountDownLatch),EXCLUSIVE则是用于独占锁(ReentrantLock)

方法原理

本文将从独占式和非独占锁两部分分别讲述一下AQS的工作原理。

独占式锁

首先介绍一下acquire方法,此方法是以独占模式获取对象,忽略中断。tryAcquire方法尝试获取锁,需要子类自己实现。

public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); } 

这里主要是三个步骤:

1.首先直接先尝试获取锁(tryAcquire),如果获取到就结束了,如果获取不到; 2.则会将这个线程加入到等待队列(addWaiter(Node.EXCLUSIVE), arg)); 3.进行自旋,尝试获取锁(acquireQueued)。这也是和synchronized方法不同的地方。

当获取失败的时候就需要将线程加入到等待的队列中,看一下addWaiter方法的具体实现。

private Node addWaiter(Node mode) {   //将当前线程包装成一个节点     Node node = new Node(Thread.currentThread(), mode);     Node pred = tail;   // 如果存在等待的队列,直接加入到队尾     if (pred != null) {         node.prev = pred;       //使用CAS方式设置tail节点         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }   //不存在的时候创建队列     enq(node);     return node; } 

这里总共做了三件事情

1.首先把当前对象包装成一个node

2.判断等待队列是否为空,如果等待队列已经存在,直接添加到队列末尾

3.如果为空的话,需要尝试新建一个等待队列

为什么说是尝试新建队列,下面看一下具体enq方法的实现就能知道

private Node enq(final Node node) {     for (;;) {         Node t = tail;         if (t == null) { //初始化队列头             if (compareAndSetHead(new Node()))                 tail = head;         } else {           //如果已经初始化了就添加到队尾             node.prev = t;             if (compareAndSetTail(t, node)) {                 t.next = node;                 return t;             }         }     } } 

此处为什么要这么设计,因为AQS的方法都是没有加锁的,所以很有可能进入enq方法的时候别的线程初始化过等待队列了。所以此处也是用CAS操作,都是为了防止发生错误的可能性。关于CAS的详细介绍,可以自己查阅相关的资料。

上面介绍完加入队列后,下面就要看一下acquireQueued方法的实现了,大概也能猜到是怎么进行自旋的。

final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         boolean interrupted = false;         for (;;) {           //获取node的前一个元素,满足条件就直接设置为新的队头,p就要被干掉             final Node p = node.predecessor();             if (p == head && tryAcquire(arg)) {                 setHead(node);                 p.next = null; // help GC                 failed = false;                 return interrupted;             }           //如果不满足条件,就需要把线程挂起并检查中断状态             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true;         }     } finally {       //如果最终都没有找到合适的,取消等待的节点         if (failed)             cancelAcquire(node);     } } 

上面的方法是线程自旋获取锁的方法,主要做了一下几件事情

1.首先检查当前节点是否为head的下一个节点,如果是=>第二步,否则=>第三步 2.尝试获取锁,如果获取失败则可能被别人插队了...则进行第三步,如果成功将当前线程设置为头节点,返回 3.如果没有获得锁,则挂起当前线程并检查中断状态,此状态主要用于wait方法

此处节点会一直检查当前是否为下一个被唤醒的位置。如果没有到就会检测中间元素是否已经被取消,如果取消了就去除中间这个节点。

具体看一下实现shouldParkAfterFailedAcquireparkAndCheckInterrupt方法的实现

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    //获取前面节点的状态值     int ws = pred.waitStatus;     //表示等待被唤醒,直接返回     if (ws == Node.SIGNAL)         return true;     //大于0代表CANCELLED状态,将会被取消,此处就会移除节点,     if (ws > 0) {         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node;     } else {        //否则设置为SIGNAL状态,表示等待被唤醒,获取锁         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return false; } //真正的挂起锁,并检测是否被中断了 private final boolean parkAndCheckInterrupt() {     LockSupport.park(this);     return Thread.interrupted(); } 

此处就是使用到了文章开始所说的那几个状态值,

1.当node前一个节点的状态值为SIGNAL,表示这个节点是可以等待锁的。

2.如果不满足ws>0,表示前一个节点已经被取消,则干掉取消的节点。(他只管在他前面的那个节点,至于更前面的,就交给前面节点的线程去检测了)

3.否则就把pred的状态设置为SIGNAL。

最后如果失败了,会执行finally中的cancelAquire方法,先用一张图来看一下(来自百度)

private void cancelAcquire(Node node) {     if (node == null)         return;     node.thread = null;     Node pred = node.prev;   //清除节点前面的已经取消的节点     while (pred.waitStatus > 0)         node.prev = pred = pred.prev;      Node predNext = pred.next;     node.waitStatus = Node.CANCELLED;    //如果节点在末尾,直接将前一个不为cancel的节点设置为末尾节点     if (node == tail && compareAndSetTail(node, pred)) {         compareAndSetNext(pred, predNext, null);     } else {         int ws;        //前节点不是队头,并且是等待的状态,设置连接的状态         if (pred != head &&             ((ws = pred.waitStatus) == Node.SIGNAL ||              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&             pred.thread != null) {             Node next = node.next;            //pred.next的pred => node.pred             if (next != null && next.waitStatus <= 0)                 compareAndSetNext(pred, predNext, next);         } else {            //否则就是获得锁             unparkSuccessor(node);         }         node.next = node; // help GC     } } 

在自旋中发生意外进入到finally的取消方法,其中的主要逻辑再次梳理一下:

1.如果当前的节点是空的,直接返回

2.如果上一个节点被取消,改变节点的连接状态,如果当前元素是tail,尝试将pred的下一个节点设置为null

3.如果不是tail元素,当前元素也不是head,将pred.next设置为node.next

4.否则帮助一下个元素获取锁(处于队列头了)。

具体看一下我们的unparkSuccessor方法

private void unparkSuccessor(Node node) {     int ws = node.waitStatus;     //如果node小于0 设置为0,为什么有这一步?     if (ws < 0)         compareAndSetWaitStatus(node, ws, 0);      Node s = node.next;    //如果当前节点的下一个节点也是被取消的     if (s == null || s.waitStatus > 0) {         s = null;        //从后往前找到node后的第一个没取消的节点         for (Node t = tail; t != null && t != node; t = t.prev)             if (t.waitStatus <= 0)                 s = t;     }   //解锁这个节点     if (s != null)         LockSupport.unpark(s.thread); } 

自己到了下一个被唤醒的线程时,需要分情况的进行解锁,因为unparkSuccessor方法是多个地方都要被使用的,所以虽然在我们的cancelAquire方法中,node已经设置为CANCEL状态,但是其他方法调用的时候node的状态就不确认了,所以这也是为什么还要继续判断状态的理由。

并且找到自己的下一个元素没被取消的节点,让他获得锁。需要注意的是如果下一个节点取消的话,就会采取从后往前找的方式,知道排列在最前面的那个节点. 为什么采用这种方式,因为我们在之前的代码也可以看到,在进行设置next节点的时候都上用CAS的方式,所以next节点设置成不成功也都是不确定的,而且没有一直进行尝试.从后往前保证了不会出现问题.

释放锁

看完获得锁之后,我们看一下如何进行锁释放的。

public final boolean release(int arg) {   //尝试释放锁     if (tryRelease(arg)) {         Node h = head;         if (h != null && h.waitStatus != 0)             unparkSuccessor(h);         return true;     }     return false; } 

和获得锁一样tryRelease方法也是需要子类自己进行实现的。如果释放锁成功的话,就开始unparkSuccessor 方法为head的下一个节点获得锁。

这里就也用到了unparkSuccessor方法。同时我们再次看unparkSuccessor方法就会清晰很多,明白为什么要那么多判断了。

非独占式锁

public final void acquireShared(int arg) {     if (tryAcquireShared(arg) < 0)         doAcquireShared(arg); } 

和上面独占式锁的流程非常类似,我们再看一下doAcquireShared方法所做的事情。

private void doAcquireShared(int arg) {     //添加一个非独占式的节点     final Node node = addWaiter(Node.SHARED);     boolean failed = true;     try {         boolean interrupted = false;         for (;;) {             final Node p = node.predecessor();             //判断是否到了一下个解锁的位置了             if (p == head) {                 int r = tryAcquireShared(arg);                 //如果成功获取锁,就设置head并传播                 if (r >= 0) {                     setHeadAndPropagate(node, r);                     p.next = null; // help GC                     if (interrupted)                         selfInterrupt();                     failed = false;                     return;                 }             }            ...省略 } 

此处和独占式不同的地方主要在setHeadAndPropagate方法,因为独占式获取了只会更改自己的状态。共享式则会传播到其他的节点。

  private void setHeadAndPropagate(Node node, int propagate) {         Node h = head; // Record old head for check below         setHead(node);      	//满足条件调用doReleaseShared方法         if (propagate > 0 || h == null || h.waitStatus < 0 ||             (h = head) == null || h.waitStatus < 0) {             Node s = node.next;             if (s == null || s.isShared())                 doReleaseShared();         }     } 

哈哈看到这里,就直接到释放锁的方法了

 private void doReleaseShared() {       //循环释放锁         for (;;) {             Node h = head;             if (h != null && h != tail) {                 int ws = h.waitStatus;                 if (ws == Node.SIGNAL) {                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                         continue;            // loop to recheck cases                     unparkSuccessor(h);                 }                 else if (ws == 0 &&                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                     continue;                // loop on failed CAS             }             if (h == head)                   // loop if head changed                 break;         }     } 

看懂了独占式,看非独占式就很简单了。

结语

为什么我们需要了解AQS,这是因为Lock锁,线程池,信号量(Semaphore)就是基于AQS实现的。

所以AQS是基础,当时我看ReentrantLock的时候就是一脸懵逼,当看完AQS之后再次看的时候就清晰很多了。同时不得不佩服Doug Lea大神。

本文主要的是基于自己的一些观点写的,如有表达不当或者描述错误的地方,欢迎给我反馈。

与君共勉!!

参考文献

占小狼-深入浅出java同步器AQS

java8-AbstractQueuedSynchronizer文档

本文发表于2018年01月29日 08:38
(c)注:本文转载自https://my.oschina.net/eumji025/blog/1614129,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1800 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1