消息中间件客户端消费控制实践

栏目: 后端 · 发布时间: 6年前

内容简介:本文来自网易云社区,转载务必请注明出处。消息中间件是用来系统间通信、异步解耦、削峰填谷的重要手段,个人认为一个比较靠谱的Mq,应该具备以下特点

本文来自网易云社区,转载务必请注明出处。

背景

消息中间件是用来系统间通信、异步解耦、削峰填谷的重要手段,个人认为一个比较靠谱的Mq,应该具备以下特点

  • 控制投递:消息消费失败,支持消息有节奏的重新投递

  • 延迟消费:支持消息延迟消费,用来解决诸如消息乱序的场景

  • 流控消费:消费支持流控,真正的支持削峰填谷

  • 消费监控:消息消费的监控

目前考拉常用的消息中间件有rabbitMq和kafka,各自都有一些问题,不能完美胜任以上功能,因此本文试着探索并实践了一个消息控制的框架。

知己知彼

RabbitMq

先拿rabbitMq来说点事儿,这个mq的问题比较大,但是目前交易核心的消息都在rabbitMq上面。

对比原始需求

  • 控制投递:消息消费失败后,消息会被服务端无节制投递,一但处理逻辑有bug,触发无限投递,瞬间服务器会被消息打爆。

  • 延迟消费:乱序问题是mq共用的问题,rabbitMq并没有提供解决方案

  • 流控消费:消息对象保存在mq实例内存中,因此rabbitMq本身不支持堆积太多消息。

  • 消费监控:控制台数据简陋

Kafka

然后拿kafka做下对比,kafka性能、扩展性都优于rabbitMq,不过也不能完全满足我们的需求

对比原始需求

  • 控制投递:kafka消费有offset的概念,通过消费者代码实现可以主动控制消费节奏。

  • 延迟消费:kafka也没有提供结局方案

  • 流控消费:kafka消息数据落在磁盘上,可以堆积比较多的消息,但是对于消费方怎么流控并没有提供方案。

  • 消费监控:数据也比较简陋

设计思路

最理想情况,这些功能可以直接做在服务端上面,客户端不用做太多改造。不过,考虑现实情况,没办法直接去改kafka和rabbitMq源码,只能退而求其次去改造消息客户端,在消息客户端和消费者之间增加一个消息控制框架。

消息控制框架主要结构如下:

消息中间件客户端消费控制实践

另外针对持久化到客户端的数据,还结合k-scheduler提供了一个消息重推模块,如下:

消息中间件客户端消费控制实践

下面针对原始需求,看看消息控制框架都做了什么事情:

  • 控制投递:消息控制层catch消费异常,rabbitMq的消息会直接持久化消息后续重推,kafka消息异常,重置offset,有节制的重推。

  • 延迟消费:消息适配层提供延迟推送接口,需要业务方识别出消息乱序后,调用接口,接口会直接持久化乱序消息,在指定延迟时间后重新推送。

  • 流控消费:对于rabbitMq消息,控制层提供单机流控接口,被流控的消息直接持久化到DB,等待后续重推。另外针对kafka消息,集成了nfc全局流控,框架识别流控错误码,有节制的重推消息。

  • 消费监控:对接哨兵监控,所有消息消费、失败、流控等数据都会采集到哨兵

详细实现

RabbitMq消息的详细实现

rabbitMq和kafka都有各自特点,因此虽然整体框架的思路是一致的,但是一些细节处理还是略有不同,此处先拿rabbitMq的实现来作分析:

先上图

消息中间件客户端消费控制实践

如图展示了一条rabbitMq消息是如何经过消息控制框架的,异常消息、延迟推送以及被流控的消息都会落库,然后等待重推。

被持久化的消息主要包含以下字段

  • 应用名

  • 业务名

  • 协议名(kafka或者rabbitMq)

  • 环境名(预发或者线上或者beta)

  • 消息体

  • 消息重推时间

  • 当前重试次数(根据重试次数实现了一个退避算法,来计算下次重推时间)

  • 消息状态

其中,为了表明一个消息和消费者之间的归属关系,提出了一个 消费者分组 的概念。

一个消费者分组包含应用名、业务名、协议名以及环境名,可以对应到唯一的消费者

重推逻辑如下

消息中间件客户端消费控制实践

重推任务依赖于外部驱动,可以是cron可以是k-schedule,动动手指配置一下就ok。

目前重推任务只支持单机重试,因此大批量的消息重推消费速度不能得到保证。

kafka消息的差异实现

kafka本身可以堆积消息,因此摒弃了流控落库的逻辑,直接重置offset,有节奏的重试。另外,集成了nfc的全局流控,kafka的消费者直接使用nfc全局流控。

此外,对于kafka异常消息的处理,框架也是利用offset来重试,没有落库。

监控示例

消息中间件客户端消费控制实践

核心代码实现

核心类图

消息中间件客户端消费控制实践

针对交易消息做了定制化处理,对于kafka交易消息对接方只需要继承实现AbstractTradeKafkaControlProcessor,对于rabbitMq类型交易消息继承实现AbstractTradeRabbitControlListener即可。

AbstractControlListener中核心的消息控制代码如下,AbstractTradeKafkaControlProcessor中有针对kafka的特点做改动,不再赘述。

/**
     * 消息处理流程
     *
     * @param message
     * @param controlDTO
     */
    protected void processControlMessage(T message, ControlDTO controlDTO) {
        BizIdTypeBond bizIdTypeBond = buildBizIdTypeBond(message);
        MonitorNameSpace monitorNameSpace = buildMonitorNameSpace(bizIdTypeBond);        // 统计消息处理个数
        MonitorFactory.getMonitorService().onNewMessage(monitorNameSpace, 1, false);		// 是否流控
        boolean needRelease = false;        if (isOpenFlowControl()) {
            flowControlService.aquireResource();
            needRelease = true;
        }        // 执行业务逻辑
        try {
            onControlMessage(message, controlDTO);            if (controlDTO.getDelayPush() != null) {                // 延迟推送
                storeService.storeMessage(encodeStoreMessage(message, bizIdTypeBond), controlDTO.getDelayPush(),                        "delay push");
                MonitorFactory.getMonitorService().onStoreMessage4DelayPush(monitorNameSpace, 1, false);
                NotifyConstants.NOTIFY_LOG.warn("delay push messageDTO=", message);
            }
        } catch (Throwable t) {            // 异常控制
            String note = NotifyCommonUtil.buildCallStatck(t, 500);
            storeService.storeMessage(encodeStoreMessage(message, bizIdTypeBond), null, note);
            MonitorFactory.getMonitorService().onStoreMessage4Exception(monitorNameSpace, 1, false, note);
            NotifyConstants.NOTIFY_LOG.warn("process failed messageDTO=", message);            return;
        } finally {            if (needRelease) {
                flowControlService.releaseResoure();
            }
        }
    }复制代码

对接示例

xml配置

<bean id="globalControlConfig"
          >
        <property name="applicationName" value="order"/>
        <property name="enviroment" value="${message.control.environment}"/>
        <property name="tableName" value="tb_mq_message_control"/>
        <property name="dataSource" ref="rdsDataSource"/>
    </bean>

 <!--交易事件变更监听器-->
    <bean id="tradeEventListener"
          >
        <property name="notifyControlConfig" ref="notifyControlConfigTrade"/>
    </bean>

    <bean id="notifyControlConfigTrade"
          >
        <property name="bizGroup" value="trade"/>
    </bean>

 <!-- 交易事件兜底重试任务 -->复制代码
<bean id="retryTaskEntry" class="com.netease.haitao.notify.base.task.runner.RetryTaskEntry"/>复制代码
代码示例

rabbitMq

public class TradeEventListener extends AbstractTradeRabbitControlListener {

   @Resource
   private OrderComposeConfigHolder orderComposeConfigHolder;

   @Resource
   private TradeEventService tradeEventService;

   @Override
   protected boolean isOpenMessageControl() {
      return orderComposeConfigHolder.isOpenTradeMessageControl();
   }

   @Override
   protected int flowControlThreshold() {
      return orderComposeConfigHolder.tradeEventFlowControlThreshold();
   }

   @Override
   protected boolean isOpenFlowControl() {
      return orderComposeConfigHolder.isOpenFlowControl();
   }

   @Override
   public void onControlTradeEvent(TradeEvent tradeEvent, ControlDTO controlDTO) throws Exception {

        OrderComposeLogConstants.notifyLog.info("onTradeEvent,message=" + tradeEvent.toString());
      try {
         tradeEventService.processTradeEvent(tradeEvent);
      } catch (OrderComposeException e) {
         if (e.getErrorCode().equals(OrderComposeErrorEnum.TRADE_EVENT_WRONG_ORDER.intValue())) {
            OrderComposeLogConstants.notifyLog.warn("message wrong order,tradeEvent=" + tradeEvent.toString()
                  + ",delayPush=" + orderComposeConfigHolder.tradeEventWrongOrderDelayPushTime());
            // 乱序之后的延迟消费
            controlDTO.setDelayPush(orderComposeConfigHolder.tradeEventWrongOrderDelayPushTime());
         } else {
            OrderComposeLogConstants.notifyLog.warn("process failed,tradeEvent=" + tradeEvent.toString(), e);
            throw e;
         }
      } catch (Exception e) {
         OrderComposeLogConstants.notifyLog.warn("process failed,tradeEvent=" + tradeEvent.toString(), e);
         throw e;
      }
   }
}复制代码

kafka

/**
 * 订单创建订单占用库存消息处理
 */
public class OrderInvUnpayCloseEventProcessor extends AbstractTradeKafkaControlProcessor<UnpayCancelEvent> {

    @Resource
    private OrderInvModule orderInvModule;

    @Override
    @GlobalResource(resourceName = NfcResources.orderInvNotify)
    public void onControlMessage(UnpayCancelEvent unpayCancelEvent, ControlDTO controlDTO) throws Exception {
        OrderComposeLogConstants.notifyLog.info("OrderInvCreateEventProcessor,unpayCancelEvent=" + unpayCancelEvent);
        orderInvModule.processUnpayClose(unpayCancelEvent.getGorderId());
    }

}复制代码
复制代码

本文来自网易云社区 ,经作者程汉授权发布。

网易云免费体验馆,0成本体验20+款云产品!

更多网易研发、产品、运营经验分享请访问网易云社区。


以上所述就是小编给大家介绍的《消息中间件客户端消费控制实践》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

破茧成蝶:用户体验设计师的成长之路

破茧成蝶:用户体验设计师的成长之路

刘津、李月 / 人民邮电出版社 / 2014-7 / 69.00

市面上已经有很多专业的用户体验书籍,但解决用户体验设计师在职场中遇到的众多现实问题的图书并不多见。本书从用户体验设计师的角度出发,系统地介绍了其职业生涯中的学习方法、思维方式、工作流程等,覆盖了用户体验设计基础知识、设计师的角色和职业困惑、工作流程、需求分析、设计规划和设计标准、项目跟进和成果检验、设计师职业修养以及需要具备的意识等,力图帮助设计师解决在项目中遇到的一些常见问题,找到自己的职业成长......一起来看看 《破茧成蝶:用户体验设计师的成长之路》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

html转js在线工具
html转js在线工具

html转js在线工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试