内容简介:创建Maven工程,在POM文件中加入如下两个依赖:第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。首先创建Producer需要的配置信息,最基本的有三个信息:
创建Maven工程,在POM文件中加入如下两个依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。
Java Producer
首先创建Producer需要的配置信息,最基本的有三个信息:
- Kafka集群的地址。
- 发送的Message中Key的序列化方式。
-
发送的Message中Value的序列化方式。
Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
然后传入上面实例化好的配置信息,实例化Producer:
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
然后实例化Record对象,该对象承载了要往哪个Topic发送以及Message内容的信息:
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "hello world!");
再然后发送Record:
kafkaProducer.send(producerRecord);
最后刷新和关闭Producer:
kafkaProducer.flush(); kafkaProducer.close();
以上就是最简单的Kafka Java Producer的编写方法。运行一下,可以看到类似如下的信息:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. Process finished with exit code 0
Java Producer with Callback
如果我们希望在发送Message后,能监控发送状态,或者在发送异常时对异常进行处理。那么我们就可以使用带有Callback的发送方法:
kafkaProducer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
logger.info("Received new metadata. \n" +
"Topic: " + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset() + "\n" +
"Timestamp: " + recordMetadata.timestamp());
} else {
logger.error("Error while producing: ", e);
}
}
});
这样每次发送Message后,都会进入 onCompletion
这个方法中,然后可以使用 RecordMetadata
中记录的各种元数据做一些跟踪和监控的事情,同时如果发送异常了,也可以对异常进行处理。
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg [kafka-producer-network-thread | producer-1] INFO com.devtalking.jacefu.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata. Topic: first_topic Partition: 0 Offset: 22 Timestamp: 1546421392063 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. Process finished with exit code 0
Java Producer with Keys
在前文中,Partition的Compaction Cleanup Policy一节中介绍到,在压缩策略时,就涉及到了Message的Key和Value。我们来看看如何在发送Message时带着Key。
首先来看看 ProducerRecord
的另一个构造函数:
public ProducerRecord(String topic, K key, V value) {
this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}
可以看到,刚才我们只使用了 topic
和 value
两个参数,其中还有一个 key
,所以我们在实例化 ProducerRecord
时传入Key就可以了:
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "This is the key", "hello world!");
以上所述就是小编给大家介绍的《Kafka从上手到实践-实践真知:Kafka Java Producer》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Java 线程池实践出真知
- 实践出真知:云计算新风向
- Kafka从上手到实践-实践真知:搭建Zookeeper集群
- Kafka从上手到实践-实践真知:搭建单机Kafka
- Kafka从上手到实践-实践真知:Kafka Java Consumer
- 【真知拙见】回调地狱和Promise
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Spring技术内幕
计文柯 / 机械工业出版社 / 2010-1-1 / 55.00元
内容简介: 本书是Spring领域的问鼎之作,由业界拥有10余年开发经验的资深Java专家亲自执笔!Java开发者社区和Spring开发者社区一致强烈推荐。 国内第一本基于Spring3.0的著作,从源代码的角度对Spring的内核和各个主要功能模块的架构、设计和实现原理进行了深入剖析。你不仅能从木书中参透Spring框架的优秀架构和设计思想,而且还能从Spring优雅的实现源码中一窥......一起来看看 《Spring技术内幕》 这本书的介绍吧!
XML、JSON 在线转换
在线XML、JSON转换工具
HSV CMYK 转换工具
HSV CMYK互换工具