内容简介:Kafka是分布式消息发布和订阅的消息系统。而生产者则对应的是消息的生产和发布。也就是说,生产者是消息的来源。生产者将消息发送搭kafka中。Kafka生产者发送的消息封装成ProduceRecord,其中包含了消息的主题和要发送的内容。 ProducerRecord对象有四个属性:Kafka生产者需要指定三个必要的属性:
Kafka是分布式消息发布和订阅的消息系统。而生产者则对应的是消息的生产和发布。也就是说,生产者是消息的来源。生产者将消息发送搭kafka中。
ProducerRecord
Kafka生产者发送的消息封装成ProduceRecord,其中包含了消息的主题和要发送的内容。 ProducerRecord对象有四个属性:
- Topic 消息的主题
- Partition 消息也可以指定发送到的partition
- Key, 通过计算Key,可以定位该消息要发送到那个Partition上
- Value, 也就是消息的内容了
创建生产者
Kafka生产者需要指定三个必要的属性:
- bootstrap.servers , 指定broker的地址清单,格式为host:port。(一般建议指定两个broker,防止宕机);
- key.serializer ,在网络传输中,消息的键值和值需要序列化成字节数组。即使消息的key为空,也需要指定;
- value.serializer , 将消息的值序列化成字节数组。
消息发送方式
生产者调用send()函数发送消息,有三种发送的方式,分别是: 1. 同步发送 :顾名思义。生产者给broker发送玩消息之后,需要等待broker的响应,这才能知道消息是发送成功还是发生了错误。这种模式下,send()函数会返回一个Future对象,调用Future对象的get()等待broker的响应。 2. 异步发送 :同步发送,生产者需要等到broker的响应,此时生产者是处于阻塞状态的。频繁发送消息的时候,这种等待会降低kafka的吞吐量。异步发送采用回调函数的形式,生产者发送完消息之后,不用同步阻塞,broker在响应的时候,调用回调函数向生产者发送消息。 3. 发送并忘记 :生产者发送消息之后,不再等待broker的响应。也就是说在这种模式下,生产者不关心消息是否正常到达。
其他重要参数(非必须)
前面已经讲了三个生产者的重要参数,还有几个参数值得我们注意。
-
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入成功。 acks=0时,生产者在成功写入消息之前不会等待任何来自服务器的响应。 acks-1时,只要集群的首领节点受到消息,生产者就会收到一个来自服务器的成功响应。 acks=all时,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
-
buffer.memory:改参数制定了生产者内存缓冲的大小,生产者用它缓冲要发送到服务器的消息。当消息的发送速度小于应用程序产生消息的速度是,或导致生产者空间不足。此时send()函数要么被阻塞,要么抛出异常。
-
compression.type:指定消息放松给broker之前使用的压缩算法的类型。
-
retries: 当生产者的消息发送到服务器产生错误是临时性错误时,生产者将会重发消息。retries指定了消息重发的次数,当重发次数超过这个数时,生产者将会放弃重发,并返回错误。retry.backoff.ms参数指定了重发之间的时间间隔,默认是100ms,也可以更改这个设置。
-
batch.size:当有多个消息发送到同一个分区时,生产者会把他们放在同一个批次里。这个参数指定了同一个批次可以使用的内存的大小,按照字节数计算。
-
linger.ms:指定生产者发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或者linger.ms达到上限是把批次发送出去。
-
client.id:该参数可以试试任意字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
-
max.in.flight.requests.per.connection:该参数指定了生产者在收到服务器响应之前可以发送多少消息。把它设置为1可以保证消息是按照发送顺序写入服务器的,及时发生了重试。 Kafka一个分区内的消息有序性依靠这个参数来设置,当第一次消息发送失败的重试之前,不会有其他消息发送给broker。
-
timeout.ms / request.timeout.ms / metadata.fetch.timeout.ms: request.timeout.ms指定了生产者发送数据时等待服务器返回响应的时间; metadata.fetch.timeout.ms指定了生产者在获取元数据(比如目标分区的首领是谁)是等待服务器返回响应的时间。如果服务器响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常使用回调)。timeout.ms指定了broker等待同步副本返回消息的确认时间,与asks的配置相匹配,如果在指定时间没有收到同步副本的确认,那么broker就会返回一个错误。
-
max.block.ms:指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者发送缓冲区已满,或者没有可用的元数据的时候,这些方法就会被阻塞。在阻塞时间到达max.block.ms时,生产者就会抛出异常。
-
max.request.size:用于控制生产者发送的请求的发小,他可以指定发送的单个消息的最大值,也可以之单个请求所有消息总的大小。
-
receive.buffer.bytes和send.buffer.bytes:这两个参数分别指定了TCP socket接受和发送数据包的缓冲大小。若他们被指定为-1,则表示使用操作系统的默认值。
这篇文章记录了Kafka生产者的一些特点。生产者作为Kafka消息的生产者,在这个消息系统中的作用是极其巨大的,我们也应该掌握生产者的实现以及一些实际条件下的优化。
Kafka序列化器与分区
生产者将消息序列化成字节数组,通过网络发送给服务器。在生产者的设置中存在两种,分别是key.serializer和value.serializer.
为什么不建议使用自定义序列化器
为了避免耦合,序列化器和反序列化器之间的兼容问题值得注意。自定义序列化器一旦更改,会导致使用该序列化器的工作以及反序列化器都需要进行相应的改变。因此建议使用现有的序列化器和反序列化器,诸如JSON,Avro,Thrift,Protobuf等
Avro序列化器
Avor是一种编程语言无关性的序列化格式,这一点是采用与语言无关的schema来定义的。Schema通过Json来描述,数据被序列化成二进制文件或者Json文件。Avro在读写文件的时候,会用到schema,虽然Schema一般会嵌入到数据文件里,但是在kafka中,假如每一条消息都嵌入了schema,这样会导致大量的数据冗余。因此Kafka采用了通用的结构模式并使用 schema注册 表来实现。也就是将schema写入到注册表中,然后再记录中应用schema标识符,负责读取数据的应用程序只要从注册表中拉去schema来反序列化记录。序列化器只要将schema注册到schema注册表。
以上所述就是小编给大家介绍的《Kafka生产者简介》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- RocketMQ生产者流程篇
- RocketMQ生产者消息篇
- Java精讲:生产者-消费者
- Kafka 源码解析:生产者运行机制
- Kafka 探险-生产者源码分析: 核心组件
- Java并发 -- 生产者-消费者模式
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。