Lock,ReentrantLock的工作原理及使用方式
jdk提供synchronized
实现线程同步,但有些场景下并不灵活,如多个同步方法,每次只能有一个线程访问;而Lock则可以非常灵活的在代码中实现同步机制
I. Lock的使用
在之前学习阻塞队列中,较多地方使用 ReadWriteLock
, Condition
,接下来在探究实现原理之前,先研究下锁的使用
Lock 接口的定义
public interface Lock { // 获取锁,若当前lock被其他线程获取;则此线程阻塞等待lock被释放 // 如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁 void lock(); // 获取锁,若当前锁不可用(被其他线程获取); // 则阻塞线程,等待获取锁,则这个线程能够响应中断,即中断线程的等待状态 void lockInterruptibly() throws InterruptedException; // 来尝试获取锁,如果获取成功,则返回true; // 如果获取失败(即锁已被其他线程获取),则返回false // 也就是说,这个方法无论如何都会立即返回 boolean tryLock(); // 在拿不到锁时会等待一定的时间 // 等待过程中,可以被中断 // 超过时间,依然获取不到,则返回false;否则返回true boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 释放锁 void unlock(); // 返回一个绑定该lock的Condtion对象 // 在Condition#await()之前,锁会被该线程持有 // Condition#await() 会自动释放锁,在wait返回之后,会自动获取锁 Condition newCondition(); }
1. ReentrantLock
可重入锁。jdk中ReentrantLock是唯一实现了Lock接口的类
可重入的意思是一个线程拥有锁之后,可以再次获取锁,
基本使用
最基本的使用场景,就是利用lock和unlock来实现线程同步
以轮班为实例进行说明,要求一个人下班之后,另一个人才能上班,即不能两个人同时上班,具体实现可以如下
public class LockDemo { private Lock lock = new ReentrantLock(); private void workOn() { System.out.println(Thread.currentThread().getName() + ":上班!"); } private void workOff() { System.out.println(Thread.currentThread().getName() + ":下班"); } public void work() { try { lock.lock(); workOn(); System.out.println(Thread.currentThread().getName() + "工作中!!!!"); Thread.sleep(100); workOff(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { LockDemo lockDemo = new LockDemo(); int i = 0; List<Thread> list = new ArrayList<>(30); do { Thread a = new Thread(new Runnable() { @Override public void run() { lockDemo.work(); } }, "小A_" + i); Thread b = new Thread(new Runnable() { @Override public void run() { lockDemo.work(); } }, "小B_" + i); list.add(a); list.add(b); } while (i++ < 10); list.parallelStream().forEach(Thread::start); Thread.sleep(3000); System.out.println("main over!"); } }
上面的示例,主要给出了lock()
和 unlock()
的配套使用,当一个线程在上班迁尝试获取锁,如果获取到,则只有在下班之后才会释放锁,保证在其上班的过程中,不会有线程也跑来上岗抢饭碗,输出如下
小A_3:上班! 小A_3工作中!!!! 小A_3:下班 小A_1:上班! 小A_1工作中!!!! 小A_1:下班 // .... 省略部分 小B_7:上班! 小B_7工作中!!!! 小B_7:下班 小B_5:上班! 小B_5工作中!!!! 小B_5:下班 main over!
从基本的使用中,确定lock的使用姿势一般如下:
- 创建锁对象
Lock lock = new ReentrantLock()
- 在希望保证线程同步的代码之前显示调用
lock.lock()
尝试获取锁,若被其他线程占用,则阻塞 - 执行完之后,一定得手动释放锁,否则会造成死锁
lock.unlock()
; 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally
块中
即常见的使用姿势应该是
try { lock.lock(); // ..... } finally { lock.unlock(); }
一个疑问,若没被加锁,仅只执行lock.unlock()
是否会有问题?
测试如下
@Test public void testLock() { Lock lock = new ReentrantLock(); // lock.lock(); // lock.unlock(); lock.unlock(); System.out.println("123"); }
执行之后,发现抛了异常(去掉上面的注释,即加锁一次,释放锁两次也会抛下面异常)
另一个疑问,代码块中多次上锁,释放锁只一次,是否会有问题?
将前面的TestDemo方法稍稍改动一下
public void work() { try { lock.lock(); workOn(); lock.lock(); System.out.println(Thread.currentThread().getName() + "工作中!!!!"); Thread.sleep(100); workOff(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
再次执行,发现其他线程都无法再次获取到锁了,运行的gif图如下
因此可以得出结论:
- lock() 和 unlock() 配套使用,不要出现一个比另一个用得多的情况
- 不要出现
lock()
,lock()
连续调用的情况,即两者之间没有释放锁unlock()
的显示调用
2. Condtion 及 Lock的配合使用
在JDK的阻塞队列中,很多地方就利用了Condition和Lock来实现出队入队的并发安全性,以 ArrayBlockingQueue
为例
内部定义了锁,非空条件,非满条件
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { // ... 省略 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
出队,入队的实现如下(屏蔽一些与锁无关逻辑)
// 入队逻辑 public void put(E e) throws InterruptedException { // ... final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 如果队列已满,则执行Condtion notFull的等待方法 // 本线程会释放锁,等待其他线程出队之后,执行 notFull.singal()方法 notFull.await(); enqueue(e); } finally { // 释放锁 lock.unlock(); } } private void enqueue(E x) { // 入队,notEmpty 条件执行,唤醒被 notEmpty.await() 阻塞的出队线程 notEmpty.signal(); } // 出队 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 队列为空,线程执行notEmpty.wait(),阻塞并释放锁 // 等待其他入队线程执行 notEmpty.signal(); 后被唤醒 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // ... // 出队,notFull 条件执行,唤醒被 notFull.await() 阻塞的入队线程 notFull.signal(); }
下面看下Condition的定义
public interface Condition { // 使当前线程处于等待状态,释放与Condtion绑定的lock锁 // 直到 singal()方法被调用后,被唤醒(若中断,就game over了) // 唤醒后,该线程会再次获取与条件绑定的 lock锁 void await() throws InterruptedException; // 相比较await()而言,不响应中断 void awaitUninterruptibly(); // 在wait()的返回条件基础上增加了超时响应,返回值表示当前剩余的时间 // < 0 ,则表示超时 long awaitNanos(long nanosTimeout) throws InterruptedException; // 同上,只是时间参数不同而已 boolean await(long time, TimeUnit unit) throws InterruptedException; // 同上,只是时间参数不同而已 boolean awaitUntil(Date deadline) throws InterruptedException; // 表示条件达成,唤醒一个被条件阻塞的线程 void signal(); // 唤醒所有被条件阻塞的线程。 void signalAll(); }
通过上面的注释,也就是说Condtion一般是与Lock配套使用,应用在多线程协同工作的场景中;即一个线程的执行,期望另一个线程执行完毕之后才完成
针对这种方式,我们写个测试类,来实现累加,要求如下:
- 线程1实现 start-middle的累加;线程2实现middle-end的累加
- 线程3实现线程1和线程2计算结果的相加
上面这种情况下,线程3的执行,要求线程1和线程2都执行完毕
说明:下面实现只是为了演示Condition和Lock的使用,上面这种场景有更好的选择,如Thread.join()或者利用Fork/Join都更加优雅
public class LockCountDemo { private int start = 10; private int middle = 90; private int end = 200; private volatile int tmpAns1 = 0; private volatile int tmpAns2 = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private AtomicInteger count = new AtomicInteger(0); private int add(int i, int j) { try { lock.lock(); int sum = 0; for (int tmp = i; tmp < j; tmp++) { sum += tmp; } return sum; } finally { atomic(); lock.unlock(); } } private int sum() throws InterruptedException { try { lock.lock(); condition.await(); return tmpAns1 + tmpAns2; } finally { lock.unlock(); } } private void atomic() { if (2 == count.addAndGet(1)) { condition.signal(); } } public static void main(String[] args) throws InterruptedException { LockCountDemo demo = new LockCountDemo(); Thread thread1 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " : 开始执行"); demo.tmpAns1 = demo.add(demo.start, demo.middle); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + demo.tmpAns1); }, "count1"); Thread thread2 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " : 开始执行"); demo.tmpAns2 = demo.add(demo.middle, demo.end + 1); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + demo.tmpAns2); }, "count2"); Thread thread3 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " : 开始执行"); int ans = demo.sum(); System.out.println("the total result: " + ans); } catch (Exception e) { e.printStackTrace(); } }, "sum"); thread3.start(); thread1.start(); thread2.start(); Thread.sleep(3000); System.out.println("over"); } }
输出如下
sum : 开始执行 count2 : 开始执行 count1 : 开始执行 count1 : calculate ans: 3960 the total result: 20055 count2 : calculate ans: 16095 over
小结Condition的使用:
- Condition与Lock配套使用,通过
Lock#newConditin()
进行实例化 Condition#await()
会释放lock,线程阻塞;直到线程中断or Condition#singal()
被执行,唤醒阻塞线程,并重新获取lock - 经典case可以参考jdk的阻塞队列实现(ArrayBlockingQueue, LinkedBlockingQueue)
II. ReentrantLock实现原理
1. AbstractQueuedSynchronizer (简称AQS)
AQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题
AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus
private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; //取值为 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一 volatile int waitStatus; volatile Node prev; volatile Node next; // Link to next node waiting on condition, // or the special value SHARED volatile Thread thread; Node nextWaiter; }
2. lock()实现
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
源码分析:
- 首先用一个CAS操作,判断state是否是0(表示当前锁未被占用),
- 如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功
- 当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,其他被阻塞
acquire的逻辑如下:
tyrAcquire: 尝试获取锁,非阻塞立即返回
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 没有线程占用锁 if (compareAndSetState(0, acquires)) { // 占用锁成功,设置为独占 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 本线程之前已经获取到锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新重入次数 setState(nextc); return true; } return false; }
非公平锁tryAcquire的流程是:
- 检查state字段,若为0,表示锁未被占用,那么尝试占用
- 若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。
- 如果以上两点都没有成功,则获取锁失败,返回false
addWaiter
: 入队
private Node addWaiter(Node mode) { // 创建一个节点,并将其挂在链表的最后 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点 enq(node); return node; }
acquireQueued
挂起
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // /标记线程是否被中断过 boolean interrupted = false; for (;;) { //获取前驱节点 final Node p = node.predecessor(); // 若该节点为有效的队列头(head指向的Node内部实际为空) // 尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); // 获取成功,将当前节点设置为head节点 p.next = null; // help GC failed = false; //返回是否被中断过 return interrupted; } // 判断获取失败后是否可以挂起,若可以则挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程若被中断,设置interrupted为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
线程挂起的逻辑
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前驱节点的状态 if (ws == Node.SIGNAL) // 前驱节点状态为signal,返回true return true; if (ws > 0) { // 从队尾向前寻找第一个状态不为CANCELLED的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将前驱节点的状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,它的含义是“Hi,前面的兄弟,如果你获取锁并且出队后,记得把我唤醒!”。
shouldParkAfterFailedAcquire
会先判断当前节点的前驱状态是否符合要求:
- 若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起
- 如果不符合,再看前驱节点是否>0(CANCELLED)
- 若是那么向前遍历直到找到第一个符合要求的前驱
- 若不是则将前驱节点的状态设置为SIGNAL
小结下lock()流程
- CAS判断state是否为0来表示锁是否被占用;若未被占用,则独占锁
- 否则,尝试获取锁
acquire()
- 若尝试获取锁成功(锁就是被当前线程占用的,重入计数+1即可;或者锁正好被释放)
- 获取锁失败,则需要创建一个Node节点(包含了线程信息)入队
- 挂起线程
acquireQueued
, 挂起之前,会先尝试获取锁,值有确认失败之后,则挂起锁,并设置前置Node的状态为SIGNAL(以保障在释放锁的时候,可以保证唤醒Node的后驱节点线程)
3. unlock的实现
尝试释放锁,成功,需要清楚各种状态(计数,释放独占锁)
此外还需要额外判断队列下个节点是否需要唤醒,然后决定唤醒被挂起的线程;
public final boolean release(int arg) { if (tryRelease(arg)) { // 尝试释放锁 Node h = head; if (h != null && h.waitStatus != 0) // 查看头结点的状态是否为SIGNAL,如果是则唤醒头结点的下个节点关联的线程 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 计算释放后state值 if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果不是当前线程占用锁,那么抛出异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 锁被重入次数为0,表示释放成功,清空独占线程 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
小结
- 创建锁对象
Lock lock = new ReentrantLock()
- 在希望保证线程同步的代码之前显示调用
lock.lock()
尝试获取锁,若被其他线程占用,则阻塞 - 执行完之后,一定得手动释放锁,否则会造成死锁
lock.unlock()
; 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally
块中 - lock() 和 unlock() 配套使用,不要出现一个比另一个用得多的情况
- 不要出现
lock()
,lock()
连续调用的情况,即两者之间没有释放锁unlock()
的显示调用 - Condition与Lock配套使用,通过
Lock#newConditin()
进行实例化 Condition#await()
会释放lock,线程阻塞;直到线程中断or Condition#singal()
被执行,唤醒阻塞线程,并重新获取lock ReentrantLock#lock
的流程图大致如下
参考
ReentrantLock实现原理详解
扫描关注,java分享