Java并发工具那些事儿

栏目: IT技术 · 发布时间: 5年前

Java并发 <a href='https://www.codercto.com/tool.html'>工具</a> 那些事儿 戳蓝字「TopCoder 」关注我们哦!

Java并发工具那些事儿

Java并发工具类主要有CyclicBarrier、CountDownLatch、Semaphore和Exchanger,日常开发中经常使用的是CountDownLatch和Semaphore。下面就简单分析下这几个并发工具类:

CyclicBarrier 内存屏障

CyclicBarrier底层借助于一个count计数器和Lock/Condition实现内存内存屏障功能,在对count--时必须先获取到lock,如果count不为0,则调用condition.wait进行阻塞操作;直到当count为0时,执行barrierCommand(如果配置的话,执行barrierCommand的线程是刚好将count减到0的那个线程),然后调用condition.signalAll唤醒所有等待的线程。

CyclicBarrier可用于多线程同步、多线程计算最后合并计算结果的场景,比如分片计算最后使用CyclicBarrier统计最后的结果等。

CyclicBarrier使用示例如下:

public static void main(String[] args) throws Exception {
    CyclicBarrier barrier = new CyclicBarrier(2, 
            () -> System.out.println(Thread.currentThread().getName() + ": all is ok"));
    Runnable task = () -> {
        try {
            System.out.println(Thread.currentThread().getName() + ": start wait");
            barrier.await();
            System.out.println(Thread.currentThread().getName() + ": start ok");
        } catch (Exception e) {
            e.printStackTrace();
        }
    };

    Thread t1 = new Thread(task, "thread1");
    Thread t2 = new Thread(task, "thread2");
    t2.start();
    t1.start();
    t1.join();
    t2.join();
}

CountDownLatch 计数器

CountDownLatch允许一个或多个线程等待其他线程完成操作。CountDownLatch底层借助于AQS来实现功能,初始化一个CountDownLatch(n)时,相当于创建了一个state为n的AQS,当调用countDown()时会对AQS进行减一操作,如果state为0,则会对阻塞队列中所有线程进行唤醒操作。

CountDownLatch计数器必须大于等于0,等于0的时候调用await方法时不会阻塞当前线程,注意CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数的值。一个线程调用coundDown方法happen-before,另一个线程调用await方法。

public static void main(String[] args) throws Exception {
    CountDownLatch downLatch = new CountDownLatch(2);
    Runnable task = () -> {
        try {
            System.out.println(Thread.currentThread().getName() + ": start countDown");
            downLatch.countDown();
            System.out.println(Thread.currentThread().getName() + ": start ok");
        } catch (Exception e) {
            e.printStackTrace();
        }
    };

    Thread t1 = new Thread(task, "thread1");
    Thread t2 = new Thread(task, "thread2");
    t1.start();
    t2.start();

    downLatch.await();
    System.out.println("main wait ok");

    t1.join();
    t2.join();
}

Semaphore信号量

Semaphore用来控制同时访问特定资源的线程数量,它通过协调各个线程,保证合理的使用公共资源。Semaphore可用作流量控制,特别是公共资源有限的应用场景,比如数据库连接。

Semaphore底层也是基于AQS,初始化Semaphore(n)相当于初始化一个state为n的AQS,调用acquire()时会对进行state - 1操作,如果结果大于0则CAS设置state为state-1,相当于获取到了信号量,否则进行阻塞操作(调用tryAcquire则不会阻塞线程)。调用release会对state进行++操作。

public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(2);
    ExecutorService executor = Executors.newFixedThreadPool(10);

    Runnable task = () -> {
        try {
            System.out.println(Thread.currentThread().getName() + " acquire before");
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " acquire ok");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    executor.execute(task);
    executor.execute(task);
    executor.execute(task);
    executor.execute(task);
}

Exchanger 线程间交换数据

Exchanger是一个用户线程间交换数据的工具类,它提供了一个同步点,在这个同步点上,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,他会一直等待第二个线程也执行exchange方法,当两个线程都达到同步点时,这两个线程交换数据,将本线程产生的数据传递给对方。

public static void main(String[] args) {
    Exchanger<String> exchanger = new Exchanger<>();
    Runnable task = () -> {
        try {
            String result = exchanger.exchange(Thread.currentThread().getName());
            System.out.println(Thread.currentThread().getName() + ": " + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(task);
    executor.execute(task);
}

Exchanger实现分析

Exchanger算法的核心是通过一个可交换数据的slot,以及一个可以带有数据item的参与者,slot是Node类型,Node定义如下:

@sun.misc.Contended static final class Node {
    int index;              // Arena index
    int bound;              // Last recorded value of Exchanger.bound
    int collides;           // Number of CAS failures at current bound
    int hash;               // Pseudo-random for spins
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}

static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}

每一个参与者都带有一个Participant,当调用exchange时,如果slot为空,则将自己携带的数据CAS设置到slot上,然后park自己;如果slot不为空,则表示已经有线程在slot里设置了数据,则读取Node.item字段,并将自己携带的数据设置到Node.match字段,然后唤醒之前设置数据的线程(之前阻塞的线程在唤醒后读取Node.match字段返回),然后返回数据即可。


以上所述就是小编给大家介绍的《Java并发工具那些事儿》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

阿里巴巴正传:我们与马云的“一步之遥”

阿里巴巴正传:我们与马云的“一步之遥”

方兴东、刘伟 / 江苏凤凰文艺出版社 / 2015-1 / 45.00

十几年来,方兴东与马云每年一次,老友聚首,开怀畅谈,阿里上市前,作者再次与马云深度对话,阿里上市前的布局,深入探讨了一系列人们关心的话题。 本书忠实记录了阿里壮大、马云封圣的历史。作者通过细致梳理和盘点,对阿里巴巴的15年成长史进行了忠实回顾。从海博翻译社到淘宝网,从淘宝商城到天猫,从支付宝到阿里云计算,从拉来软银的第一笔投资到纽交所上市,作者对其中涉及到的人物、细节都有生动展现;对于马云、......一起来看看 《阿里巴巴正传:我们与马云的“一步之遥”》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具