CyclicBarrier - 同步屏障实现分析

栏目: Java · 发布时间: 6年前

内容简介:CyclicBarrier 是可循环使用的屏障,主要功能是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。从 oneAwaitInterrupted 方法执行结果可以看出,当一个线程 A 执行中断时,另外一个线程 B 会抛出 BrokenBarrierException简单的理解就是,当线程都到达屏障的时候,会打开屏障。

CyclicBarrier 是可循环使用的屏障,主要功能是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。

使用示例

public class CyclicBarrierTest {

    // 线程个数
    private int parties = 3;

    private AtomicInteger atomicInteger = new AtomicInteger(parties);

    private CyclicBarrier cyclicBarrier;

    class Protector implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " - 到达屏障前");

                TimeUnit.SECONDS.sleep(2);

                cyclicBarrier.await();

                atomicInteger.decrementAndGet();

                System.out.println(Thread.currentThread().getName() + " - 到达屏障后");

            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " - 等待中断");
            } catch (BrokenBarrierException e) {
                System.out.println(Thread.currentThread().getName() + " - 屏障被破坏");
            }
        }
    }


    @Before
    public void init() {
        cyclicBarrier = new CyclicBarrier(parties);
    }

    @Test
    public void allAwait() {
        for (int i = 0; i < parties; i++) {
            new Thread(new Protector(), "Thread-" + i).start();
        }

        while (true) {
            if (atomicInteger.get() == 0) {
                // 所有线程到达屏障后退出结束
                System.out.println("test over");
                break;
            }
        }
    }

    @Test
    public void oneAwaitInterrupted() throws InterruptedException {
        Thread threadA = new Thread(new Protector(), "Thread-A");
        Thread threadB = new Thread(new Protector(), "Thread-B");

        threadA.start();
        threadB.start();
        // 等待 3 秒,避免是 time sleep 触发中断异常
        TimeUnit.SECONDS.sleep(3);

        threadA.interrupt();

        while (true) {
            if (atomicInteger.get() == 0) {
                System.out.println("test over");
                break;
            }
            if (cyclicBarrier.isBroken()) {
                System.out.println("屏障中断退出");
                break;
            }
        }
    }
}
复制代码
Thread-A - 到达屏障前
Thread-B - 到达屏障前
屏障中断退出
Thread-A - 等待中断
Thread-B - 屏障被破坏

Thread-0 - 到达屏障前
Thread-1 - 到达屏障前
Thread-2 - 到达屏障前
Thread-2 - 到达屏障后
Thread-0 - 到达屏障后
Thread-1 - 到达屏障后
test over
复制代码

从 oneAwaitInterrupted 方法执行结果可以看出,当一个线程 A 执行中断时,另外一个线程 B 会抛出 BrokenBarrierException

构造

// 可以指定拦截线程个数
public CyclicBarrier(int parties) {
    this(parties, null);
}

// 指定拦截线程个数和所有线程到达屏障处后执行的动作
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
复制代码

实现

概念

  • barrier : 屏障
  • parties : 为屏障拦截的线程数
  • tripped : 跳闸,可以理解为打开屏障
  • generation.broken : 屏障是否破损,当屏障被打开或被重置的时候会改变值

简单的理解就是,当线程都到达屏障的时候,会打开屏障。

await()

await 说明线程到达屏障

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
复制代码
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    final ReentrantLock lock = this.lock;
    // 获取排他锁
    lock.lock();
    try {
        final Generation g = generation;
        // 屏障被破坏则抛异常
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
        	// 线程中断 则退出屏障
            breakBarrier();
            throw new InterruptedException();
        }

        // 到达屏障的计数减一
        int index = --count;      
        if (index == 0) {  // tripped
        	// index == 0, 说明指定 count 的线程均到达屏障
        	// 此时可以打开屏障
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                	// 若指定了 barrierCommand 则执行
                    command.run();
                ranAction = true;
                // 唤醒阻塞在屏障的线程并重置 generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                	// 若未指定阻塞在屏障处的等待时间,则一直等待;直至最后一个线程到达屏障处的时候被唤醒
                    trip.await();
                else if (nanos > 0L)
                	// 若指定了阻塞在屏障处的等待时间,则在指定时间到达时会返回
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                	// 若等待过程中,线程发生了中断,则退出屏障
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            // 屏障被破坏 则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
            	// g != generation 说明所有线程均到达屏障处 可直接返回
            	// 因为所有线程到达屏障处的时候,会重置 generation
            	// 参考 nextGeneration
                return index;

            if (timed && nanos <= 0L) {
            	// 说明指定时间内,还有线程未到达屏障处,也就是等待超时
            	// 退出屏障
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
复制代码
private void nextGeneration() {
    // signal completion of last generation
    // 唤醒阻塞在等待队列的线程
    trip.signalAll();
    // set up next generation
    // 重置 count
    count = parties;
    // 重置 generation
    generation = new Generation();
}
复制代码
private void breakBarrier() {
	// broken 设置为 true
    generation.broken = true;
    // 重置 count
    count = parties;
    // 唤醒等待队列的线程
    trip.signalAll();
}
复制代码

如下图为 CyclicBarrier 实现效果图:

CyclicBarrier - 同步屏障实现分析

isBroken()

返回屏障是否被破坏,也是是否被中断

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}
复制代码

reset()

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 唤醒阻塞的线程
        breakBarrier();   // break the current generation
        // 重新设置 generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
复制代码

getNumberWaiting

获取阻塞在屏障处的线程数

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 拦截线程数 - 未到达屏障数
        return parties - count;
    } finally {
        lock.unlock();
    }
}
复制代码

小结

CyclicBarrier 和 CountDownLatch 功能类似,不同之处在于 CyclicBarrier 支持重复利用,而 CountDownLatch 计数只能使用一次。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

失业的程序员

失业的程序员

沈逸 / 2014-5-1 / 39.00元

这是一个程序员从失业到自行创业的奋斗历程,虽然囧事连连、过程曲折,却充满了趣味。本书以作者的真实创业经历为主线,文字幽默诙谐,情节生动真实,包括了招聘、团队管理和用户公关,以及技术架构设计、核心代码编写、商务谈判、项目运作等场景经验。 从初期的创业伙伴、领路人,到商业竞争对手,各种复杂的关系在各个关键时刻却都发生了意想不到的逆转。在历经千辛万苦,眼看快要成功时,主人公却几乎再次失业。 ......一起来看看 《失业的程序员》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具