内容简介:CyclicBarrier 是可循环使用的屏障,主要功能是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。从 oneAwaitInterrupted 方法执行结果可以看出,当一个线程 A 执行中断时,另外一个线程 B 会抛出 BrokenBarrierException简单的理解就是,当线程都到达屏障的时候,会打开屏障。
CyclicBarrier 是可循环使用的屏障,主要功能是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。
使用示例
public class CyclicBarrierTest {
// 线程个数
private int parties = 3;
private AtomicInteger atomicInteger = new AtomicInteger(parties);
private CyclicBarrier cyclicBarrier;
class Protector implements Runnable {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " - 到达屏障前");
TimeUnit.SECONDS.sleep(2);
cyclicBarrier.await();
atomicInteger.decrementAndGet();
System.out.println(Thread.currentThread().getName() + " - 到达屏障后");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " - 等待中断");
} catch (BrokenBarrierException e) {
System.out.println(Thread.currentThread().getName() + " - 屏障被破坏");
}
}
}
@Before
public void init() {
cyclicBarrier = new CyclicBarrier(parties);
}
@Test
public void allAwait() {
for (int i = 0; i < parties; i++) {
new Thread(new Protector(), "Thread-" + i).start();
}
while (true) {
if (atomicInteger.get() == 0) {
// 所有线程到达屏障后退出结束
System.out.println("test over");
break;
}
}
}
@Test
public void oneAwaitInterrupted() throws InterruptedException {
Thread threadA = new Thread(new Protector(), "Thread-A");
Thread threadB = new Thread(new Protector(), "Thread-B");
threadA.start();
threadB.start();
// 等待 3 秒,避免是 time sleep 触发中断异常
TimeUnit.SECONDS.sleep(3);
threadA.interrupt();
while (true) {
if (atomicInteger.get() == 0) {
System.out.println("test over");
break;
}
if (cyclicBarrier.isBroken()) {
System.out.println("屏障中断退出");
break;
}
}
}
}
复制代码
Thread-A - 到达屏障前 Thread-B - 到达屏障前 屏障中断退出 Thread-A - 等待中断 Thread-B - 屏障被破坏 Thread-0 - 到达屏障前 Thread-1 - 到达屏障前 Thread-2 - 到达屏障前 Thread-2 - 到达屏障后 Thread-0 - 到达屏障后 Thread-1 - 到达屏障后 test over 复制代码
从 oneAwaitInterrupted 方法执行结果可以看出,当一个线程 A 执行中断时,另外一个线程 B 会抛出 BrokenBarrierException
构造
// 可以指定拦截线程个数
public CyclicBarrier(int parties) {
this(parties, null);
}
// 指定拦截线程个数和所有线程到达屏障处后执行的动作
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
复制代码
实现
概念
- barrier : 屏障
- parties : 为屏障拦截的线程数
- tripped : 跳闸,可以理解为打开屏障
- generation.broken : 屏障是否破损,当屏障被打开或被重置的时候会改变值
简单的理解就是,当线程都到达屏障的时候,会打开屏障。
await()
await 说明线程到达屏障
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
复制代码
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取排他锁
lock.lock();
try {
final Generation g = generation;
// 屏障被破坏则抛异常
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
// 线程中断 则退出屏障
breakBarrier();
throw new InterruptedException();
}
// 到达屏障的计数减一
int index = --count;
if (index == 0) { // tripped
// index == 0, 说明指定 count 的线程均到达屏障
// 此时可以打开屏障
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 若指定了 barrierCommand 则执行
command.run();
ranAction = true;
// 唤醒阻塞在屏障的线程并重置 generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 若未指定阻塞在屏障处的等待时间,则一直等待;直至最后一个线程到达屏障处的时候被唤醒
trip.await();
else if (nanos > 0L)
// 若指定了阻塞在屏障处的等待时间,则在指定时间到达时会返回
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 若等待过程中,线程发生了中断,则退出屏障
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 屏障被破坏 则抛出异常
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
// g != generation 说明所有线程均到达屏障处 可直接返回
// 因为所有线程到达屏障处的时候,会重置 generation
// 参考 nextGeneration
return index;
if (timed && nanos <= 0L) {
// 说明指定时间内,还有线程未到达屏障处,也就是等待超时
// 退出屏障
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
复制代码
private void nextGeneration() {
// signal completion of last generation
// 唤醒阻塞在等待队列的线程
trip.signalAll();
// set up next generation
// 重置 count
count = parties;
// 重置 generation
generation = new Generation();
}
复制代码
private void breakBarrier() {
// broken 设置为 true
generation.broken = true;
// 重置 count
count = parties;
// 唤醒等待队列的线程
trip.signalAll();
}
复制代码
如下图为 CyclicBarrier 实现效果图:
isBroken()
返回屏障是否被破坏,也是是否被中断
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
复制代码
reset()
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 唤醒阻塞的线程
breakBarrier(); // break the current generation
// 重新设置 generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
复制代码
getNumberWaiting
获取阻塞在屏障处的线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 拦截线程数 - 未到达屏障数
return parties - count;
} finally {
lock.unlock();
}
}
复制代码
小结
CyclicBarrier 和 CountDownLatch 功能类似,不同之处在于 CyclicBarrier 支持重复利用,而 CountDownLatch 计数只能使用一次。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Java并发编程的艺术(十一)——倒计时器、同步屏障、信号量
- 垃圾回收之写屏障
- Java 并发编程(三):MESI、内存屏障
- Java 并发编程(三):MESI、内存屏障
- 刘睿民:数据库是保障数据安全的有力屏障
- Golang三色标记、混合写屏障GC模式图文全分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Learn Python the Hard Way
Zed A. Shaw / Addison-Wesley Professional / 2013-10-11 / USD 39.99
Master Python and become a programmer-even if you never thought you could! This breakthrough book and CD can help practically anyone get started in programming. It's called "The Hard Way," but it's re......一起来看看 《Learn Python the Hard Way》 这本书的介绍吧!