内容简介:上次完成rocketMq的安装,并测试了发送和接收,这次简单的利用rocketMq的源码连接rocketMq集群。源码:https://github.com/limingios/netFuture/tree/master/jms
上次完成rocketMq的安装,并测试了发送和接收,这次简单的利用rocketMq的源码连接rocketMq集群。
源码:https://github.com/limingios/netFuture/tree/master/jms
(一)broker的properties配置文件
- 启动2m-2s-sync的rocket集群
-
配置文件
上节直接用了我提供的properties文件,并没有详细解释每个字段的含义这次详细说下。
- broker参数
| 参数名 | 默认值 | 描述 |
|---|---|---|
| listenPort | 10911 | broker的服务端口号,作为对producer和consumer使用服务的端口号 |
| namesrvAddr | null | namesrv的ip地址。格式: ip:port;ip:port |
| brokerIP1 | 本机IP | broker所在的机器ip,默认不用设置,如果机器有多个网卡,需要手动设置 |
| brokerName | 本机主机名 | 作用为一组master与slave通过brokerName是否相同来标示,通过brokerId来区分master还是slave brokerClusterName DefaultCluster 整个broker集群的名字,创建topic时需要指定。 |
| brokerId | 0 | 0:master 非0:slave |
| storePathCommitLog | $HOME/store/commitlog/ | commitLog存储路径 |
| storePathConsumerQueue | $HOME/store/consumequeue/ | 消费队列存储路径 |
| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commitLog每个文件的大小,默认1G |
| deleteWhen | 4 | 删除文件时间点,默认凌晨 4点 |
| fileReservedTime | 72 | 文件保留时间,默认72小时. |
| brokerRole | ASYNC_MASTER | Broker 的角色ASYNC_MASTER 异步复制Master SYNC_MASTER 同步双写Master SLAVE |
| flushDiskType | ASYNC_FLUSH | 刷盘方式 ASYNC_FLUSH 异步刷盘 SYNC_FLUSH 同步刷盘 |
| defaultTopicQueueNums | 4 | 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。 |
| autoCreateTopicEnable | true | 是否自动创建topic。 |
| autoCreateSubscriptionGroup | true | 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 |
| rejectTransactionMessage | false | 是否拒绝事务消息接入 |
| fetchNamesrvAddrByAddressServer | false | 是否从web服务器获取Name Server地址,针对大规模的Broker集群建议使用这种方式 |
| storePathIndex | $HOME/store/index | 消息索引存储路径 |
| storeCheckpoint | $HOME/store/checkpoint | checkpoint文件存储路径 |
| abortFile | $HOME/store/abort | abort文件存储路径 |
| maxTransferBytesOnMessageInMemory | 262144 | 单次Pull消息(内存)传输的最大字节数 |
| maxTransferCountOnMessageInMemory | 32 | 单次Pull消息(内存)传输的最大条数 |
| maxTransferBytesOnMessageInDisk | 65536 | 单次Pull消息(磁盘)传输的最大字节数 |
| maxTransferCountOnMessageInDisk | 8 | 单次Pull消息(磁盘)传输的最大条数 |
| messageIndexEnable | true | 是否开启消息索引功能 |
| messageIndexSafe | false | 是否提供安全的消息索引机制,索引保证不丢 |
| haMasterAddress | 在Slave上直接设置Master地址,默认从Name Server上自动获取,也可以手工强制配置 | |
| cleanFileForciblyEnable | true | 磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用 FALSE 标记服务不可用,文件不删除 |
- Consumer
| 参数名 | 默认值 | 描述 |
|---|---|---|
| namesrvAddr | Name Server地址列表,多个NameServer地址用分号隔开 | |
| clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
| instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
| clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
| pollNameServerInteval | 30000 | 轮询Name Server间隔时间,单位毫秒 |
| heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
| persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
-
Producer参数
>Producer配置
| 参数名 | 默认值 | 描述 |
|---|---|---|
| producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 |
| createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定Key。 |
| defaultTopicQueueNums | 4 | 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 |
| sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
| compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
| retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
| maxMessageSize | 131072 | 客户端限制的消息大小,超过报错,同时服务端也会限制 |
| transactionCheckListener | 事务消息回查监听器,如果发送事务消息,必须设置 | |
| checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池大小 |
| checkThreadPoolMaxSize | Broker回查Producer事务状态时,线程池大小 | |
| checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
Push Consumer配置
| 参数名 | 默认值 | 描述 |
|---|---|---|
| consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
| messageModel | CLUSTERING | 消息模型,支持以下两种1、集群消费(CLSUTER)2、广播消费(BROADCASTING) |
| consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从什么位置开始消费1、CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息2、CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍3、CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
| subscription | {} | 订阅关系 |
| messageListener | 消息监听器 | |
| offsetStore | 消费进度存储 | |
| consumeThreadMin | 10 | 消费线程池数量 |
| consumeThreadMax | 20 | 消费线程池数量 |
| consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
| pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
| pullInterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒 |
| consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
| pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
Pull Consumer配置
| 参数名 | 默认值 | 描述 |
|---|---|---|
| consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
| brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
| consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒 |
| consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
| messageModel | BROADCASTING | 消息模型,支持以下两种1、集群消费2、广播消费 |
| messageQueueListener | 监听队列变化 | |
| offsetStore | 消费进度存储 | |
| registerTopics | [] | 注册的topic集合 |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
-
Meesage数据结构
>Message数据结构各个字段都可以通过get、set方式访问,例如访问topic:
msg.getTopic();
msg.setTopic(“test”);
| 字段名 | 默认值 | 必填 | 说明 |
|---|---|---|---|
| Topic | null | true | 线下环境不需要申请,线上环境需要申请后才能使用 |
| Body | null | true | 二进制形式,序列化由应用决定,Producer与Consumer要协商好序列化形式。 |
| Tags | null | false | 类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念。 |
| Keys | null | false | 代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以再Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品ID等。 |
| Flag | 0 | false | 完全由应用来设置,RocketMQ不做敢于。 |
| DelayTimeLevel | 0 | false | 消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费。 |
| WaitStoreMsgOK | TRUE | false | 表示消息是否在服务器罗盘后才返回应答。 |
(二)源码测试
-
连接集群
> 修改这2个文件:Producer 和 Consumer
Producer
Consumer
-
部署rocketmq-console
>GitHub地址:https://github.com/apache/rocketmq-externals
添加192.168.89.100:9876;192.168.89.101:9876
(三)流程梳理
生产者流程
1. 生产者首先需要设置namesrv,或者指定其他方式更新namesrv。
2. 从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用。
3. 从Message Queue列表中选择合适的Queue发送消息,实现负载均衡。
消费者流程
1. namesrv告诉消费者,他从broker中获取消息。
2. 获取完之后开始消费。
-
RocketMq高可用
>特点:master挂了之后角色不会做切换(slave不会成为master)(商用版本的这种情况不知) 然后master和slave需要制定。
| | 发送消息 | 存储消息 | 接受消息 |
| :——: | :——–: | :——–: | :——–: |
|停掉一个namesrv |不受影响 |不受影响 |不受影响|
|停全部的namesrv| 影响 |不受影响| 影响|
|停单个master broker |不受影响 |受影响(很小) |不影响|
|停全部master broker |影响 |影响 |影响|
|停全部salve broker |不影响| 不影响 |不影响|
恢复任意master broker| 不受影响 |受影响(很小)| 受影响(很小)|
-
Rocketmq文档参考
> 源码:jm下有文档
PS:说了rocketmq的概念的东西,下次重点说说rocketMq在双11是如何做到的抗压,我听过一次公开课,稍后总结下,分享给各位老铁。
>>原创文章,欢迎转载。转载请注明:转载自,谢谢!>>原文链接地址:上一篇:已是最新文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Build Your Own Web Site the Right Way Using HTML & CSS
Ian Lloyd / SitePoint / 2006-05-02 / USD 29.95
Build Your Own Website The Right Way Using HTML & CSS teaches web development from scratch, without assuming any previous knowledge of HTML, CSS or web development techniques. This book introduces you......一起来看看 《Build Your Own Web Site the Right Way Using HTML & CSS》 这本书的介绍吧!