一次机房停电引发的思考

栏目: IT技术 · 发布时间: 4年前

内容简介:后台回复如有收获,点个在看,诚挚感谢

一次机房停电引发的思考

今天早上到公司的时候,接到开发反馈 DEV 环境所有接口都卡,耗时都在一分钟以上,严重影响开发正常工作,然后通过网关的日志定位到原因是因为 kafka 集群不可用(总共 3 个 broker,前一天晚上机房停电导致 leader 节点挂了),导致网关的反爬过滤器里面发送 kafka 消息的代码 kafkaTemplat.send 阻塞了 60s,当时在想这个 send 方法不是异步的吗,为什么会阻塞 60s?于是查阅了一些资料,大致搞清楚了原因,这里稍作整理,分享给可能踩坑或者以及踩过坑的同学。

版本信息

  • spring-boot:2.0.6.RELEASE

  • spring-kafka:2.1.2.RELEASE

  • kafka-clients:1.0.2

为什么阻塞了 60s?

首先我们知道 kafkaTemplat.send 底层是调用 KafkaProducer 的 send 方法

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {}


根据 文档的说明 [1] 它是一个异步的发送方法,按道理不管如何它都不应该阻塞主线程,但实际中某些情况下会出现阻塞线程,比如 broker 未正确运行,topic 未创建等情况。具体得源码分析参考 https://www.cnblogs.com/felixzh/p/11849296.html [2]

查询官方文档 http://kafka.apache.org/10/documentation.html [3] 得知,具体阻塞多久是由 max.block.ms 参数决定的,由于我们的业务场景是高容忍消息丢失,低容忍阻塞请求,所以需要进行优化,下面简单介绍一下 2 种优化方案。

!!!注意,以下方案只适用于高容忍消息丢失,低容忍阻塞请求业务场景

优化方案

方案 1:参数调优

  • max.block.ms 调整到 100ms,这个参数有以下 2 个作用

  1. 用于配置 send 数据或 partitionFor 函数得到对应的 leader 时,最大的等待时间,默认值为 60 秒

  2. 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽 buffer.memory 这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms 设定, 之后它将抛出一个 TimeoutException。

  • 关闭自动重试 retries=0 默认就是 0

  • 其他

  • acks=0,acks 有 4 个选项[all, -1, 0, 1] 。这里不确定会不会阻塞 send 方法,但是高容忍消息丢失,低容忍阻塞请求的业务场景配置成 0 就好了

    • 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个 server 失败的情况下,有点像 TCP

    • 1:发送消息,并会等待 leader 收到确认后,一定的可靠性

    • -1 或 all:发送消息,等待 leader 收到确认,并进行复制操作后,才返回,最高的可靠性

  • 其他参数参考 http://kafka.apache.org/10/documentation.html [4]

  • 虽然调整一些参数,但是 kafka 集群不可用或请求量过大时,还是对主流程有短暂的阻塞

    方案 2:真异步

    kafkaTemplat.send 方法其实是个假异步方法,所以需要自己实现真异步,这里构造一个公用的线程池来处理就可以了,下面为参考代码

    package com.qiaofang.tortoise.gateway.component;


    import com.qiaofang.tortoise.gateway.config.KafkaAsyncProperties;

    import org.springframework.kafka.core.KafkaTemplate;

    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;


    import java.util.concurrent.ThreadPoolExecutor;


    /**

    * kafka异步操作 工具

    *

    * @author chenhao

    * @version 1.0

    * @date 2020/7/2 3:47 下午

    */

    public class KafkaAsyncUtil {


    private final KafkaTemplate kafkaTemplate;


    private final KafkaAsyncProperties kafkaAsyncProperties;


    public KafkaAsyncUtil(KafkaTemplate kafkaTemplate, KafkaAsyncProperties kafkaAsyncProperties) {

    this.kafkaTemplate = kafkaTemplate;

    this.kafkaAsyncProperties = kafkaAsyncProperties;

    init();

    }


    private ThreadPoolTaskExecutor executor;


    private void init() {

    executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(kafkaAsyncProperties.getThreadPoolCoreThreads());

    executor.setMaxPoolSize(kafkaAsyncProperties.getThreadPoolMaxThreads());

    executor.setQueueCapacity(kafkaAsyncProperties.getThreadPoolQueueSize());

    executor.setThreadNamePrefix("kafka-async-util-pool-");

    //高容忍消息丢失场景,工作队列满了之后直接丢弃

    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

    executor.initialize();

    }


    /**

    * 发送消息

    *

    * @param topic

    * @param data

    */

    public void send(String topic, Object data) {

    executor.execute(() -> kafkaTemplate.send(topic, data));

    }


    }


    /**

    * kafka异步操作相关配置

    * @author chenhao

    * @version 1.0

    * @date 2020/7/2 3:47 下午

    */

    @Data

    @ConfigurationProperties(prefix = "tortoise.kafka.async")

    public class KafkaAsyncProperties {


    /**

    * core

    */

    private Integer threadPoolCoreThreads = 3;

    /**

    * max

    */

    private Integer threadPoolMaxThreads = 3;

    /**

    * queue大小

    */

    private Integer threadPoolQueueSize = 10000;


    }



    有文章《 关于高并发下 kafka producer send 异步发送耗时问题的分析 [5] 》说多线程高并发下 producer.send 的损耗比较严重,这个还要等到后续压测之后再更新文章吧

    参考文章

    站在巨人的肩膀上

    • Kafka producer 异步发送在某些情况会阻塞主线程,使用时候慎重 [6]

    • HAVENT 原创 Spring Boot + Spring-Kafka 异步配置 [7]

    • 关于高并发下 kafka producer send 异步发送耗时问题的分析 [8]

    • http://kafka.apache.org/10/documentation.html [9]

    参考资料

    [1]

    文档的说明: https://github.com/apache/kafka/blob/1f2d230bfdaafb34c9be12a370ab2eb4d3016039/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L853

    [2]

    https://www.cnblogs.com/felixzh/p/11849296.html

    [3]

    http://kafka.apache.org/10/documentation.html

    [4]

    http://kafka.apache.org/10/documentation.html

    [5]

    关于高并发下 kafka producer send 异步发送耗时问题的分析: https://www.cnblogs.com/dafanjoy/p/10292875.html

    [6]

    Kafka producer 异步发送在某些情况会阻塞主线程,使用时候慎重: https://www.cnblogs.com/felixzh/p/11849296.html

    [7]

    HAVENT 原创 Spring Boot + Spring-Kafka 异步配置: https://my.oschina.net/u/943746/blog/1928471

    [8]

    关于高并发下 kafka producer send 异步发送耗时问题的分析: https://www.cnblogs.com/dafanjoy/p/10292875.html

    [9]

    http://kafka.apache.org/10/documentation.html

    相关推荐

    后台回复 学习资料   领取学习视频

    一次机房停电引发的思考

    如有收获,点个在看,诚挚感谢


    以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

    查看所有标签

    猜你喜欢:

    本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

    C++数值算法(第二版)

    C++数值算法(第二版)

    William T.Vetterling、Brian P.Flannery、Saul A.Teukolsky / 胡健伟、赵志勇、薛运华 / 电子工业出版社 / 2005年01月 / 68.00

    本书选材内容丰富,除了通常数值方法课程的内容外,还包含当代科学计算大量用到的专题,如求特殊函数值、随机数、排序、最优化、快速傅里叶变换、谱分析、小波变换、统计描述和数据建模、常微分方程和偏微分方程数值解、若干编码算法和任意精度的计算等。 本书科学性和实用性统一。每个专题中,不仅对每种算法给出了数学分析和比较,而且根据作者的经验对算法做出了评论和建议,并在此基础上给出了用C++语言编写的实用程......一起来看看 《C++数值算法(第二版)》 这本书的介绍吧!

    Base64 编码/解码
    Base64 编码/解码

    Base64 编码/解码

    UNIX 时间戳转换
    UNIX 时间戳转换

    UNIX 时间戳转换

    HEX HSV 转换工具
    HEX HSV 转换工具

    HEX HSV 互换工具