内容简介:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
注
:以下分析均在rocketmq4.0.0-incubating源码上进行
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 从上一句可以看出:MQClientManager为Producer连接管理器,用户管理连接MQ的TCP客户端的连接
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
// 从上一句可以看出:找到对应的客户端通讯实例是通过一个clientId在factoryTable内存缓存中进行查询的
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
org.apache.rocketmq.client.ClientConfig#buildMQClientId
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
// 从以上方法可以看出:这个clientId信息中包含当前微服务的IP,当前模块实例名(默认通过changeInstanceNameToPID更改为进程ID值)和一个unitName
结论
: 经以上源码可以得到一个模块需要连接多个RocketMQ集群,则需要生产多个MQClientInstance,换言之,则需要在获取MQClientInstance时传递不同的ClientID即可。
由于 ClientID=localIP + instanceName + unitName ,所以只需要创建Producer对象时传入不同的instanceName或unitName值即可。
观点仅代表自己,期待你的留言。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Git多套配置的条件应用
- RxRetroHttp,为多套API请求适配而生
- android 一套代码多用 以及 多套代码用于一个项目
- 搞定 Spring Boot 多数据源(一):多套源策略
- JEECG 3.7.7 发布,增加多套主流 UI 代码生成器模板
- JEECG 3.7.7 闪电版本发布,增加多套主流UI代码生成器模板
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
WebKit技术内幕
朱永盛 / 电子工业出版社 / 2014-6 / 79.00元
《WebKit技术内幕》从炙手可热的HTML5 的基础知识入手,重点阐述目前应用最广的渲染引擎项目——WebKit。不仅着眼于系统描述WebKit 内部渲染HTML 网页的原理,并基于Chromium 的实现,阐明渲染引擎如何高效地利用硬件和最新技术,而且试图通过对原理的剖析,向读者传授实现高性能Web 前端开发所需的宝贵经验。 《WebKit技术内幕》首先从总体上描述WebKit 架构和组......一起来看看 《WebKit技术内幕》 这本书的介绍吧!
RGB HSV 转换
RGB HSV 互转工具
HEX CMYK 转换工具
HEX CMYK 互转工具