public class CyclicBarrierTest { // 线程个数 private int parties = 3; private AtomicInteger atomicInteger = new AtomicInteger(parties); private CyclicBarrier cyclicBarrier; class Protector implements Runnable { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " - 到达屏障前"); TimeUnit.SECONDS.sleep(2); cyclicBarrier.await(); atomicInteger.decrementAndGet(); System.out.println(Thread.currentThread().getName() + " - 到达屏障后"); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " - 等待中断"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName() + " - 屏障被破坏"); } } } @Before public void init() { cyclicBarrier = new CyclicBarrier(parties); } @Test public void allAwait() { for (int i = 0; i < parties; i++) { new Thread(new Protector(), "Thread-" + i).start(); } while (true) { if (atomicInteger.get() == 0) { // 所有线程到达屏障后退出结束 System.out.println("test over"); break; } } } @Test public void oneAwaitInterrupted() throws InterruptedException { Thread threadA = new Thread(new Protector(), "Thread-A"); Thread threadB = new Thread(new Protector(), "Thread-B"); threadA.start(); threadB.start(); // 等待 3 秒,避免是 time sleep 触发中断异常 TimeUnit.SECONDS.sleep(3); threadA.interrupt(); while (true) { if (atomicInteger.get() == 0) { System.out.println("test over"); break; } if (cyclicBarrier.isBroken()) { System.out.println("屏障中断退出"); break; } } } } 复制代码
Thread-A - 到达屏障前 Thread-B - 到达屏障前 屏障中断退出 Thread-A - 等待中断 Thread-B - 屏障被破坏 Thread-0 - 到达屏障前 Thread-1 - 到达屏障前 Thread-2 - 到达屏障前 Thread-2 - 到达屏障后 Thread-0 - 到达屏障后 Thread-1 - 到达屏障后 test over 复制代码
从 oneAwaitInterrupted 方法执行结果可以看出,当一个线程 A 执行中断时,另外一个线程 B 会抛出 BrokenBarrierException
// 可以指定拦截线程个数 public CyclicBarrier(int parties) { this(parties, null); } // 指定拦截线程个数和所有线程到达屏障处后执行的动作 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } 复制代码
- barrier : 屏障
- parties : 为屏障拦截的线程数
- tripped : 跳闸,可以理解为打开屏障
- generation.broken : 屏障是否破损,当屏障被打开或被重置的时候会改变值
await 说明线程到达屏障
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } 复制代码
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 获取排他锁 lock.lock(); try { final Generation g = generation; // 屏障被破坏则抛异常 if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { // 线程中断 则退出屏障 breakBarrier(); throw new InterruptedException(); } // 到达屏障的计数减一 int index = --count; if (index == 0) { // tripped // index == 0, 说明指定 count 的线程均到达屏障 // 此时可以打开屏障 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) // 若指定了 barrierCommand 则执行 command.run(); ranAction = true; // 唤醒阻塞在屏障的线程并重置 generation nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) // 若未指定阻塞在屏障处的等待时间,则一直等待;直至最后一个线程到达屏障处的时候被唤醒 trip.await(); else if (nanos > 0L) // 若指定了阻塞在屏障处的等待时间,则在指定时间到达时会返回 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 若等待过程中,线程发生了中断,则退出屏障 breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 屏障被破坏 则抛出异常 if (g.broken) throw new BrokenBarrierException(); if (g != generation) // g != generation 说明所有线程均到达屏障处 可直接返回 // 因为所有线程到达屏障处的时候,会重置 generation // 参考 nextGeneration return index; if (timed && nanos <= 0L) { // 说明指定时间内,还有线程未到达屏障处,也就是等待超时 // 退出屏障 breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } 复制代码
private void nextGeneration() { // signal completion of last generation // 唤醒阻塞在等待队列的线程 trip.signalAll(); // set up next generation // 重置 count count = parties; // 重置 generation generation = new Generation(); } 复制代码
private void breakBarrier() { // broken 设置为 true generation.broken = true; // 重置 count count = parties; // 唤醒等待队列的线程 trip.signalAll(); } 复制代码
如下图为 CyclicBarrier 实现效果图:
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } 复制代码
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { // 唤醒阻塞的线程 breakBarrier(); // break the current generation // 重新设置 generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } 复制代码
public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { // 拦截线程数 - 未到达屏障数 return parties - count; } finally { lock.unlock(); } } 复制代码
CyclicBarrier 和 CountDownLatch 功能类似,不同之处在于 CyclicBarrier 支持重复利用,而 CountDownLatch 计数只能使用一次。
