Java并发学习之ReentrantLock的工作原理及使用姿势


声明:本文转载自https://my.oschina.net/u/566591/blog/1557978,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

Lock,ReentrantLock的工作原理及使用方式

jdk提供synchronized实现线程同步,但有些场景下并不灵活,如多个同步方法,每次只能有一个线程访问;而Lock则可以非常灵活的在代码中实现同步机制

I. Lock的使用

在之前学习阻塞队列中,较多地方使用 ReadWriteLock, Condition,接下来在探究实现原理之前,先研究下锁的使用

Lock 接口的定义

public interface Lock {       // 获取锁,若当前lock被其他线程获取;则此线程阻塞等待lock被释放      // 如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁     void lock();      // 获取锁,若当前锁不可用(被其他线程获取);     // 则阻塞线程,等待获取锁,则这个线程能够响应中断,即中断线程的等待状态     void lockInterruptibly() throws InterruptedException;      // 来尝试获取锁,如果获取成功,则返回true;     // 如果获取失败(即锁已被其他线程获取),则返回false     // 也就是说,这个方法无论如何都会立即返回     boolean tryLock();      // 在拿不到锁时会等待一定的时间     // 等待过程中,可以被中断     // 超过时间,依然获取不到,则返回false;否则返回true     boolean tryLock(long time, TimeUnit unit) throws InterruptedException;      // 释放锁     void unlock();      // 返回一个绑定该lock的Condtion对象     // 在Condition#await()之前,锁会被该线程持有     // Condition#await() 会自动释放锁,在wait返回之后,会自动获取锁     Condition newCondition(); } 

1. ReentrantLock

可重入锁。jdk中ReentrantLock是唯一实现了Lock接口的类

可重入的意思是一个线程拥有锁之后,可以再次获取锁,

基本使用

最基本的使用场景,就是利用lock和unlock来实现线程同步

以轮班为实例进行说明,要求一个人下班之后,另一个人才能上班,即不能两个人同时上班,具体实现可以如下

public class LockDemo {      private Lock lock = new ReentrantLock();      private void workOn() {         System.out.println(Thread.currentThread().getName() + ":上班!");     }      private void workOff() {         System.out.println(Thread.currentThread().getName() + ":下班");     }       public void work() {         try {             lock.lock();             workOn();             System.out.println(Thread.currentThread().getName()                + "工作中!!!!");             Thread.sleep(100);             workOff();         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             lock.unlock();         }     }      public static void main(String[] args) throws InterruptedException {         LockDemo lockDemo = new LockDemo();          int i = 0;         List<Thread> list = new ArrayList<>(30);         do {             Thread a = new Thread(new Runnable() {                 @Override                 public void run() {                     lockDemo.work();                 }             }, "小A_" + i);              Thread b = new Thread(new Runnable() {                 @Override                 public void run() {                     lockDemo.work();                 }             }, "小B_" + i);             list.add(a);           list.add(b);         } while (i++ < 10);         list.parallelStream().forEach(Thread::start);          Thread.sleep(3000);         System.out.println("main over!");     } } 

上面的示例,主要给出了lock()unlock() 的配套使用,当一个线程在上班迁尝试获取锁,如果获取到,则只有在下班之后才会释放锁,保证在其上班的过程中,不会有线程也跑来上岗抢饭碗,输出如下

小A_3:上班! 小A_3工作中!!!! 小A_3:下班 小A_1:上班! 小A_1工作中!!!! 小A_1:下班 // .... 省略部分 小B_7:上班! 小B_7工作中!!!! 小B_7:下班 小B_5:上班! 小B_5工作中!!!! 小B_5:下班 main over! 

从基本的使用中,确定lock的使用姿势一般如下:

  1. 创建锁对象 Lock lock = new ReentrantLock()
  2. 在希望保证线程同步的代码之前显示调用 lock.lock() 尝试获取锁,若被其他线程占用,则阻塞
  3. 执行完之后,一定得手动释放锁,否则会造成死锁 lock.unlock(); 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally块中

即常见的使用姿势应该是

try {   lock.lock();   // ..... } finally {   lock.unlock(); } 

一个疑问,若没被加锁,仅只执行lock.unlock()是否会有问题?

测试如下

@Test public void testLock() {     Lock lock = new ReentrantLock();     // lock.lock();     // lock.unlock();     lock.unlock();     System.out.println("123"); } 

执行之后,发现抛了异常(去掉上面的注释,即加锁一次,释放锁两次也会抛下面异常)

exception

另一个疑问,代码块中多次上锁,释放锁只一次,是否会有问题?

将前面的TestDemo方法稍稍改动一下

public void work() {     try {         lock.lock();         workOn();         lock.lock();         System.out.println(Thread.currentThread().getName()            + "工作中!!!!");         Thread.sleep(100);         workOff();     } catch (InterruptedException e) {         e.printStackTrace();     } finally {         lock.unlock();     } } 

再次执行,发现其他线程都无法再次获取到锁了,运行的gif图如下

gif

因此可以得出结论:

  1. lock() 和 unlock() 配套使用,不要出现一个比另一个用得多的情况
  2. 不要出现lock(),lock()连续调用的情况,即两者之间没有释放锁unlock()的显示调用

2. Condtion 及 Lock的配合使用

在JDK的阻塞队列中,很多地方就利用了Condition和Lock来实现出队入队的并发安全性,以 ArrayBlockingQueue为例

内部定义了锁,非空条件,非满条件

/** Main lock guarding all access */ final ReentrantLock lock;  /** Condition for waiting takes */ private final Condition notEmpty;  /** Condition for waiting puts */ private final Condition notFull;  public ArrayBlockingQueue(int capacity, boolean fair) {     // ... 省略     lock = new ReentrantLock(fair);     notEmpty = lock.newCondition();     notFull =  lock.newCondition(); } 

出队,入队的实现如下(屏蔽一些与锁无关逻辑)

// 入队逻辑 public void put(E e) throws InterruptedException {     // ...     final ReentrantLock lock = this.lock;     lock.lockInterruptibly();     try {         while (count == items.length)          // 如果队列已满,则执行Condtion notFull的等待方法         // 本线程会释放锁,等待其他线程出队之后,执行 notFull.singal()方法             notFull.await();         enqueue(e);     } finally {         // 释放锁         lock.unlock();     } }  private void enqueue(E x) {     // 入队,notEmpty 条件执行,唤醒被 notEmpty.await() 阻塞的出队线程     notEmpty.signal(); }   // 出队 public E take() throws InterruptedException {   final ReentrantLock lock = this.lock;   lock.lockInterruptibly();   try {       while (count == 0)           // 队列为空,线程执行notEmpty.wait(),阻塞并释放锁           // 等待其他入队线程执行  notEmpty.signal(); 后被唤醒           notEmpty.await();       return dequeue();   } finally {       lock.unlock();   } }  private E dequeue() {   // ...   // 出队,notFull 条件执行,唤醒被 notFull.await() 阻塞的入队线程   notFull.signal(); } 

下面看下Condition的定义

public interface Condition {      // 使当前线程处于等待状态,释放与Condtion绑定的lock锁     // 直到 singal()方法被调用后,被唤醒(若中断,就game over了)     // 唤醒后,该线程会再次获取与条件绑定的 lock锁     void await() throws InterruptedException;      // 相比较await()而言,不响应中断     void awaitUninterruptibly();      // 在wait()的返回条件基础上增加了超时响应,返回值表示当前剩余的时间     // < 0 ,则表示超时     long awaitNanos(long nanosTimeout) throws InterruptedException;      // 同上,只是时间参数不同而已     boolean await(long time, TimeUnit unit) throws InterruptedException;      // 同上,只是时间参数不同而已     boolean awaitUntil(Date deadline) throws InterruptedException;      // 表示条件达成,唤醒一个被条件阻塞的线程     void signal();      // 唤醒所有被条件阻塞的线程。     void signalAll(); } 

通过上面的注释,也就是说Condtion一般是与Lock配套使用,应用在多线程协同工作的场景中;即一个线程的执行,期望另一个线程执行完毕之后才完成

针对这种方式,我们写个测试类,来实现累加,要求如下:

  1. 线程1实现 start-middle的累加;线程2实现middle-end的累加
  2. 线程3实现线程1和线程2计算结果的相加

上面这种情况下,线程3的执行,要求线程1和线程2都执行完毕

说明:下面实现只是为了演示Condition和Lock的使用,上面这种场景有更好的选择,如Thread.join()或者利用Fork/Join都更加优雅

public class LockCountDemo {      private int start = 10;     private int middle = 90;     private int end = 200;      private volatile int tmpAns1 = 0;     private volatile int tmpAns2 = 0;      private Lock lock = new ReentrantLock();     private Condition condition = lock.newCondition();     private AtomicInteger count = new AtomicInteger(0);       private int add(int i, int j) {         try {             lock.lock();             int sum = 0;             for (int tmp = i; tmp < j; tmp++) {                 sum += tmp;             }             return sum;         } finally {             atomic();             lock.unlock();         }     }       private int sum() throws InterruptedException {         try {             lock.lock();             condition.await();             return tmpAns1 + tmpAns2;         } finally {             lock.unlock();         }     }      private void atomic() {         if (2 == count.addAndGet(1)) {             condition.signal();         }     }       public static void main(String[] args) throws InterruptedException {         LockCountDemo demo = new LockCountDemo();         Thread thread1 = new Thread(() -> {             System.out.println(Thread.currentThread().getName() +                  " : 开始执行");             demo.tmpAns1 = demo.add(demo.start, demo.middle);             System.out.println(Thread.currentThread().getName() +                     " : calculate ans: " + demo.tmpAns1);         }, "count1");          Thread thread2 = new Thread(() -> {             System.out.println(Thread.currentThread().getName() +                 " : 开始执行");             demo.tmpAns2 = demo.add(demo.middle, demo.end + 1);             System.out.println(Thread.currentThread().getName() +                     " : calculate ans: " + demo.tmpAns2);         }, "count2");          Thread thread3 = new Thread(() -> {             try {                 System.out.println(Thread.currentThread().getName() +                     " : 开始执行");                 int ans = demo.sum();                 System.out.println("the total result: " + ans);             } catch (Exception e) {                 e.printStackTrace();             }         }, "sum");           thread3.start();         thread1.start();         thread2.start();          Thread.sleep(3000);         System.out.println("over");     } } 

输出如下

sum : 开始执行 count2 : 开始执行 count1 : 开始执行 count1 : calculate ans: 3960 the total result: 20055 count2 : calculate ans: 16095 over 

小结Condition的使用:

  • Condition与Lock配套使用,通过 Lock#newConditin() 进行实例化
  • Condition#await() 会释放lock,线程阻塞;直到线程中断or Condition#singal()被执行,唤醒阻塞线程,并重新获取lock
  • 经典case可以参考jdk的阻塞队列实现(ArrayBlockingQueue, LinkedBlockingQueue)

II. ReentrantLock实现原理

1. AbstractQueuedSynchronizer (简称AQS)

AQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题

AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus

private transient volatile Node head;  private transient volatile Node tail;  private volatile int state;  static final class Node {     static final Node SHARED = new Node();     static final Node EXCLUSIVE = null;      /** waitStatus value to indicate thread has cancelled */     static final int CANCELLED =  1;     /** waitStatus value to indicate successor's thread needs unparking */     static final int SIGNAL    = -1;     /** waitStatus value to indicate thread is waiting on condition */     static final int CONDITION = -2;     /**      * waitStatus value to indicate the next acquireShared should      * unconditionally propagate      */     static final int PROPAGATE = -3;      //取值为 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一     volatile int waitStatus;      volatile Node prev;      volatile Node next;      // Link to next node waiting on condition,      // or the special value SHARED     volatile Thread thread;      Node nextWaiter; } 

artch

2. lock()实现

final void lock() {     if (compareAndSetState(0, 1))         setExclusiveOwnerThread(Thread.currentThread());     else         acquire(1); }  public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); } 

源码分析:

  • 首先用一个CAS操作,判断state是否是0(表示当前锁未被占用),
    • 如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功
  • 当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,其他被阻塞

acquire的逻辑如下:

tyrAcquire: 尝试获取锁,非阻塞立即返回

final boolean nonfairTryAcquire(int acquires) {     final Thread current = Thread.currentThread();     int c = getState();     if (c == 0) { // 没有线程占用锁         if (compareAndSetState(0, acquires)) {           // 占用锁成功,设置为独占             setExclusiveOwnerThread(current);             return true;         }     } else if (current == getExclusiveOwnerThread()) {         // 本线程之前已经获取到锁         int nextc = c + acquires;         if (nextc < 0) // overflow             throw new Error("Maximum lock count exceeded");         // 更新重入次数         setState(nextc);         return true;     }     return false; } 

非公平锁tryAcquire的流程是:

  • 检查state字段,若为0,表示锁未被占用,那么尝试占用
  • 若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。
  • 如果以上两点都没有成功,则获取锁失败,返回false

addWaiter: 入队

private Node addWaiter(Node mode) {     // 创建一个节点,并将其挂在链表的最后     Node node = new Node(Thread.currentThread(), mode);     // Try the fast path of enq; backup to full enq on failure     Node pred = tail;     if (pred != null) {         node.prev = pred;         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }          // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点     enq(node);     return node; } 

acquireQueued挂起

final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         // /标记线程是否被中断过         boolean interrupted = false;         for (;;) {             //获取前驱节点             final Node p = node.predecessor();             // 若该节点为有效的队列头(head指向的Node内部实际为空)             // 尝试获取锁             if (p == head && tryAcquire(arg)) {                 setHead(node); // 获取成功,将当前节点设置为head节点                 p.next = null; // help GC                 failed = false;                 //返回是否被中断过                 return interrupted;             }             // 判断获取失败后是否可以挂起,若可以则挂起             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                  // 线程若被中断,设置interrupted为true                 interrupted = true;         }     } finally {         if (failed)             cancelAcquire(node);     } } 

线程挂起的逻辑

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus; //前驱节点的状态     if (ws == Node.SIGNAL) // 前驱节点状态为signal,返回true         return true;     if (ws > 0) {      // 从队尾向前寻找第一个状态不为CANCELLED的节点         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node;     } else {         // 将前驱节点的状态设置为SIGNAL         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return false; } 

线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,它的含义是“Hi,前面的兄弟,如果你获取锁并且出队后,记得把我唤醒!”。

shouldParkAfterFailedAcquire会先判断当前节点的前驱状态是否符合要求:

  • 若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起
  • 如果不符合,再看前驱节点是否>0(CANCELLED)
  • 若是那么向前遍历直到找到第一个符合要求的前驱
  • 若不是则将前驱节点的状态设置为SIGNAL

小结下lock()流程

  1. CAS判断state是否为0来表示锁是否被占用;若未被占用,则独占锁
  2. 否则,尝试获取锁 acquire()
  3. 若尝试获取锁成功(锁就是被当前线程占用的,重入计数+1即可;或者锁正好被释放)
  4. 获取锁失败,则需要创建一个Node节点(包含了线程信息)入队
  5. 挂起线程 acquireQueued, 挂起之前,会先尝试获取锁,值有确认失败之后,则挂起锁,并设置前置Node的状态为SIGNAL(以保障在释放锁的时候,可以保证唤醒Node的后驱节点线程)

3. unlock的实现

尝试释放锁,成功,需要清楚各种状态(计数,释放独占锁)

此外还需要额外判断队列下个节点是否需要唤醒,然后决定唤醒被挂起的线程;

public final boolean release(int arg) {     if (tryRelease(arg)) { // 尝试释放锁         Node h = head;         if (h != null && h.waitStatus != 0)         // 查看头结点的状态是否为SIGNAL,如果是则唤醒头结点的下个节点关联的线程             unparkSuccessor(h);         return true;     }     return false; }  protected final boolean tryRelease(int releases) {     int c = getState() - releases; // 计算释放后state值     if (Thread.currentThread() != getExclusiveOwnerThread())         // 如果不是当前线程占用锁,那么抛出异常         throw new IllegalMonitorStateException();     boolean free = false;     if (c == 0) {         // 锁被重入次数为0,表示释放成功,清空独占线程         free = true;         setExclusiveOwnerThread(null);     }     setState(c);     return free; } 

小结

  1. 创建锁对象 Lock lock = new ReentrantLock()
  2. 在希望保证线程同步的代码之前显示调用 lock.lock() 尝试获取锁,若被其他线程占用,则阻塞
  3. 执行完之后,一定得手动释放锁,否则会造成死锁 lock.unlock(); 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally块中
  4. lock() 和 unlock() 配套使用,不要出现一个比另一个用得多的情况
  5. 不要出现lock(),lock()连续调用的情况,即两者之间没有释放锁unlock()的显示调用
  6. Condition与Lock配套使用,通过 Lock#newConditin() 进行实例化
  7. Condition#await() 会释放lock,线程阻塞;直到线程中断or Condition#singal()被执行,唤醒阻塞线程,并重新获取lock
  8. ReentrantLock#lock的流程图大致如下

lock

参考

ReentrantLock实现原理详解

扫描关注,java分享

QrCode

本文发表于2017年10月31日 00:34
(c)注:本文转载自https://my.oschina.net/u/566591/blog/1557978,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1512 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1