消息中间件RocketMQ消息发送

栏目: 后端 · 发布时间: 5年前

内容简介:摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢?大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适合初中阶童鞋作为MQ研究和学习的切入点。因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款分布式消息队列中发送一条普通消息的大致流程和细节。一、RocketMQ网络架构图
编辑推荐:

本文来自于jianshu,文章介绍了RocketMQ网络架构图以及RocketMQ发送普通消息的全流程解读等相关内容。

摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢?

大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适合初中阶童鞋作为MQ研究和学习的切入点。因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款分布式消息队列中发送一条普通消息的大致流程和细节。

一、RocketMQ网络架构图

RocketMQ分布式消息队列的网络部署架构图如下图所示(其中,包含了生产者Producer发送普通消息至集群的两条主线)

消息中间件RocketMQ消息发送

RocketMQ部署架构.jpg

对于上图中几个角色的说明:

(1)NameServer:RocketMQ集群的命名服务器(也可以说是注册中心),它本身是无状态的(实际情况下可能存在每个NameServer实例上的数据有短暂的不一致现象,但是通过定时更新,在大部分情况下都是一致的),用于管理集群的元数据( 例如,KV配置、Topic、Broker的注册信息)。

(2)Broker(Master):RocketMQ消息代理服务器主节点,起到串联Producer的消息发送和Consumer的消息消费,和将消息的落盘存储的作用;

(3)Broker(Slave):RocketMQ消息代理服务器备份节点,主要是通过同步/异步的方式将主节点的消息同步过来进行备份,为RocketMQ集群的高可用性提供保障;

(4)Producer(消息生产者):在这里为普通消息的生产者,主要基于RocketMQ-Client模块将消息发送至RocketMQ的主节点。

对于上面图中几条通信链路的关系:

(1)Producer与NamerServer:每一个Producer会与NameServer集群中的一个实例建立TCP连接,从这个NameServer实例上拉取Topic路由信息;

(2)Producer和Broker:Producer会和它要发送的topic相关联的Master的Broker代理服务器建立TCP连接,用于发送消息以及定时的心跳信息;

(3)Broker和NamerServer:Broker(Master or Slave)均会和每一个NameServer实例来建立TCP连接。Broker在启动的时候会注册自己配置的Topic信息到NameServer集群的每一台机器中。即每一个NameServer均有该broker的Topic路由配置信息。其中,Master与Master之间无连接,Master与Slave之间有连接;

二、客户端发送普通消息的demo方法

在RocketMQ源码工程的example包下就有最为简单的发送普通消息的样例代码(ps:对于刚刚接触RocketMQ的童鞋使用这个包下面的样例代码进行系统性的学习和调试)。

我们可以直接run下“org.apache.rocketmq.example.simple”包下Producer类的main方法即可完成一次普通消息的发送(主要代码如下,在这里需本地将NameServer和Broker实例均部署起来):

//step1.启动DefaultMQProducer,启动时的具体流程一会儿会详细说明

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.setInstanceName("producer");

producer.start();

try {

{

//step2.封装将要发送消息的内容

Message msg = new Message("TopicTest",

"TagA",

"OrderID188",

"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

//step3.发送消息流程,具体流程待会儿说

SendResult sendResult = producer.send(msg);

}

} catch (Exception e) {

//Exception code

}

producer.shutdown();

三、RocketMQ发送普通消息的全流程解读

从上面一节中可以看出,消息生产者发送消息的demo代码还是较为简单的,核心就几行代码,但在深入研读RocketMQ的Client模块后,发现其发送消息的核心流程还是有一些复杂的。下面将主要从DefaultMQProducer的启动流程、send发送方法和Broker代理服务器的消息处理三方面分别进行分析和阐述。

3.1 DefaultMQProducer的启动流程

在客户端发送普通消息的demo代码部分,我们先是将DefaultMQProducer实例启动起来,里面调用了默认生成消息的实现类—DefaultMQProducerImpl的start()方法。

@Override

public void start() throws MQClientException {

this.defaultMQProducerImpl.start();

}

默认生成消息的实现类—DefaultMQProducerImpl的启动主要流程如下:

(1)初始化得到MQClientInstance实例对象,并注册至本地缓存变量—producerTable中;

(2)将默认Topic(“TBW102”)保存至本地缓存变量—topicPublishInfoTable中;

(3)MQClientInstance实例对象调用自己的start()方法,启动一些客户端本地的服务线程,如拉取消息服务、客户端网络通信服务、重新负载均衡服务以及其他若干个定时任务(包括,更新路由/清理下线Broker/发送心跳/持久化consumerOffset/调整线程池),并重新做一次启动(这次参数为false);

(4)最后向所有的Broker代理服务器节点发送心跳包;

总结起来,DefaultMQProducer的主要启动流程如下:

消息中间件RocketMQ消息发送

DefaultMQProducer的start方法启动过程.jpg

这里有以下几点需要说明:

(1)在一个客户端中,一个producerGroup只能有一个实例;

(2)根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

(3)根据不同的producerGroup,MQClientInstance将给出不同的MQProducer和MQConsumer(保存在本地缓存变量——producerTable和consumerTable中);

3.2 send发送方法的核心流程

通过Rocketmq的客户端模块发送消息主要有以下三种方法:

(1)同步方式

(2)异步方式

(3)Oneway方式

其中,使用(1)、(2)种方式来发送消息比较常见,具体使用哪一种方式需要根据业务情况来判断。本节内容将结合同步发送方式(同步发送模式下,如果有发送失败的最多会有3次重试(也可以自己设置),其他模式均1次)进行消息发送核心流程的简析。使用同步方式发送消息核心流程的入口如下:

/**

* 同步方式发送消息核心流程的入口,默认超时时间为3s

*

* @param msg 发送消息的具体Message内容

* @param timeout 其中发送消息的超时时间可以参数设置

* @return

* @throws MQClientException

* @throws RemotingException

* @throws MQBrokerException

* @throws InterruptedException

*/

public SendResult send(Message msg,

long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);

}

3.2.1 尝试获取TopicPublishInfo的路由信息

我们一步步debug进去后会发现在sendDefaultImpl()方法中先对待发送的消息进行前置的验证。如果消息的Topic和Body均没有问题的话,那么会调用—tryToFindTopicPublishInfo()方法,根据待发送消息的中包含的Topic尝试从Client端的本地缓存变量—topicPublishInfoTable中查找,如果没有则会从NameServer上更新Topic的路由信息(其中,调用了MQClientInstance实例的updateTopicRouteInfoFromNameServer方法,最终执行的是MQClientAPIImpl实例的getTopicRouteInfoFromNameServer方法),这里分别会存在以下两种场景:

(1)生产者第一次发送消息(此时,Topic在NameServer中并不存在):因为第一次获取时候并不能从远端的NameServer上拉取下来并更新本地缓存变量—topicPublishInfoTable成功。因此,第二次需要通过默认Topic—TBW102的TopicRouteData变量来构造TopicPublishInfo对象,并更新DefaultMQProducerImpl实例的本地缓存变量——topicPublishInfoTable。

另外,在该种类型的场景下,当消息发送至Broker代理服务器时,在SendMessageProcessor业务处理器的sendBatchMessage/sendMessage方法里面的super.msgCheck(ctx, requestHeader, response)消息前置校验中,会调用TopicConfigManager的createTopicInSendMessageMethod方法,在Broker端完成新Topic的创建并持久化至配置文件中(配置文件路径:{rocketmq.home.dir}/store/config/topics.json)。(ps:该部分内容其实属于Broker有点超本篇的范围,不过由于涉及新Topic的创建因此在略微提了下)

(2)生产者发送Topic已存在的消息:由于在NameServer中已经存在了该Topic,因此在第一次获取时候就能够取到并且更新至本地缓存变量中topicPublishInfoTable,随后tryToFindTopicPublishInfo方法直接可以return。

在RocketMQ中该部分的核心方法源码如下(已经加了注释):

/**

* 根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo

* 如果没有则更新路由信息,从nameserver端拉取最新路由信息

*

* topicPublishInfo

*

* @param topic

* @return

*/

private TopicPublishInfo tryToFindTopicPublishInfo(

final String topic) {

//step1.先从本地缓存变量topicPublishInfoTable中先get一次

TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

if (null == topicPublishInfo || !topicPublishInfo.

ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());

//step1.2 然后从nameServer上更新topic路由信息

this.mQClientFactory.updateTopicRo

uteInfoFromNameServer(topic);

//step2 然后再从本地缓存变量topicPublishInfoTable中

再get一次

topicPublishInfo = this.topicPublishInfoTable.get(topic);

}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {

return topicPublishInfo;

} else {

/**

* 第一次的时候isDefault为false,第二次的时候default为true,

即为用默认的topic的参数进行更新

*/

this.mQClientFactory.updateTopicR

outeInfoFromNameServer(topic, true, this.defaultMQProducer);

topicPublishInfo = this.topicPublishInfoTable.get(topic);

return topicPublishInfo;

}

}

/**

* 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息

*

* @param topic

* @param timeoutMillis

* @param allowTopicNotExist

* @return

* @throws MQClientException

* @throws InterruptedException

* @throws RemotingTimeoutException

* @throws RemotingSendRequestException

* @throws RemotingConnectException

*/

public TopicRouteData getTopicRouteInfoFromNameServer(final

String topic, final long timeoutMillis,

boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {

GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();

requestHeader.setTopic(topic);

//设置请求头中的Topic参数后,发送获取Topic路由信息的request

请求给NameServer

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode

.GET_ROUTEINTO_BY_TOPIC, requestHeader);

//这里由于是同步方式发送,所以直接return response的响应

RemotingCommand response = this.remotingClient.invokeSync(null,

request, timeoutMillis);

assert response != null;

switch (response.getCode()) {

//如果NameServer中不存在待发送消息的Topic

case ResponseCode.TOPIC_NOT_EXIST: {

if (allowTopicNotExist && !topic.equals(MixAll

.DEFAULT_TOPIC)) {

log.warn("get Topic [{}] RouteInfoFromNameServer

is not exist value", topic);

}

break;

}

//如果获取Topic存在,则成功返回,利用TopicRouteData进行

解码,且直接返回TopicRouteData

case ResponseCode.SUCCESS: {

byte[] body = response.getBody();

if (body != null) {

return TopicRouteData.decode(body, TopicRouteData.class);

}

}

default:

break;

}

throw new MQClientException(response.getCode(),

response.getRemark());

}

将TopicRouteData转换至TopicPublishInfo路由信息的映射图如下:

消息中间件RocketMQ消息发送

Client中TopicRouteData到TopicPublishInfo的映射.jpg

其中,上面的TopicRouteData和TopicPublishInfo路由信息变量大致如下:

消息中间件RocketMQ消息发送

TopicRouteData变量内容.jpg

消息中间件RocketMQ消息发送

TopicPublishInfo变量内容.jpg

3.2.2 选择消息发送的队列

在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下,selectOneMessageQueuef()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义:

public class MQFaultStrategy {

//维护每个Broker发送消息的延迟

private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl();

//发送消息延迟容错开关

private boolean sendLatencyFaultEnable = false;

//延迟级别数组

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

//不可用时长数组

private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

......

}

这里通过一个sendLatencyFaultEnable开关来进行选择采用下面哪种方式:

(1)sendLatencyFaultEnable开关打开:在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L。

(2)sendLatencyFaultEnable开关关闭(默认关闭):采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息。

/**

* 根据sendLatencyFaultEnable开关是否打开来分两种情

况选择队列发送消息

* @param tpInfo

* @param lastBrokerName

* @return

*/

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)

{

if (this.sendLatencyFaultEnable) {

try {

//1.在随机递增取模的基础上,再过滤掉not available

的Broker代理;对之前失败的,按一定的时间做退避

int index = tpInfo.getSendWhichQueue()

.getAndIncrement();

for (int i = 0; i < tpInfo.getMessageQueueList().

size(); i++) {

int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

if (pos < 0)

pos = 0;

MessageQueue mq = tpInfo.getMessageQueueList().

get(pos);

if (latencyFaultTolerance.isAvailable(mq.

getBrokerName())) {

if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

return mq;

}

}

final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

if (writeQueueNums > 0) {

final MessageQueue mq = tpInfo.

selectOneMessageQueue();

if (notBestBroker != null) {

mq.setBrokerName(notBestBroker);

mq.setQueueId(tpInfo.getSendWhichQueue().

getAndIncrement() % writeQueueNums);

}

return mq;

} else {

latencyFaultTolerance.remove(notBestBroker);

}

} catch (Exception e) {

log.error("Error occurred when selecting message

queue", e);

}

return tpInfo.selectOneMessageQueue();

}

//2.采用随机递增取模的方式选择一个队列

(MessageQueue)来发送消息

return tpInfo.selectOneMessageQueue

(lastBrokerName);

}

3.2.3 发送封装后的RemotingCommand数据包

在选择完发送消息的队列后,RocketMQ就会调用sendKernelImpl()方法发送消息(该方法为,通过RocketMQ的Remoting通信模块真正发送消息的核心)。在该方法内总共完成以下几个步流程:

(1)根据前面获取到的MessageQueue中的brokerName,调用MQClientInstance实例的findBrokerAddressInPublish()方法,得到待发送消息中存放的Broker代理服务器地址,如果没有找到则跟新路由信息;

(2)如果没有禁用,则发送消息前后会有钩子函数的执行(executeSendMessageHookBefore()/

executeSendMessageHookAfter()方法);

(3)将与该消息相关信息封装成RemotingCommand数据包,其中请求码RequestCode为以下几种之一:

a.SEND_MESSAGE(普通发送消息)

b.SEND_MESSAGE_V2(优化网络数据包发送)c.SEND_BATCH_MESSAGE(消息批量发送)

(4)根据获取到的Broke代理服务器地址,将封装好的RemotingCommand数据包发送对应的Broker上,默认发送超时间为3s;

(5)这里,真正调用RocketMQ的Remoting通信模块完成消息发送是在MQClientAPIImpl实例sendMessageSync()方法中,代码具体如下:

private SendResult sendMessageSync(

final String addr,

final String brokerName,

final Message msg,

final long timeoutMillis,

final RemotingCommand request

) throws RemotingException, MQBrokerException, InterruptedException {

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

assert response != null;

return this.processSendResponse(brokerName, msg, response);

}

(6)processSendResponse方法对发送正常和异常情况分别进行不同的处理并返回sendResult对象;

(7)发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间;

(8)对于异常情况,且标志位—retryAnotherBrokerWhenNotStoreOK,设置为true时,在发送失败的时候,会选择换一个Broker;

在生产者发送完成消息后,客户端日志打印如下:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]

3.3 Broker代理服务器的消息处理简析

Broker代理服务器中存在很多Processor业务处理器,用于处理不同类型的请求,其中一个或者多个Processor会共用一个业务处理器线程池。对于接收到消息,Broker会使用SendMessageProcessor这个业务处理器来处理。SendMessageProcessor会依次做以下处理:

(1)消息前置校验,包括broker是否可写、校验queueId是否超过指定大小、消息中的Topic路由信息是否存在,如果不存在就新建一个。这里与上文中“尝试获取TopicPublishInfo的路由信息”一节中介绍的内容对应。如果Topic路由信息不存在,则Broker端日志输出如下:

2018-06-14 17:17:24 INFO SendMessageThread_1 - receive

SendMessage request command, RemotingCommand [code=310,

language=JAVA, version=252, opaque=6, flag(B)=0,

remark=null, extFields={a=ProducerGroupName,

b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569,

h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD

46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]

2018-06-14 17:17:24 WARN SendMessageThread_1 -

the topic TopicTest not exist, producer: /172.20.21.162:62661

2018-06-14 17:17:24 INFO SendMessageThread_1 -

Create new topic by default topic:[TBW102] config:

[TopicConfig [topicName=TopicTest, readQueueNums=4,

writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG,

topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]

Topic路由信息新建后,第二次消息发送后,Broker端日志输出如下:

2018-08-02 16:26:13 INFO SendMessageThread_1 -

receive SendMessage request command,

RemotingCommand [code=310, language=JAVA,

version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest,

c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC20

8AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]

2018-08-02 16:26:13 INFO SendMessageThread_1 -

the msgInner's content is:MessageExt [queueId=2,

storeSize=0, queueOffset=0, sysFlag=0,

bornTimestamp=1533198373524,

bornHost=/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId=null,

commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message

[topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true,

TAGS=TagA}, body=11body's content is:Hello world]]

(2)构建MessageExtBrokerInner;

(3)调用“brokerController.getMessageStore().putMessage”

将MessageExtBrokerInner做落盘持久化处理;

(4)根据消息落盘结果(正常/异常情况),BrokerStatsManager做一些统计数据的更新,最后设置Response并返回;

四、总结

使用RocketMQ的客户端发送普通消息的流程大概到这里就分析完成。关于顺序消息、分布式事务消息等内容将在后续篇幅中陆续介绍,敬请期待。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Cult of the Amateur

The Cult of the Amateur

Andrew Keen / Crown Business / 2007-6-5 / USD 22.95

Amateur hour has arrived, and the audience is running the show In a hard-hitting and provocative polemic, Silicon Valley insider and pundit Andrew Keen exposes the grave consequences of today’s......一起来看看 《The Cult of the Amateur》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

在线进制转换器
在线进制转换器

各进制数互转换器

随机密码生成器
随机密码生成器

多种字符组合密码