CyclicBarrier - 同步屏障实现分析

栏目: Java · 发布时间: 7年前

内容简介:CyclicBarrier 是可循环使用的屏障,主要功能是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。从 oneAwaitInterrupted 方法执行结果可以看出,当一个线程 A 执行中断时,另外一个线程 B 会抛出 BrokenBarrierException简单的理解就是,当线程都到达屏障的时候,会打开屏障。

CyclicBarrier 是可循环使用的屏障,主要功能是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开;所有被屏障拦截的线程才会继续执行。

使用示例

public class CyclicBarrierTest {

    // 线程个数
    private int parties = 3;

    private AtomicInteger atomicInteger = new AtomicInteger(parties);

    private CyclicBarrier cyclicBarrier;

    class Protector implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " - 到达屏障前");

                TimeUnit.SECONDS.sleep(2);

                cyclicBarrier.await();

                atomicInteger.decrementAndGet();

                System.out.println(Thread.currentThread().getName() + " - 到达屏障后");

            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " - 等待中断");
            } catch (BrokenBarrierException e) {
                System.out.println(Thread.currentThread().getName() + " - 屏障被破坏");
            }
        }
    }


    @Before
    public void init() {
        cyclicBarrier = new CyclicBarrier(parties);
    }

    @Test
    public void allAwait() {
        for (int i = 0; i < parties; i++) {
            new Thread(new Protector(), "Thread-" + i).start();
        }

        while (true) {
            if (atomicInteger.get() == 0) {
                // 所有线程到达屏障后退出结束
                System.out.println("test over");
                break;
            }
        }
    }

    @Test
    public void oneAwaitInterrupted() throws InterruptedException {
        Thread threadA = new Thread(new Protector(), "Thread-A");
        Thread threadB = new Thread(new Protector(), "Thread-B");

        threadA.start();
        threadB.start();
        // 等待 3 秒,避免是 time sleep 触发中断异常
        TimeUnit.SECONDS.sleep(3);

        threadA.interrupt();

        while (true) {
            if (atomicInteger.get() == 0) {
                System.out.println("test over");
                break;
            }
            if (cyclicBarrier.isBroken()) {
                System.out.println("屏障中断退出");
                break;
            }
        }
    }
}
复制代码
Thread-A - 到达屏障前
Thread-B - 到达屏障前
屏障中断退出
Thread-A - 等待中断
Thread-B - 屏障被破坏

Thread-0 - 到达屏障前
Thread-1 - 到达屏障前
Thread-2 - 到达屏障前
Thread-2 - 到达屏障后
Thread-0 - 到达屏障后
Thread-1 - 到达屏障后
test over
复制代码

从 oneAwaitInterrupted 方法执行结果可以看出,当一个线程 A 执行中断时,另外一个线程 B 会抛出 BrokenBarrierException

构造

// 可以指定拦截线程个数
public CyclicBarrier(int parties) {
    this(parties, null);
}

// 指定拦截线程个数和所有线程到达屏障处后执行的动作
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
复制代码

实现

概念

  • barrier : 屏障
  • parties : 为屏障拦截的线程数
  • tripped : 跳闸,可以理解为打开屏障
  • generation.broken : 屏障是否破损,当屏障被打开或被重置的时候会改变值

简单的理解就是,当线程都到达屏障的时候,会打开屏障。

await()

await 说明线程到达屏障

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
复制代码
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    final ReentrantLock lock = this.lock;
    // 获取排他锁
    lock.lock();
    try {
        final Generation g = generation;
        // 屏障被破坏则抛异常
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
        	// 线程中断 则退出屏障
            breakBarrier();
            throw new InterruptedException();
        }

        // 到达屏障的计数减一
        int index = --count;      
        if (index == 0) {  // tripped
        	// index == 0, 说明指定 count 的线程均到达屏障
        	// 此时可以打开屏障
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                	// 若指定了 barrierCommand 则执行
                    command.run();
                ranAction = true;
                // 唤醒阻塞在屏障的线程并重置 generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                	// 若未指定阻塞在屏障处的等待时间,则一直等待;直至最后一个线程到达屏障处的时候被唤醒
                    trip.await();
                else if (nanos > 0L)
                	// 若指定了阻塞在屏障处的等待时间,则在指定时间到达时会返回
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                	// 若等待过程中,线程发生了中断,则退出屏障
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            // 屏障被破坏 则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
            	// g != generation 说明所有线程均到达屏障处 可直接返回
            	// 因为所有线程到达屏障处的时候,会重置 generation
            	// 参考 nextGeneration
                return index;

            if (timed && nanos <= 0L) {
            	// 说明指定时间内,还有线程未到达屏障处,也就是等待超时
            	// 退出屏障
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
复制代码
private void nextGeneration() {
    // signal completion of last generation
    // 唤醒阻塞在等待队列的线程
    trip.signalAll();
    // set up next generation
    // 重置 count
    count = parties;
    // 重置 generation
    generation = new Generation();
}
复制代码
private void breakBarrier() {
	// broken 设置为 true
    generation.broken = true;
    // 重置 count
    count = parties;
    // 唤醒等待队列的线程
    trip.signalAll();
}
复制代码

如下图为 CyclicBarrier 实现效果图:

CyclicBarrier - 同步屏障实现分析

isBroken()

返回屏障是否被破坏,也是是否被中断

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}
复制代码

reset()

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 唤醒阻塞的线程
        breakBarrier();   // break the current generation
        // 重新设置 generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
复制代码

getNumberWaiting

获取阻塞在屏障处的线程数

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 拦截线程数 - 未到达屏障数
        return parties - count;
    } finally {
        lock.unlock();
    }
}
复制代码

小结

CyclicBarrier 和 CountDownLatch 功能类似,不同之处在于 CyclicBarrier 支持重复利用,而 CountDownLatch 计数只能使用一次。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Learn Python the Hard Way

Learn Python the Hard Way

Zed A. Shaw / Addison-Wesley Professional / 2013-10-11 / USD 39.99

Master Python and become a programmer-even if you never thought you could! This breakthrough book and CD can help practically anyone get started in programming. It's called "The Hard Way," but it's re......一起来看看 《Learn Python the Hard Way》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具