Kafka从上手到实践-实践真知:Kafka Java Producer

栏目: 后端 · 发布时间: 6年前

内容简介:创建Maven工程,在POM文件中加入如下两个依赖:第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。首先创建Producer需要的配置信息,最基本的有三个信息:

创建Maven工程,在POM文件中加入如下两个依赖:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.0.0</version>
</dependency>

<dependency>
	<groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
</dependency>

第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。

Java Producer

首先创建Producer需要的配置信息,最基本的有三个信息:

  • Kafka集群的地址。
  • 发送的Message中Key的序列化方式。
  • 发送的Message中Value的序列化方式。
    Properties properties = new Properties();
    
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
    
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    

然后传入上面实例化好的配置信息,实例化Producer:

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

然后实例化Record对象,该对象承载了要往哪个Topic发送以及Message内容的信息:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "hello world!");

再然后发送Record:

kafkaProducer.send(producerRecord);

最后刷新和关闭Producer:

kafkaProducer.flush();
kafkaProducer.close();

以上就是最简单的Kafka Java Producer的编写方法。运行一下,可以看到类似如下的信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0

Java Producer with Callback

如果我们希望在发送Message后,能监控发送状态,或者在发送异常时对异常进行处理。那么我们就可以使用带有Callback的发送方法:

kafkaProducer.send(producerRecord, new Callback() {
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
		if (e == null) {
			logger.info("Received new metadata. \n" +
                            "Topic: " + recordMetadata.topic()  + "\n" +
                            "Partition: " + recordMetadata.partition() + "\n" +
                            "Offset: " + recordMetadata.offset() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp());
		} else {
			logger.error("Error while producing: ", e);
		}
	}
});

这样每次发送Message后,都会进入 onCompletion 这个方法中,然后可以使用 RecordMetadata 中记录的各种元数据做一些跟踪和监控的事情,同时如果发送异常了,也可以对异常进行处理。

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[kafka-producer-network-thread | producer-1] INFO com.devtalking.jacefu.kafka.tutorial1.ProducerDemoWithCallback - Received new metadata. 
Topic: first_topic
Partition: 0
Offset: 22
Timestamp: 1546421392063
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0

Java Producer with Keys

在前文中,Partition的Compaction Cleanup Policy一节中介绍到,在压缩策略时,就涉及到了Message的Key和Value。我们来看看如何在发送Message时带着Key。

首先来看看 ProducerRecord 的另一个构造函数:

public ProducerRecord(String topic, K key, V value) {
        this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}

可以看到,刚才我们只使用了 topicvalue 两个参数,其中还有一个 key ,所以我们在实例化 ProducerRecord 时传入Key就可以了:

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first_topic", "This is the key", "hello world!");

以上所述就是小编给大家介绍的《Kafka从上手到实践-实践真知:Kafka Java Producer》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Spark大数据分析技术与实战

Spark大数据分析技术与实战

董轶群、曹正凤、赵仁乾、王安 / 电子工业出版社 / 2017-7 / 59.00

Spark作为下一代大数据处理引擎,经过短短几年的飞跃式发展,正在以燎原之势席卷业界,现已成为大数据产业中的一股中坚力量。 《Spark大数据分析技术与实战》着重讲解了Spark内核、Spark GraphX、Spark SQL、Spark Streaming和Spark MLlib的核心概念与理论框架,并提供了相应的示例与解析。 《Spark大数据分析技术与实战》共分为8章,其中前4......一起来看看 《Spark大数据分析技术与实战》 这本书的介绍吧!

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具