内容简介:转载请注明出处:http://zhongmingmao.me/2019/05/24/java-concurrent-worker-thread/
- Worker Thread模式可以类比现实世界里车间的工作模式,Worker Thread对应车间里的工人(人数确定)
- 用 阻塞队列 做任务池,然后创建 固定数量的线程 消费阻塞队列中的任务 – 这就是 Java 中的 线程池 方案
echo服务
private ExecutorService pool = Executors.newFixedThreadPool(500); public void handle() throws IOException { // 处理请求 try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 将请求处理任务提交给线程池 pool.execute(() -> { try { // 读Socket ByteBuffer rb = ByteBuffer.allocateDirect(1024); sc.read(rb); TimeUnit.SECONDS.sleep(1); // 写Socket ByteBuffer wb = (ByteBuffer) rb.flip(); sc.write(wb); sc.close(); } catch (IOException | InterruptedException ignored) { } }); } } finally { pool.shutdown(); } }
正确地创建线程池
- Java线程池既能避免无限制地 创建线程 导致OOM,也能避免无限制地 接收任务 导致OOM( 有界队列 )
- 当请求量大于有界队列的容量时,应该合理地拒绝请求,在创建线程池时,应该清晰地指明 拒绝策略
- 为了便于调试和诊断问题,在实际工作中应该给线程赋予一个 业务相关的命名
private ExecutorService pool = new ThreadPoolExecutor(50, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), // 有界队列 runnable -> new Thread(runnable, "echo-" + runnable.hashCode()), // ThreadFactory new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
避免线程死锁
- 如果提交到 相同线程池 的任务不是相互独立的,而是有 依赖 关系的,有可能会导致线程 死锁
- 通用解决方案: 为不同的任务创建不同的线程池 ,提交到 相同线程池 中的任务一定是 相互独立 的
- 下图中第一阶段的任务会等待第二阶段的子任务完成
// L1、L2阶段共用线程池 ExecutorService pool = Executors.newFixedThreadPool(2); CountDownLatch l1 = new CountDownLatch(2); for (int i = 0; i < 2; i++) { System.out.println("L1"); pool.execute(() -> { CountDownLatch l2 = new CountDownLatch(2); for (int j = 0; j < 2; j++) { pool.execute(() -> { System.out.println("L2"); l2.countDown(); }); } try { // 线程池中的2个线程都阻塞在l2.await(),没有多余线程去执行L2阶段的任务(在线程池的任务队列中等待) l2.await(); // line 28 } catch (InterruptedException ignored) { } l1.countDown(); }); } l1.await(); // 输出 // L1 // L1
jstack
// 阻塞在l2.await() "pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007f934e8f5000 nid=0x4303 waiting on condition [0x000070000792d000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000079609bd58> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at time.geek.worker.thread.DeadLock.lambda$deadLockTest$1(DeadLock.java:28) at time.geek.worker.thread.DeadLock$$Lambda$1/1221555852.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) // 阻塞在l2.await() "pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f9350142800 nid=0x3c03 waiting on condition [0x000070000782a000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000795ff56a8> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at time.geek.worker.thread.DeadLock.lambda$deadLockTest$1(DeadLock.java:28) at time.geek.worker.thread.DeadLock$$Lambda$1/1221555852.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
对比Thread-Per-Message模式
- Thread-Per-Message模式:主线程直接创建子线程,主子线程之间可以直接通信
- Worker Thread模式:主线程提交任务到线程池,但主线程并不关心任务被哪个线程执行
- 能够避免线程频繁创建、销毁的问题,并且能够限制线程的最大数量
- Java利用Worker Thread模式来实现线程池
转载请注明出处:http://zhongmingmao.me/2019/05/24/java-concurrent-worker-thread/
访问原文「 Java并发 -- Worker Thread模式 」获取最佳阅读体验并参与讨论
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- [Java并发-17-并发设计模式] Immutability模式:如何利用不变性解决并发问题?
- [Java并发-18-并发设计模式] COW模式:Copy-on-Write模式的应用领域
- Go 并发模式
- go语言-常见并发模式
- 17Go 语言——几个并发模式
- Go 设计模式实战之并发组件
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。