戳蓝字「TopCoder 」关注我们哦!
Java并发工具类主要有CyclicBarrier、CountDownLatch、Semaphore和Exchanger,日常开发中经常使用的是CountDownLatch和Semaphore。下面就简单分析下这几个并发工具类:
CyclicBarrier 内存屏障
CyclicBarrier底层借助于一个count计数器和Lock/Condition实现内存内存屏障功能,在对count--时必须先获取到lock,如果count不为0,则调用condition.wait进行阻塞操作;直到当count为0时,执行barrierCommand(如果配置的话,执行barrierCommand的线程是刚好将count减到0的那个线程),然后调用condition.signalAll唤醒所有等待的线程。
CyclicBarrier可用于多线程同步、多线程计算最后合并计算结果的场景,比如分片计算最后使用CyclicBarrier统计最后的结果等。
CyclicBarrier使用示例如下:
public static void main(String[] args) throws Exception { CyclicBarrier barrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + ": all is ok")); Runnable task = () -> { try { System.out.println(Thread.currentThread().getName() + ": start wait"); barrier.await(); System.out.println(Thread.currentThread().getName() + ": start ok"); } catch (Exception e) { e.printStackTrace(); } }; Thread t1 = new Thread(task, "thread1"); Thread t2 = new Thread(task, "thread2"); t2.start(); t1.start(); t1.join(); t2.join(); }
CountDownLatch 计数器
CountDownLatch允许一个或多个线程等待其他线程完成操作。CountDownLatch底层借助于AQS来实现功能,初始化一个CountDownLatch(n)时,相当于创建了一个state为n的AQS,当调用countDown()时会对AQS进行减一操作,如果state为0,则会对阻塞队列中所有线程进行唤醒操作。
CountDownLatch计数器必须大于等于0,等于0的时候调用await方法时不会阻塞当前线程,注意CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数的值。一个线程调用coundDown方法happen-before,另一个线程调用await方法。
public static void main(String[] args) throws Exception { CountDownLatch downLatch = new CountDownLatch(2); Runnable task = () -> { try { System.out.println(Thread.currentThread().getName() + ": start countDown"); downLatch.countDown(); System.out.println(Thread.currentThread().getName() + ": start ok"); } catch (Exception e) { e.printStackTrace(); } }; Thread t1 = new Thread(task, "thread1"); Thread t2 = new Thread(task, "thread2"); t1.start(); t2.start(); downLatch.await(); System.out.println("main wait ok"); t1.join(); t2.join(); }
Semaphore信号量
Semaphore用来控制同时访问特定资源的线程数量,它通过协调各个线程,保证合理的使用公共资源。Semaphore可用作流量控制,特别是公共资源有限的应用场景,比如数据库连接。
Semaphore底层也是基于AQS,初始化Semaphore(n)相当于初始化一个state为n的AQS,调用acquire()时会对进行state - 1操作,如果结果大于0则CAS设置state为state-1,相当于获取到了信号量,否则进行阻塞操作(调用tryAcquire则不会阻塞线程)。调用release会对state进行++操作。
public static void main(String[] args) { Semaphore semaphore = new Semaphore(2); ExecutorService executor = Executors.newFixedThreadPool(10); Runnable task = () -> { try { System.out.println(Thread.currentThread().getName() + " acquire before"); semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " acquire ok"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }; executor.execute(task); executor.execute(task); executor.execute(task); executor.execute(task); }
Exchanger 线程间交换数据
Exchanger是一个用户线程间交换数据的工具类,它提供了一个同步点,在这个同步点上,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,他会一直等待第二个线程也执行exchange方法,当两个线程都达到同步点时,这两个线程交换数据,将本线程产生的数据传递给对方。
public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); Runnable task = () -> { try { String result = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName() + ": " + result); } catch (InterruptedException e) { e.printStackTrace(); } }; ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(task); executor.execute(task); }
Exchanger实现分析
Exchanger算法的核心是通过一个可交换数据的slot,以及一个可以带有数据item的参与者,slot是Node类型,Node定义如下:
@sun.misc.Contended static final class Node { int index; // Arena index int bound; // Last recorded value of Exchanger.bound int collides; // Number of CAS failures at current bound int hash; // Pseudo-random for spins Object item; // This thread's current item volatile Object match; // Item provided by releasing thread volatile Thread parked; // Set to this thread when parked, else null } static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } }
每一个参与者都带有一个Participant,当调用exchange时,如果slot为空,则将自己携带的数据CAS设置到slot上,然后park自己;如果slot不为空,则表示已经有线程在slot里设置了数据,则读取Node.item字段,并将自己携带的数据设置到Node.match字段,然后唤醒之前设置数据的线程(之前阻塞的线程在唤醒后读取Node.match字段返回),然后返回数据即可。
以上所述就是小编给大家介绍的《Java并发工具那些事儿》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Java 并发编程那些事儿(八):死锁
- Java 并发编程那些事儿(一):任务与线程
- Java 并发编程那些事儿(七):取消及关闭
- JDK源码那些事儿之并发ConcurrentHashMap上篇
- Java 并发编程那些事儿(五):闭锁、栅栏、信号量
- Java 并发编程那些事儿(六):Executor 框架及线程池
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web 2.0 Architectures
Duane Nickull、Dion Hinchcliffe、James Governor / O'Reilly / 2009 / USD 34.99
The "Web 2.0" phenomena has become more pervasive than ever before. It is impacting the very fabric of our society and presents opportunities for those with knowledge. The individuals who understand t......一起来看看 《Web 2.0 Architectures》 这本书的介绍吧!