消息队列(七)RocketMQ消息发送

栏目: 后端 · 发布时间: 5年前

内容简介:大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适合初中阶童鞋作为MQ研究和学习的切入点。因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款分布式消息队列中发送一条普通消息的大致流程和细节。RocketMQ分布式消息队列的网络部署架构图如下图所示(其中,包含了生产者Producer发送普通消息至集群的两条主线)对于上图中几个角色的说明:

大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适合初中阶童鞋作为MQ研究和学习的切入点。因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款分布式消息队列中发送一条普通消息的大致流程和细节。

一、RocketMQ网络架构图

RocketMQ分布式消息队列的网络部署架构图如下图所示(其中,包含了生产者Producer发送普通消息至集群的两条主线)

消息队列(七)RocketMQ消息发送

对于上图中几个角色的说明:

(1)NameServer:RocketMQ集群的命名服务器(也可以说是注册中心),它本身是无状态的(实际情况下可能存在每个NameServer实例上的数据有短暂的不一致现象,但是通过定时更新,在大部分情况下都是一致的),用于管理集群的元数据( 例如,KV配置、Topic、Broker的注册信息)。

(2)Broker(Master):RocketMQ消息代理服务器主节点,起到串联Producer的消息发送和Consumer的消息消费,和将消息的落盘存储的作用;

(3)Broker(Slave):RocketMQ消息代理服务器备份节点,主要是通过同步/异步的方式将主节点的消息同步过来进行备份,为RocketMQ集群的高可用性提供保障;

(4)Producer(消息生产者):在这里为普通消息的生产者,主要基于RocketMQ-Client模块将消息发送至RocketMQ的主节点。

对于上面图中几条通信链路的关系:

(1)Producer与NamerServer:每一个Producer会与NameServer集群中的一个实例建立TCP连接,从这个NameServer实例上拉取Topic路由信息;

(2)Producer和Broker:Producer会和它要发送的topic相关联的Master的Broker代理服务器建立TCP连接,用于发送消息以及定时的心跳信息;

(3)Broker和NamerServer:Broker(Master or Slave)均会和每一个NameServer实例来建立TCP连接。Broker在启动的时候会注册自己配置的Topic信息到NameServer集群的每一台机器中。即每一个NameServer均有该broker的Topic路由配置信息。其中,Master与Master之间无连接,Master与Slave之间有连接;

二、客户端发送普通消息的demo方法

在RocketMQ源码工程的example包下就有最为简单的发送普通消息的样例代码(ps:对于刚刚接触RocketMQ的童鞋使用这个包下面的样例代码进行系统性的学习和调试)。 我们可以直接run下“org.apache.rocketmq.example.simple”包下Producer类的main方法即可完成一次普通消息的发送(主要代码如下,在这里需本地将NameServer和Broker实例均部署起来):

//step1.启动DefaultMQProducer,启动时的具体流程一会儿会详细说明
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("producer");
        producer.start();

        try {
            {
                //step2.封装将要发送消息的内容
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //step3.发送消息流程,具体流程待会儿说
                SendResult sendResult = producer.send(msg);
            }
        } catch (Exception e) {
            //Exception code
        }
        producer.shutdown();
复制代码

三、RocketMQ发送普通消息的全流程解读

从上面一节中可以看出,消息生产者发送消息的demo代码还是较为简单的,核心就几行代码,但在深入研读RocketMQ的Client模块后,发现其发送消息的核心流程还是有一些复杂的。下面将主要从DefaultMQProducer的启动流程、send发送方法和Broker代理服务器的消息处理三方面分别进行分析和阐述。

3.1 DefaultMQProducer的启动流程

在客户端发送普通消息的demo代码部分,我们先是将DefaultMQProducer实例启动起来,里面调用了默认生成消息的实现类—DefaultMQProducerImpl的start()方法。

@Override
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
    }
复制代码

默认生成消息的实现类—DefaultMQProducerImpl的启动主要流程如下:

(1)初始化得到MQClientInstance实例对象,并注册至本地缓存变量—producerTable中;

(2)将默认Topic(“TBW102”)保存至本地缓存变量—topicPublishInfoTable中;

(3)MQClientInstance实例对象调用自己的start()方法,启动一些客户端本地的服务线程,如拉取消息服务、客户端网络通信服务、重新负载均衡服务以及其他若干个定时任务(包括,更新路由/清理下线Broker/发送心跳/持久化consumerOffset/调整线程池),并重新做一次启动(这次参数为false);

(4)最后向所有的Broker代理服务器节点发送心跳包;

消息队列(七)RocketMQ消息发送

(1)在一个客户端中,一个producerGroup只能有一个实例;

(2)根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

(3)根据不同的producerGroup,MQClientInstance将给出不同的MQProducer和MQConsumer(保存在本地缓存变量——producerTable和consumerTable中);

3.2 send发送方法的核心流程

通过Rocketmq的用户端板块发送消息主要有以下三种方法:

(1)同步方式

(2)异步方式

(3)Oneway方式

其中,用(1)、(2)种方式来发送消息比较常见,具体用哪一种方式需要根据业务情况来判断。本节内容将结合同步发送方式(同步发送模式下,假如有发送失败的最多会有3次重试(也可以自定义),其余模式均1次)进行消息发送核心流程的简析。用同步方式发送消息核心流程的入口如下:

/**
     * 同步方式发送消息核心流程的入口,默认超时时间为3s
     *
     * @param msg     发送消息的具体Message内容
     * @param timeout 其中发送消息的超时时间可以参数设置
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
复制代码

3.2.1 尝试获取TopicPublishInfo的路由信息

我们一步步debug进去后会发现在sendDefaultImpl()方法中先对待发送的消息进行前置的验证。如果消息的Topic和Body均没有问题的话,那么会调用—tryToFindTopicPublishInfo()方法,根据待发送消息的中包含的Topic尝试从Client端的本地缓存变量—topicPublishInfoTable中查找,如果没有则会从NameServer上更新Topic的路由信息(其中,调用了MQClientInstance实例的updateTopicRouteInfoFromNameServer方法,最终执行的是MQClientAPIImpl实例的getTopicRouteInfoFromNameServer方法),这里分别会存在以下两种场景:

(1)生产者第一次发送消息(此时,Topic在NameServer中并不存在):因为第一次获取时候并不能从远端的NameServer上拉取下来并更新本地缓存变量—topicPublishInfoTable成功。因此,第二次需要通过默认Topic—TBW102的TopicRouteData变量来构造TopicPublishInfo对象,并更新DefaultMQProducerImpl实例的本地缓存变量——topicPublishInfoTable。

另外,在该种类型的场景下,当消息发送至Broker代理服务器时,在SendMessageProcessor业务处理器的sendBatchMessage/sendMessage方法里面的super.msgCheck(ctx, requestHeader, response)消息前置校验中,会调用TopicConfigManager的createTopicInSendMessageMethod方法,在Broker端完成新Topic的创建并持久化至配置文件中(配置文件路径:{rocketmq.home.dir}/store/config/topics.json)。(ps:该部分内容其实属于Broker有点超本篇的范围,不过由于涉及新Topic的创建因此在略微提了下)

(2)生产者发送Topic已存在的消息:由于在NameServer中已经存在了该Topic,因此在第一次获取时候就能够取到并且更新至本地缓存变量中topicPublishInfoTable,随后tryToFindTopicPublishInfo方法直接可以return。 在RocketMQ中该部分的核心方法源码如下(已经加了注释):

/**
     * 根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo
     * 如果没有则更新路由信息,从nameserver端拉取最新路由信息
     *
     * topicPublishInfo
     * 
     * @param topic
     * @return
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //step1.先从本地缓存变量topicPublishInfoTable中先get一次
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //step1.2 然后从nameServer上更新topic路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            //step2 然后再从本地缓存变量topicPublishInfoTable中再get一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            /**
             *  第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
             */
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
复制代码
/**
     * 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息
     *
     * @param topic
     * @param timeoutMillis
     * @param allowTopicNotExist
     * @return
     * @throws MQClientException
     * @throws InterruptedException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     * @throws RemotingConnectException
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        //设置请求头中的Topic参数后,发送获取Topic路由信息的request请求给NameServer
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
       //这里由于是同步方式发送,所以直接return response的响应
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            //如果NameServer中不存在待发送消息的Topic
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            //如果获取Topic存在,则成功返回,利用TopicRouteData进行解码,且直接返回TopicRouteData
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
复制代码

将TopicRouteData转换至TopicPublishInfo路由信息的映射图如下:

消息队列(七)RocketMQ消息发送

TopicRouteData变量内容.jpg

消息队列(七)RocketMQ消息发送

TopicPublishInfo变量内容.jpg

消息队列(七)RocketMQ消息发送

3.2.2 选择消息发送的队列

在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下,selectOneMessageQueuef()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义:

public class MQFaultStrategy {
    //维护每个Broker发送消息的延迟
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    //发送消息延迟容错开关
    private boolean sendLatencyFaultEnable = false;
    //延迟级别数组
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    //不可用时长数组
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  ......
}
复制代码

这里通过一个sendLatencyFaultEnable开关来进行选择采用下面哪种方式:

(1)sendLatencyFaultEnable开关打开:在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L。

(2)sendLatencyFaultEnable开关关闭(默认关闭):采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息。

/**
     * 根据sendLatencyFaultEnable开关是否打开来分两种情况选择队列发送消息
     * @param tpInfo
     * @param lastBrokerName
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {

                //1.在随机递增取模的基础上,再过滤掉not available的Broker代理;对之前失败的,按一定的时间做退避
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        //2.采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
复制代码

3.2.3 发送封装后的RemotingCommand数据包

在选择完发送消息的队列后,RocketMQ就会调用sendKernelImpl()方法发送消息(该方法为,通过RocketMQ的Remoting通信模块真正发送消息的核心)。在该方法内总共完成以下几个步流程:

(1)根据前面获取到的MessageQueue中的brokerName,调用MQClientInstance实例的findBrokerAddressInPublish()方法,得到待发送消息中存放的Broker代理服务器地址,如果没有找到则跟新路由信息;

(2)如果没有禁用,则发送消息前后会有钩子函数的执行(executeSendMessageHookBefore()/executeSendMessageHookAfter()方法);

(3)将与该消息相关信息封装成RemotingCommand数据包,其中请求码RequestCode为以下几种之一:

a.SEND_MESSAGE(普通发送消息)

b.SEND_MESSAGE_V2(优化网络数据包发送)c.SEND_BATCH_MESSAGE(消息批量发送)

(4)根据获取到的Broke代理服务器地址,将封装好的RemotingCommand数据包发送对应的Broker上,默认发送超时间为3s

(5)这里,真正调用RocketMQ的Remoting通信模块完成消息发送是在MQClientAPIImpl实例sendMessageSync()方法中,代码具体如下:

private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }
复制代码

(6)processSendResponse方法对发送正常和异常情况分别进行不同的处理并返回sendResult对象;

(7)发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间;

(8)对于异常情况,且标志位—retryAnotherBrokerWhenNotStoreOK,设置为true时,在发送失败的时候,会选择换一个Broker;

在生产者发送完成消息后,客户端日志打印如下:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]


复制代码

3.3 Broker代理服务器的消息处理简析

Broker代理服务器中存在很多Processor业务处理器,用于处理不同类型的请求,其中一个或者多个Processor会共用一个业务处理器线程池。对于接收到消息,Broker会使用SendMessageProcessor这个业务处理器来处理。SendMessageProcessor会依次做以下处理:

(1)消息前置校验,包括broker是否可写、校验queueId是否超过指定大小、消息中的Topic路由信息是否存在,如果不存在就新建一个。这里与上文中“尝试获取TopicPublishInfo的路由信息”一节中介绍的内容对应。如果Topic路由信息不存在,则Broker端日志输出如下:

2018-06-14 17:17:24 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=252, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-06-14 17:17:24 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /172.20.21.162:62661
2018-06-14 17:17:24 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]
复制代码

Topic路由信息新建后,第二次消息发送后,Broker端日志输出如下:

2018-08-02 16:26:13 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-08-02 16:26:13 INFO SendMessageThread_1 - the msgInner's content is:MessageExt [queueId=2, storeSize=0, queueOffset=0, sysFlag=0, bornTimestamp=1533198373524, bornHost=/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId=null, commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true, TAGS=TagA}, body=11body's content is:Hello world]]

复制代码

(2)构建MessageExtBrokerInner;

(3)调用“brokerController.getMessageStore().putMessage”将MessageExtBrokerInner做落盘持久化处理;

(4)根据消息落盘结果(正常/异常情况),BrokerStatsManager做一些统计数据的更新,最后设置Response并返回;

note:我走得很慢,但是我从不后悔。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Data Mining

Web Data Mining

Bing Liu / Springer / 2006-12-28 / USD 59.95

Web mining aims to discover useful information and knowledge from the Web hyperlink structure, page contents, and usage data. Although Web mining uses many conventional data mining techniques, it is n......一起来看看 《Web Data Mining》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具