内容简介:Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。
前言:
Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。
如何使用 Java 的ExecutorService框架来创建线程池处理大量消息?
1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。
2.创建工作线程,以执行每条消息的进一步处理。
1.topic消息传递到ThreadPoolExecutorService
/** kafka 消息处理*/ public class KafkaProcessor { private final KafkaConsumer<String, String> myConsumer; private ExecutorService executor; private static final Properties KAFKA_PROPERTIES = new Properties(); //基础的kafka配置~ static { KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092"); KAFKA_PROPERTIES.put("group.id", "test-consumer-group"); KAFKA_PROPERTIES.put("enable.auto.commit", "true"); KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000"); KAFKA_PROPERTIES.put("session.timeout.ms", "30000"); KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } public KafkaProcessor() { this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);//初始化配置 this.myConsumer.subscribe(Arrays.asList("test")); //订阅topic=test } public void init(int numberOfThreads) { //创建一个线程池 /** * public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) *corePoolSize : 核心线程数,一旦创建将不会再释放。如果创建的线程数还没有达到指定的核心线 程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待空闲线程来执行。 *maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。 *keepAliveTime : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。 *unit : 时间单位,TimeUnit.SECONDS等。 *workQueue : 任务队列,存储暂时无法执行的任务,等待空闲线程来执行任务。 *threadFactory : 线程工程,用于创建线程。 *handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序 */ executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,0L,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); while (true) { ConsumerRecords<String, String> records = myConsumer.poll(100);//每隔时间段进行消息拉取 for (final ConsumerRecord<String, String> record : records) { executor.submit(new KafkaRecordHandler(record)); } } } //别忘记线程池的关闭! public void shutdown() { if (myConsumer != null) { myConsumer.close(); } if (executor != null) { executor.shutdown(); } try { if (executor != null && !executor.awaitTermination(60, TimeUnit.MILLISECONDS)) { executor.shutdownNow(); } }catch (InterruptedException e) { executor.shutdownNow(); } } }复制代码
2.创建工作线程
// 创建消息线程进行处理 public class KafkaRecordHandler implements Runnable { private ConsumerRecord<String, String> record; public KafkaRecordHandler(ConsumerRecord<String, String> record) { this.record = record; } @Override public void run() { //业务操作... System.out.println("value = "+record.value()); System.out.println("Thread id = "+ Thread.currentThread().getId()); } }复制代码
3.Using ?
//消费测试 public class ConsumerTest { public static void main(String[] args) { KafkaProcessor processor = new KafkaProcessor(); try { processor.init(5);//指定相应的线程数! }catch (Exception exp) { processor.shutdown(); } } }复制代码
4.总结
可能并不适合所有方案,按需定制方案。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Kafka 消费线程模型在中通消息服务运维平台的应用
- Java多线程之并发协作生产者消费者设计模式
- 消费端如何保证消息队列MQ的有序消费
- 《吊打面试官》系列-分布式事务、重复消费、顺序消费
- 十一贝:航延险智能判定,公平消费环境惠及消费者
- Kafka消费者的偏移量和高级/简单消费者
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Effective C++
梅耶 (Scott Meyers) / 侯捷 / 电子工业出版社 / 2011-1-1 / 65.00元
《Effective C++:改善程序与设计的55个具体做法(第3版)(中文版)(双色)》内容简介:有人说C++程序员可以分为两类,读过Effective C++的和没读过的。世界项级C++大师scott Meyers成名之作的第三版的确当得起这样的评价。当您读过《Effective C++:改善程序与设计的55个具体做法(第3版)(中文版)(双色)》之后,就获得了迅速提升自己C++功力的一个契机......一起来看看 《Effective C++》 这本书的介绍吧!