内容简介:转载请注明出处:http://zhongmingmao.me/2019/05/26/java-concurrent-producer-consumer/
- 生产者-消费者模式的核心是一个 任务队列
- 生产者线程生产任务,并将任务添加到任务队列中,消费者线程从任务队列中获取任务并执行
- 从 架构设计 的角度来看,生产者-消费者模式有一个很重要的优点: 解耦
- 生产者-消费者模式另一个重要的优点是 支持异步 ,并且能够 平衡 生产者和消费者的 速度差异 (任务队列)
支持批量执行
- 往数据库INSERT 1000条数据,有两种方案
- 第一种方案:用1000个线程并发执行,每个线程INSERT一条数据
- 第二种方案(更优):用1个线程,执行一个批量的SQL,一次性把1000条数据INSERT进去
- 将原来直接INSERT数据到数据库的线程作为生产者线程,而生产者线程只需将数据添加到任务队列
- 然后消费者线程负责将任务从任务队列中批量取出并批量执行
// 任务队列 private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(2000); // 启动5个消费者线程,执行批量任务 public void start() { ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { pool.execute(() -> { try { while (true) { // 获取批量任务 List<Task> tasks = pollTasks(); // 执行批量任务 execTasks(tasks); } } catch (InterruptedException ignored) { } }); } } // 从任务队列中获取批量任务 private List<Task> pollTasks() throws InterruptedException { List<Task> tasks = new LinkedList<>(); // 阻塞式获取一个任务 // 首先采用阻塞式的方式,如果任务队列中没有任务,能够避免无谓的循环 Task task = queue.take(); while (task != null) { tasks.add(task); // 非阻塞式获取一个任务 task = queue.poll(); } return tasks; } // 批量执行任务 private void execTasks(List<Task> tasks) { }
支持分阶段提交
- 写文件如果同步刷盘性能会很慢,对于不是很重要的数据,往往采用 异步刷盘 的方式
- 异步刷盘的时机
- ERROR级别的日志需要立即刷盘
- 数据累积到500条需要立即刷盘
- 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘
- 该日志组件的异步刷盘本质上是一种 分阶段提交
public class Logger { // 批量异步刷新的数量 private static final int FLUSH_BATCH_SIZE = 500; // 任务队列 private final BlockingQueue<LogMsg> queue = new LinkedBlockingQueue<>(); // 只需要一个线程写日志 private ExecutorService pool = Executors.newFixedThreadPool(1); // 启动写日志线程 public void start() throws IOException { File file = File.createTempFile("test", ".log"); FileWriter writer = new FileWriter(file); pool.execute(() -> { // 未刷盘日志数量 int curIdx = 0; long preFlushTime = System.currentTimeMillis(); while (true) { try { LogMsg logMsg = queue.poll(5, TimeUnit.SECONDS); // 写日志 if (logMsg != null) { writer.write(logMsg.toString()); ++curIdx; } // 如果不存在未刷盘数据,则无需刷盘 if (curIdx <= 0) { continue; } // 异步刷盘规则 if (logMsg != null && logMsg.getLevel() == LEVEL.ERROR || curIdx == FLUSH_BATCH_SIZE || System.currentTimeMillis() - preFlushTime > 5_000) { writer.flush(); curIdx = 0; preFlushTime = System.currentTimeMillis(); } } catch (InterruptedException | IOException ignored) { } finally { try { writer.flush(); writer.close(); } catch (IOException ignored) { } } } }); } private void info(@NonNull String msg) throws InterruptedException { queue.put(new LogMsg(LEVEL.INFO, msg)); } private void error(@NonNull String msg) throws InterruptedException { queue.put(new LogMsg(LEVEL.ERROR, msg)); } } @Data @AllArgsConstructor class LogMsg { private LEVEL level; private String msg; } enum LEVEL { INFO, ERROR }
转载请注明出处:http://zhongmingmao.me/2019/05/26/java-concurrent-producer-consumer/
访问原文「 Java并发 -- 生产者-消费者模式 」获取最佳阅读体验并参与讨论
以上所述就是小编给大家介绍的《Java并发 -- 生产者-消费者模式》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Java多线程之并发协作生产者消费者设计模式
- 高并发场景下,如何保证生产者投递到消息中间件的消息不丢失?【石杉的架构笔记】
- Kafka生产者简介
- RocketMQ生产者流程篇
- RocketMQ生产者消息篇
- Java精讲:生产者-消费者
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。