内容简介:不知不觉就遇到了线程同步器问题,查了资料写下了总结日常中会有开启多个线程去并发执行任务,而join()方法不够灵活,现在JDK提供了
不知不觉就遇到了线程同步器问题,查了资料写下了总结
1. CountDownLatch
日常中会有开启多个线程去并发执行任务,而 主线程要等所有子线程执行完之后才能运行的需求 。之前我们是使用Thread.join方法来实现的,过程如下:
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread( () -> { try { Thread.sleep(1000); System.out.println("t1 over"); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread t2 = new Thread( () -> { try { Thread.sleep(2000); System.out.println("t2 over"); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("mian over"); }
t1 over t2 over mian over
join()方法不够灵活,现在JDK提供了 CountDownLatch 这个类来实现所需功能
private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { ExecutorService t = Executors.newCachedThreadPool(); Runnable r1 = () -> { try { System.out.println("r1 sleep"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }; Runnable r2 = () -> { try { System.out.println("r2 sleep"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }; t.submit(r1); t.submit(r2); System.out.println("main wait"); countDownLatch.await(); System.out.println("main over"); }
main wait r1 sleep r2 sleep main over
CountDownLatch流程:
- 新建CountDownLatch实例,传入计数器次数
- 主线程调用CountDownLatch.await()方法后会被阻塞
- 子线程中在某处调用CountDownLatch.countDown()方法可使内部计数器减1
- 当计数器变成0时,主线程的await()方法才会返回
CountDownLatch优点:
- 调用Thread.join()调用线程会被阻塞至子线程运行完毕,而CountDownLatch.countDown()可在线程运行中执行
- 使用线程池时是提交任务的,而没有接触到线程无法使用线程方法,那么countDown()可加在Runnable中执行
CountDownLatch原理:
内部维护了一个计数器,当计数器为0就放行,源码就不放了,熟悉AQS的同学想想就知道怎么回事
- 继承了AQS,其实就是用AQS的state来表示计数器
- await()方法内部有acquireSharedInterruptibly(),后者调用了重写tryaquireShared()其实就是判断计数器是否为0,不为0则阻塞进AQS队列
- countDown()方法内部有releaseShared(),后者调用了重写tryReleaseShared()计数器减一,若为0,则唤醒阻塞线程
2. CyclicBarrier
满足多个线程都到达同一个位置后才全部开始运行的需求。CountDownLatch是一次性使用的,计数器为0后再次调用会直接返回,此时升级版的CyclicBarrier来了,其一可以满足计数器重置功能,且二还可以让一组线程达到一个状态后再全部同时执行
场景要求:假设一个任务分为3个阶段,每个线程要串行地从低阶段执行到高阶段
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("一个阶段完成")); public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); Runnable r1 = () -> { try { System.out.println(Thread.currentThread() + "Step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step3"); } catch (Exception e) { e.printStackTrace(); } }; Runnable r2 = () -> { try { System.out.println(Thread.currentThread() + "Step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step3"); } catch (Exception e) { e.printStackTrace(); } }; service.submit(r1); service.submit(r2); service.shutdown(); }
Thread[pool-1-thread-1,5,main]Step1 Thread[pool-1-thread-2,5,main]Step1 一个阶段完成 Thread[pool-1-thread-1,5,main]Step2 Thread[pool-1-thread-2,5,main]Step2 一个阶段完成 Thread[pool-1-thread-1,5,main]Step3 Thread[pool-1-thread-2,5,main]Step3
CyclicBarrier的流程
- 和上面差不多就不一一解释了
- CyclicBarrier的构造方法中,第一个参数为计数器次数,第二个为阶段结束后要执行的方法
CyclicBarrier的原理
- 基于独占锁,底层是AQS实现,独占锁可以原子性改变计数器,以及条件队列阻塞线程来实现线程同步
- 内部有parties和count变量,实现重置功能
-
await()方法内调用dowait()方法
- 获取锁更新次数减一
- 没有为0,阻塞当前线程加入条件队列
- 为0执行屏蔽点任务,然后唤醒条件队列的全部线程
3. Semaphore
不同与前两者,Semaphore信号量内部计数器是递增的,在需要同步的地方调用acquire指定需要同步的个数即可
private static Semaphore semaphore = new Semaphore(0); public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); Runnable r1 = () -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "over"); semaphore.release();; }; Runnable r2 = () -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "over"); semaphore.release(); }; service.submit(r1); service.submit(r2); semaphore.acquire(2); System.out.println("All child thread over"); service.shutdown(); }
Semaphore的流程
- Semaphore的构造函数传参复制当前计数器的值
- 每个线程内部调用release()即计数器加1
- 主线程调用acquire()方法传参为2 ,会被阻塞至计数器到达2
Semaphore的原理
- 底层还是使用AQS,提供了公平与非公平,也是用state表示次数
-
acquire()方法获取一个信号量,并且state减一
- 若为0,直接返回
- 不为0当前线程会被加入AQS阻塞队列
- release()方法,把当前Semaphore的信号量加1,然后会选择一个信号量满足的线程进行激活
- 内部还实现了公平与非公平策略
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Java 多线程(二)—— 线程的同步
- java synchronize - 线程同步原理
- 多线程六 同步容器&并发容器
- C++ 线程同步的四种方式
- 【译】JVM 进行线程同步背后的原理
- Java 并发编程:多线程并发访问,同步控制
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
How to Build a Billion Dollar App
George Berkowski / Little, Brown Book Group / 2015-4-1 / USD 24.95
Apps have changed the way we communicate, shop, play, interact and travel and their phenomenal popularity has presented possibly the biggest business opportunity in history. In How to Build a Billi......一起来看看 《How to Build a Billion Dollar App》 这本书的介绍吧!