CyclicBarrier源码分析
首先分析CyclicBarrier的核心方法await
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
可以看出不管是否带超时,但将会调用方法dowait方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
在代码执行前,将会使用一个ReentrantLock对象(CyclicBarrier的实例域)进行加锁,保证屏障的线程安全。
final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); }
接下来将检查屏障是否被打破,还有检查当前线程是否被中断
那么是如何检查屏障是否被打破,使用到的即是Generation对象,将根据该对象的实例域broken(一个布尔类型的值)来判断是否被打破
private static class Generation { boolean broken = false; }
Generation类只有一个布尔类型的实例域broken,用于作为屏障是否打破的标记
int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }
接下来将开始计数,将屏障计数减一,然后判断是否为0,若为0则全部线程到达屏障,屏障将打开
在打开屏障前,将会执行CyclicBarrier的Runnable实例域barrierCommand,barrierCommand就是在构造函数中传参进行来的Runnable对象,即用户想要在所有线程到达后,而在屏障打开前锁要执行的任务
可以看出,如上说的一样,改任务是在屏障打开前执行的,而且是由最后一个到达屏障的线程执行
接下来可以看看打开屏障的方法nextGeneration
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
trip是一个Condition对象
private final Condition trip = lock.newCondition();
用于实现屏障的阻塞功能,当然,在所有线程到达屏障后,将会唤醒所有阻塞的线程
count是用于到达屏障的计数,而parties则是指定屏障的需要线程到达的数量,当屏障打开时,count将复原为需要到达数量
最后,将重新实例化一个Generation对象,然后替换掉原来的。
前面提到,Generation有个布尔类型实例域broken用于判断屏障是否被打破,那么直接用一个boolean类型不就行了吗,为什么还需要定义个Generation类。Generation所代表的是”一代”,而屏障需要标记的,不单只是是否被打破,还要标记屏障是否完成了一轮的任务,即被正常打开,显然这样需要两个标记值。而使用Generation对象用作表示每轮的屏障的状态,当验证屏障是否被打开时,只需要验证是否为同一个Generation对象即可。如果不抽象出一个Generation类,对于这种屏障状态的表达,将显得复杂而难以控制
for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } }
若然到达屏障的不是最后一个线程,将进入一个死循环,并且根据是否带超时,执行Condition对象的await方法进行阻塞,以此等待其他线程到达屏障,该过程在死循环中执行,是为了防止线程意外唤醒,破坏屏障的规则
若然线程在阻塞的过程中被中断,将会判断屏障是否完好,若然完好,则会将屏障打破,唤醒其他线程,这种情况是任何一个到达屏障的线程被中断都会引发的。若然线程在中断时,屏障已经打开或者被打破了,将再次执行当前线程的中断操作,而不做更多的操作
线程从阻塞中返回后,将有三个判断
第一个判断判断于屏障是否被打破,则线程是因为屏障被打破而唤醒的,抛出BrokenBarrierException异常
第二个判断是判断generation是否为同一个对象,则判断”本轮操作”完成了吗,屏障被正常打开,将返回一个整型index,index表达的是线程到达屏障的顺序,数值越大越早到达,index为0则表明该线程是最后一个到达的
第三个判断是用于超时判断,若线程由于超时而被唤醒,将会打破屏障,唤醒其他线程,并且抛出TimeoutException
打破屏障的方法
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
屏障打破后将唤醒所有在屏障中等待的线程,而且被打破后需要重置屏障,否则不能再使用该屏障
重置屏障方法
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // 打破当前屏障 nextGeneration(); // 下一代,即重新开始屏障计数 } finally { lock.unlock(); } }