Setting Up an Interceptor on a Kafka Producer to Count Messages



When a Kafka Producer sends a message with producer.send(), the message is first placed into a local buffer; a Kafka network thread then pulls messages out of the buffer and delivers them to the Kafka brokers. The size of the local buffer is configured with buffer.memory and defaults to 32 MB (32 * 1024 * 1024L). If delivery over the network is slower than submission into the buffer, the buffer can fill up and stop accepting new messages. The block.on.buffer.full setting then decides whether to block or to throw an exception; the default is for producer.send() to block, and the maximum blocking time is governed by max.block.ms, which defaults to 60 seconds. producer.send() returns a Future<RecordMetadata>, so once the buffer is full each call to send() may have to wait 60 seconds before getting a result (or an exception).
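For reference, here is a minimal sketch of setting these buffer-related options explicitly and waiting on the returned Future. The broker address, topic name, and the values shown (which are simply the defaults) are illustrative only, not recommendations:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class BufferConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L); // local buffer size, default 32 MB
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);              // how long send() may block on a full buffer, default 60 s
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only hands the record to the local buffer; get() waits for the broker's acknowledgement (or an exception)
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test_topic", "key", "value"));
            RecordMetadata metadata = future.get();
            System.out.printf("partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
        }
    }
}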

The relationship here is send() --a--> buffer --b--> delivery to the Kafka brokers, so flow control naturally belongs between a and b: if b is too slow and the buffer fills up, a has to be slowed down. Throttling a based on how much of the buffer is already in use would also work; that is left for future study. This article offers a different reference approach: configuring an Interceptor on the Producer so that we can roughly count how many messages have been submitted to the buffer and how many have been taken out of it.
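As an aside on the "throttle a based on buffer usage" idea, one possible direction is to read the producer's built-in metrics. This is only a sketch and not part of the approach taken in this article; the metric names "buffer-available-bytes" and "buffer-total-bytes" in the "producer-metrics" group, and the metricValue() accessor, are assumptions that may differ across client versions:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;

public class BufferUsage {
    // Returns an estimate in [0, 1] of how full the producer's local buffer is.
    // The metric names below are assumptions; check producer.metrics() for the names in your client version.
    public static double usageRatio(Producer<?, ?> producer) {
        double available = 0, total = 0;
        for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
            if (!"producer-metrics".equals(e.getKey().group())) continue;
            Object value = e.getValue().metricValue();
            if (!(value instanceof Number)) continue;
            if ("buffer-available-bytes".equals(e.getKey().name())) available = ((Number) value).doubleValue();
            if ("buffer-total-bytes".equals(e.getKey().name()))     total = ((Number) value).doubleValue();
        }
        return total == 0 ? 0 : 1 - available / total;
    }
}

A sending loop could then pause briefly whenever usageRatio() crosses some threshold, instead of letting send() block for the full max.block.ms.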

All of Kafka's configuration constants can be found on this page: https://kafka.apache.org/0100/javadoc/constant-values.html. The description of interceptor.classes says that one or more Interceptors can be configured on a Producer (each must implement ProducerInterceptor). Consumers have their own Interceptor as well (implementing ConsumerInterceptor).

ProducerInterceptor has four interface methods:

  1. void close(): called when the Interceptor is closed, which happens before the Producer shuts down
  2. ProducerRecord<K,V> onSend(ProducerRecord<K, V> record): called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback), before the key and value are serialized and before a partition is assigned (if none was specified); in other words, before the message is put into the buffer. This method may also modify the message.
  3. void onAcknowledgement(RecordMetadata metadata, Exception exception): called after a message taken from the buffer has either been successfully sent over the network or has failed to send
  4. void configure(Map<String, ?> configs): one more chance to work with the configuration while the KafkaProducer is being created

Now let's demonstrate with code how to count the records submitted to the buffer, delivered successfully, and failed to deliver.

The Producer-side code:

package blog.yanbin;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        String topic = "test_topic";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "blog.yanbin.StatisticsProducerInterceptor");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>(topic, String.valueOf(i), String.valueOf(i)));
        }

        producer.close();

        logger.info(StatisticsProducerInterceptor.getRecordStatistics());
    }
}

Above, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG specifies an Interceptor implementation class, StatisticsProducerInterceptor, whose code is as follows:

package blog.yanbin;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class StatisticsProducerInterceptor implements ProducerInterceptor<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(StatisticsProducerInterceptor.class);

    // LongAdder, because onSend() (caller thread) and onAcknowledgement() (network thread) update these concurrently
    private static final LongAdder submittedRecords = new LongAdder();
    private static final LongAdder deliveredRecords = new LongAdder();
    private static final LongAdder failedRecords = new LongAdder();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        ProducerRecord<String, String> updatedRecord = record.value().compareTo("3") < 0 ? record :
            new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                record.value() + "+U"); // demonstrate modifying the message

        logger.info("record: {} to be sent, updated value from {} to {}",
            updatedRecord, record.value(), updatedRecord.value());

        submittedRecords.increment(); // if a message cannot be serialized, it never reaches the buffer and onAcknowledgement() is called with an exception
        return updatedRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            deliveredRecords.increment();
            logger.info("sent message: topic: {}, partition: {}, offset: {}, timestamp: {}, checksum: {}",
                metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp(), metadata.checksum());
        } else {
            failedRecords.increment();
            logger.error("failed to send message: {}", metadata, exception);
        }

        logger.info(getRecordStatistics());
    }

    @Override
    public void close() {
        logger.info("producer closed");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        logger.info("configuration: {}", configs);
    }

    public static String getRecordStatistics() {
        return String.format("record statistics, submitted: %s, delivered: %s, failed: %s",
            submittedRecords.longValue(), deliveredRecords.longValue(), failedRecords.longValue());
    }
}

Running it produces output roughly like this:

2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=2, value=2, timestamp=null) to be sent, updated value from 2 to 2
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=3, value=3+U, timestamp=null) to be sent, updated value from 3 to 3+U
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=4, value=4+U, timestamp=null) to be sent, updated value from 4 to 4+U
2018-11-01 00:33:23 [main] INFO KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6351, timestamp: 1541050403463, checksum: 1478612472
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 1, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6352, timestamp: 1541050403475, checksum: 4199907714
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 2, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6353, timestamp: 1541050403475, checksum: 3855131286
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 3, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6354, timestamp: 1541050403475, checksum: 1502822821
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 4, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6355, timestamp: 1541050403475, checksum: 3673351358
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 5, failed: 0
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - producer closed
2018-11-01 00:33:23 [main] INFO Main - record statistics, submitted: 5, delivered: 5, failed: 0

The log shows that 5 messages were submitted in total, 5 were delivered successfully, and 0 failed. Messages can indeed be modified in onSend(..). It also looks as though all messages were put into the buffer before any were actually sent; changing the count in main to 10 gives much the same picture, with all 10 onSend(..) calls finishing before delivery to the network starts. Note, however, that onSend(..) and onAcknowledgement(..) are invoked by different threads, so there is no guaranteed ordering between them.

If you don't believe that, send 2000 messages in one go: change the loop count in main to 2000, run it, and look at the log. Here is a fragment:

2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=821, value=821+U, timestamp=null) to be sent, updated value from 820 to 820+U
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 821, delivered: 1, failed: 0
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 19357, timestamp: 1541052100757, checksum: 791494235
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=855, value=855+U, timestamp=null) to be sent, updated value from 855 to 855+U
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 855, delivered: 2, failed: 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1612, value=1612, timestamp=null) to be sent, updated value from 1612 to 1612
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 1611, delivered: 242, failed: 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1614, value=1614, timestamp=null) to be sent, updated value from 1614 to 1614
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1999, value=1999, timestamp=null) to be sent, updated value from 1999 to 1999
2018-11-01 01:01:40 [main] INFO KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 2000, delivered: 1999, failed: 0
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 21355, timestamp: 1541052100856, checksum: 2489747570
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 2000, delivered: 2000, failed: 0
2018-11-01 01:01:41 [main] INFO StatisticsProducerInterceptor - producer closed
2018-11-01 01:01:41 [main] INFO Main - record statistics, submitted: 2000, delivered: 2000, failed: 0

The log shows that taking messages out of the buffer and sending them over the network does not wait for all messages to be placed into the buffer first; the two activities run on different threads. In the end, though, every message waiting to be sent was successfully delivered to the Kafka brokers.

So with a ProducerInterceptor we can count, fairly accurately, the records waiting to be sent and the records successfully delivered to the network. If a message cannot be serialized, onAcknowledgement(..) is invoked directly with an exception and the message is counted as a failed delivery, which is exactly the result we want.
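To see that failure path in action, one quick experiment (not part of the original walkthrough; the FailingSerializer class below is a made-up helper) is to plug in a value serializer that always throws, so every record goes down the failure branch of the interceptor:

package blog.yanbin;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Hypothetical serializer that always throws, used only to exercise the failure path
public class FailingSerializer implements Serializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, String data) {
        throw new SerializationException("deliberately failing to serialize: " + data);
    }

    @Override
    public void close() { }
}

Pointing ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG at blog.yanbin.FailingSerializer should then leave delivered at 0 while failed climbs; note that send() itself may also throw synchronously in this case, depending on the client version.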

