内容简介:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
注
:以下分析均在rocketmq4.0.0-incubating源码上进行
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 从上一句可以看出:MQClientManager为Producer连接管理器,用户管理连接MQ的TCP客户端的连接
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
// 从上一句可以看出:找到对应的客户端通讯实例是通过一个clientId在factoryTable内存缓存中进行查询的
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
org.apache.rocketmq.client.ClientConfig#buildMQClientId
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
// 从以上方法可以看出:这个clientId信息中包含当前微服务的IP,当前模块实例名(默认通过changeInstanceNameToPID更改为进程ID值)和一个unitName
结论
: 经以上源码可以得到一个模块需要连接多个RocketMQ集群,则需要生产多个MQClientInstance,换言之,则需要在获取MQClientInstance时传递不同的ClientID即可。
由于 ClientID=localIP + instanceName + unitName ,所以只需要创建Producer对象时传入不同的instanceName或unitName值即可。
观点仅代表自己,期待你的留言。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Git多套配置的条件应用
- RxRetroHttp,为多套API请求适配而生
- android 一套代码多用 以及 多套代码用于一个项目
- 搞定 Spring Boot 多数据源(一):多套源策略
- JEECG 3.7.7 发布,增加多套主流 UI 代码生成器模板
- JEECG 3.7.7 闪电版本发布,增加多套主流UI代码生成器模板
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Blockchain Basics
Daniel Drescher / Apress / 2017-3-16 / USD 20.99
In 25 concise steps, you will learn the basics of blockchain technology. No mathematical formulas, program code, or computer science jargon are used. No previous knowledge in computer science, mathema......一起来看看 《Blockchain Basics》 这本书的介绍吧!