Kafka从上手到实践-实践真知:Kafka Java Producer

栏目: 后端 · 发布时间: 5年前

内容简介:创建Maven工程,在POM文件中加入如下两个依赖:第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。首先创建Producer需要的配置信息,最基本的有三个信息:

创建Maven工程,在POM文件中加入如下两个依赖:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.0.0</version>
</dependency>

<dependency>
	<groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
</dependency>

第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。

Java Producer

首先创建Producer需要的配置信息,最基本的有三个信息:

  • Kafka集群的地址。
  • 发送的Message中Key的序列化方式。
  • 发送的Message中Value的序列化方式。
    Properties properties = new Properties();
    
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
    
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    

然后传入上面实例化好的配置信息,实例化Producer:

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

然后实例化Record对象,该对象承载了要往哪个Topic发送以及Message内容的信息:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "hello world!");

再然后发送Record:

kafkaProducer.send(producerRecord);

最后刷新和关闭Producer:

kafkaProducer.flush();
kafkaProducer.close();

以上就是最简单的Kafka Java Producer的编写方法。运行一下,可以看到类似如下的信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0

Java Producer with Callback

如果我们希望在发送Message后,能监控发送状态,或者在发送异常时对异常进行处理。那么我们就可以使用带有Callback的发送方法:

kafkaProducer.send(producerRecord, new Callback() {
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
		if (e == null) {
			logger.info("Received new metadata. \n" +
                            "Topic: " + recordMetadata.topic()  + "\n" +
                            "Partition: " + recordMetadata.partition() + "\n" +
                            "Offset: " + recordMetadata.offset() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp());
		} else {
			logger.error("Error while producing: ", e);
		}
	}
});

这样每次发送Message后,都会进入 onCompletion 这个方法中,然后可以使用 RecordMetadata 中记录的各种元数据做一些跟踪和监控的事情,同时如果发送异常了,也可以对异常进行处理。

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[kafka-producer-network-thread | producer-1] INFO com.devtalking.jacefu.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata. 
Topic: first_topic
Partition: 0
Offset: 22
Timestamp: 1546421392063
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0

Java Producer with Keys

在前文中,Partition的Compaction Cleanup Policy一节中介绍到,在压缩策略时,就涉及到了Message的Key和Value。我们来看看如何在发送Message时带着Key。

首先来看看 ProducerRecord 的另一个构造函数:

public ProducerRecord(String topic, K key, V value) {
        this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}

可以看到,刚才我们只使用了 topicvalue 两个参数,其中还有一个 key ,所以我们在实例化 ProducerRecord 时传入Key就可以了:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "This is the key", "hello world!");

以上所述就是小编给大家介绍的《Kafka从上手到实践-实践真知:Kafka Java Producer》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

从“为什么”开始

从“为什么”开始

[美] 西蒙·斯涅克 / 苏西 / 海天出版社 / 2011-7 / 32.00元

影响人类的行为:要么靠操纵,要么靠感召。 操纵带来的是交易,是短期效益; 感召带来的是信任,是永续经营! 盖茨走后,微软面临怎样的挑战?后盖茨时代,微软为何从一个希望改变世界的公司沦落为一个做软件的公司? 沃尔玛的灵魂人物过世后,一度被人们热爱的公司,遭到的竟然多是顾客、员工的反感?沃尔玛要怎样做才能重放昔日光彩? 星巴克吸引人们购买的不是咖啡,而是理念?为什么说霍华......一起来看看 《从“为什么”开始》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

URL 编码/解码
URL 编码/解码

URL 编码/解码