Java并发 -- 生产者-消费者模式

栏目: Java · 发布时间: 5年前

内容简介:转载请注明出处:http://zhongmingmao.me/2019/05/26/java-concurrent-producer-consumer/

Java并发 -- 生产者-消费者模式

  1. 生产者-消费者模式的核心是一个 任务队列
    • 生产者线程生产任务,并将任务添加到任务队列中,消费者线程从任务队列中获取任务并执行
  2. 架构设计 的角度来看,生产者-消费者模式有一个很重要的优点: 解耦
  3. 生产者-消费者模式另一个重要的优点是 支持异步 ,并且能够 平衡 生产者和消费者的 速度差异 (任务队列)

支持批量执行

Java并发 -- 生产者-消费者模式

  1. 往数据库INSERT 1000条数据,有两种方案
    • 第一种方案:用1000个线程并发执行,每个线程INSERT一条数据
    • 第二种方案(更优):用1个线程,执行一个批量的SQL,一次性把1000条数据INSERT进去
  2. 将原来直接INSERT数据到数据库的线程作为生产者线程,而生产者线程只需将数据添加到任务队列
    • 然后消费者线程负责将任务从任务队列中批量取出并批量执行
// 任务队列
private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(2000);

// 启动5个消费者线程,执行批量任务
public void start() {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
        pool.execute(() -> {
            try {
                while (true) {
                    // 获取批量任务
                    List<Task> tasks = pollTasks();
                    // 执行批量任务
                    execTasks(tasks);
                }
            } catch (InterruptedException ignored) {
            }
        });
    }
}

// 从任务队列中获取批量任务
private List<Task> pollTasks() throws InterruptedException {
    List<Task> tasks = new LinkedList<>();
    // 阻塞式获取一个任务
    // 首先采用阻塞式的方式,如果任务队列中没有任务,能够避免无谓的循环
    Task task = queue.take();
    while (task != null) {
        tasks.add(task);
        // 非阻塞式获取一个任务
        task = queue.poll();
    }
    return tasks;
}

// 批量执行任务
private void execTasks(List<Task> tasks) {
}

支持分阶段提交

  1. 写文件如果同步刷盘性能会很慢,对于不是很重要的数据,往往采用 异步刷盘 的方式
  2. 异步刷盘的时机
    • ERROR级别的日志需要立即刷盘
    • 数据累积到500条需要立即刷盘
    • 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘
  3. 该日志组件的异步刷盘本质上是一种 分阶段提交
public class Logger {
    // 批量异步刷新的数量
    private static final int FLUSH_BATCH_SIZE = 500;
    // 任务队列
    private final BlockingQueue<LogMsg> queue = new LinkedBlockingQueue<>();
    // 只需要一个线程写日志
    private ExecutorService pool = Executors.newFixedThreadPool(1);

    // 启动写日志线程
    public void start() throws IOException {
        File file = File.createTempFile("test", ".log");
        FileWriter writer = new FileWriter(file);
        pool.execute(() -> {
            // 未刷盘日志数量
            int curIdx = 0;
            long preFlushTime = System.currentTimeMillis();
            while (true) {
                try {
                    LogMsg logMsg = queue.poll(5, TimeUnit.SECONDS);
                    // 写日志
                    if (logMsg != null) {
                        writer.write(logMsg.toString());
                        ++curIdx;
                    }
                    // 如果不存在未刷盘数据,则无需刷盘
                    if (curIdx <= 0) {
                        continue;
                    }
                    // 异步刷盘规则
                    if (logMsg != null && logMsg.getLevel() == LEVEL.ERROR ||
                            curIdx == FLUSH_BATCH_SIZE ||
                            System.currentTimeMillis() - preFlushTime > 5_000) {
                        writer.flush();
                        curIdx = 0;
                        preFlushTime = System.currentTimeMillis();
                    }
                } catch (InterruptedException | IOException ignored) {
                } finally {
                    try {
                        writer.flush();
                        writer.close();
                    } catch (IOException ignored) {
                    }
                }
            }
        });
    }

    private void info(@NonNull String msg) throws InterruptedException {
        queue.put(new LogMsg(LEVEL.INFO, msg));
    }

    private void error(@NonNull String msg) throws InterruptedException {
        queue.put(new LogMsg(LEVEL.ERROR, msg));
    }
}

@Data
@AllArgsConstructor
class LogMsg {
    private LEVEL level;
    private String msg;
}

enum LEVEL {
    INFO, ERROR
}

转载请注明出处:http://zhongmingmao.me/2019/05/26/java-concurrent-producer-consumer/

访问原文「 Java并发 -- 生产者-消费者模式 」获取最佳阅读体验并参与讨论


以上所述就是小编给大家介绍的《Java并发 -- 生产者-消费者模式》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

沸騰15年

沸騰15年

林軍 / 商周出版 / 2010年09月19日 / NTD:430元

從一九九五年到二○○九年,中國互聯網崛起、發展和壯大。 在短短十五年間 產生了十五家市值超過十億的上市公司 這些前仆後繼的先行者 不但用網際網路創造了歷史,也改寫了自己的財富路徑。 這本關於中國互聯網產業歷史的書,脈絡清晰、生動鮮明地把一大批創業者的形象和他們的動人故事呈現在讀者眼前,值得一讀。 ——中國互聯網協會理事長、中國科協副主席 胡啟?? 林軍這本......一起来看看 《沸騰15年》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

html转js在线工具
html转js在线工具

html转js在线工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具