同一微服务连接多套RocketMQ集群

栏目: 后端 · 发布时间: 6年前

内容简介:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

:以下分析均在rocketmq4.0.0-incubating源码上进行

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 从上一句可以看出:MQClientManager为Producer连接管理器,用户管理连接MQ的TCP客户端的连接

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "//
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    // 从上一句可以看出:找到对应的客户端通讯实例是通过一个clientId在factoryTable内存缓存中进行查询的

    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

org.apache.rocketmq.client.ClientConfig#buildMQClientId

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}
// 从以上方法可以看出:这个clientId信息中包含当前微服务的IP,当前模块实例名(默认通过changeInstanceNameToPID更改为进程ID值)和一个unitName

结论 : 经以上源码可以得到一个模块需要连接多个RocketMQ集群,则需要生产多个MQClientInstance,换言之,则需要在获取MQClientInstance时传递不同的ClientID即可。

由于 ClientID=localIP + instanceName + unitName ,所以只需要创建Producer对象时传入不同的instanceName或unitName值即可。

观点仅代表自己,期待你的留言。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

技术的本质

技术的本质

布莱恩•阿瑟(Brian Arthur) / 曹东溟、王健 / 浙江人民出版社 / 2014-4-1 / 62.90

★《技术的本质》是复杂性科学奠基人、首屈一指的技术思想家、“熊彼特奖”得主布莱恩•阿瑟所创建的一套关于技术产生和进化的系统性理论,本书是打开“技术黑箱”的钥匙,它用平实的语言将技术最本质的思想娓娓道来。 ★技术,是一个异常美丽的主题,它不动声色地创造了我们的财富,成就了经济的繁荣,改变了我们存在的方式。尽管技术如此重要,却少有人在快节奏的生活中停下来深入思考技术。我们了解技术的原理,却不知道......一起来看看 《技术的本质》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

随机密码生成器
随机密码生成器

多种字符组合密码

SHA 加密
SHA 加密

SHA 加密工具