内容简介:介绍 RocketMQ 的主备同步机制 !RocketMQ 通过 Master-Slave 主备机制,来实现整个系统的高可用,具体表现在:我们在一台机器上搭建一个 Master 一个 Slave 的环境:
介绍 RocketMQ 的主备同步机制 !
(1) 简介
RocketMQ 通过 Master-Slave 主备机制,来实现整个系统的高可用,具体表现在:
- Master 磁盘坏掉,Slave 依然保存了一份
- Master 宕机,不影响消费者继续消费
(2) 搭建环境
我们在一台机器上搭建一个 Master 一个 Slave 的环境:
为了能够将 Master 和 Slave 搭建在同一台计算机上,我们除了需要将 Broker 的角色设置为 SLAVE
,还需要为其指定单独的 brokerId
、 storePathRootDir
、 storePathCommitLog
。
# SLAVE 角色 messageStoreConfig.setBrokerRole(BrokerRole.SLAVE); # 一个机器如果要启动多个 Broker,那么每个 Broker 的 store 根目录必须不同 messageStoreConfig.setStorePathRootDir(storePathRootDir); # 一个机器如果要启动多个 Broker,那么每个 Broker 的 storePathCommitLog 根目录必须不同 messageStoreConfig.setStorePathCommitLog(storePathCommitLog); # 设置 Slave 的 Master HA 地址 messageStoreConfig.setHaMasterAddress("localhost:10912"); # SLAVE 角色的 brokerId 必须大于 0 brokerConfig.setBrokerId(1);
注意 Slave 和 Master 的 brokerName 必须一致 ,即它们必须处于同一个 BrokerData
数据结构里面。实际上在做了如上的修改之后, Slave 和 Master 依旧不能同时运行在同一台机器上,因为 Slave 本身也可以称为 Master,接受来自其他 Slave 的请求,因此当运行 Slave 的时候,需要将 HAService
里面的启动 AcceptSocketService
运行的相关方法 注释掉 。
(3) 建立连接
当一个 Broker 在启动的时候,会调用 HAService
的 start()
方法:
public class HAService { public void start() throws Exception { this.acceptSocketService.beginAccept(); this.acceptSocketService.start(); this.groupTransferService.start(); this.haClient.start(); } }
AcceptSocketService
服务的功能是 Master 等待接受来自其它客户端 Slave 的连接,当成功建立连接后,会将这条连接 HAConnection
放入到 connectionList
连接列表里面。而 HAClient
服务的功能是 Slave 主动发起同其它 Master 的连接。
(4) 数据传输
当启动 HAService
之后,一旦 Master 发现和 Slave 不同步,那么Master 会自动开始同步消息到 Slave,无需其它的触发机制。
(5) 消息异步传输
如果 Master Broker 的角色是 ASYNC_MASTER
,那么消息等待从 Master 同步到 Slave 的方式是 异步传输 的方式。这意味当一条消息发送到 Master Broker 的时候,Master Broker 在存储完这条消息到本地之后,并不会等待消息同步到 Slave Broker 才返回。这种方式会缩短发送消息的响应时间。
(6) 消息同步传输
如果 Master Broker 的角色是 SYNC_MASTER
,那么消息等待从 Master 同步到 Slave 的方式是 同步传输 的方式。除此之外,进入同步方式还得满足另外两个条件:
- 消息体的
PROPERTY_WAIT_STORE_MSG_OK
属性值为true
,即这条消息允许等待 - Slave 相比 Master 落下的同步进度不能超过
256MB
public class CommitLog { public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); // 消息是否允许等待同步 if (messageExt.isWaitStoreMsgOK()) { // Slave 是否没有落下 Master 太多 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { // 等待同步完成 // ... } // Slave problem else { // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } } }
其中 isSlaveOK
方法就是用来检测 Slave 和 Master 落下的同步进度是否太大的:
public class HAService { public boolean isSlaveOK(final long masterPutWhere) { boolean result = this.connectionCount.get() > 0; result = result && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore .getMessageStoreConfig() .getHaSlaveFallbehindMax()); // 默认 256 * 1024 * 1024 = 256 MB return result; } }
如果上面两个条件不满足的话,那么 Master 便不会再等待消息同步到 Slave 之后再返回,能尽早返回便尽早返回了。
消息等待是否同步到 Slave 是借助 CountDownLatch
来实现的。当消息需要等待的时候,便会构建一个 GroupCommitRequest
,每个请求在其内部都维护了一个 CountDownLatch
,然后通过调用 await(timeout)
方法来等待消息同步到 Slave 之后,或者超时之后自动返回。
public static class GroupCommitRequest { private final CountDownLatch countDownLatch = new CountDownLatch(1); public void wakeupCustomer(final boolean flushOK) { this.flushOK = flushOK; this.countDownLatch.countDown(); } public boolean waitForFlush(long timeout) { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); return this.flushOK; } catch (InterruptedException e) { log.error("Interrupted", e); return false; } } }
我们再重点来看几个循环体和唤醒点:
-
GroupTransferService
服务的 是否处理请求 的 循环体 和 唤醒点 :
class GroupTransferService extends ServiceThread { public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { // ... // 放入请求,唤醒 if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } } public void run() { // 循环体 while (!this.isStopped()) { try { // putRequest 会提前唤醒这句话 this.waitForRunning(10); this.doWaitTransfer(); } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); } } } }
-
HAConnection
的 是否进行消息传输 的 循环体 和 唤醒点 :
class WriteSocketService extends ServiceThread { @Override public void run() { // 循环体 while (!this.isStopped()) { SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); if (selectResult != null) { // 传输(写入)消息 } else { // 等待 100 毫秒或者提前被唤醒 HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); } } } } public class CommitLog { public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); // 提前唤醒 WriteSocketService service.getWaitNotifyObject().wakeupAll(); } }
- Slave 汇报进度唤醒
GroupTransferService
, 等待同步完成唤醒GroupCommitRequest
的CountDownLatch
:
class ReadSocketService extends ServiceThread { private boolean processReadEvent() { // 唤醒 GroupTransferService HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); } } class GroupTransferService extends ServiceThread { // 被唤醒 public void notifyTransferSome() { this.notifyTransferObject.wakeup(); } private void doWaitTransfer() { for (CommitLog.GroupCommitRequest req : this.requestsRead) { boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); // 5 次重试 for (int i = 0; !transferOK && i < 5; i++) { // 等待被唤醒或者超时 this.notifyTransferObject.waitForRunning(1000); transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); } // 唤醒 GroupCommitRequest 的 CountDownLatch req.wakeupCustomer(transferOK); } } } public static class GroupCommitRequest { // 被唤醒 public void wakeupCustomer(final boolean flushOK) { this.flushOK = flushOK; this.countDownLatch.countDown(); } }
下图是上图一个完整的消息 唤醒链 :
(7) 主备消费
当消费者在消费的时候,如果 Master 突然宕机,那么消费者会自动切换到 Slave 机器上继续进行消费。
(8) 消费建议
RocketMQ 提供了自动从 Slave 读取老数据的功能。这个功能主要由 slaveReadEnable
这个参数控制。默认是关的(slaveReadEnable = false)。推荐把它打开,主从都要开。这个参数打开之后,在客户端消费数据时,会判断,当前读取消息的物理偏移量跟最新的位置的差值,是不是超过了内存容量的一个百分比(accessMessageInMemoryMaxRatio = 40 by default)。如果超过了,就会告诉客户端去备机上消费数据。如果采用异步主从,也就是 brokerRole 等于 ASYNC_AMSTER 的时候,你的备机 IO 打爆,其实影响不太大。但是如果你采用同步主从,那还是有影响。所以这个时候,最好挂两个备机。因为 RocketMQ 的主从同步复制, 只要一个备机响应 了确认写入就可以了,一台 IO 打爆,问题不大。
(9) 异常处理
Q: Master(Slave) 读取来自 Slave(Master) 的消息异常 ( IOException
、 read()
返回 -1
等) 的时候怎么处理?
A: 打印日志 + 关闭这条连接
Q: Master(Slave) 长时间没有收到来自 Slave(Master) 的进度汇报怎么处理?
A: 每次读取之后更新 lastReadTimestamp
或者 lastWriteTimestamp
,一旦发现在 haHousekeepingInterval
间隔内 (默认 20秒) 这个时间戳都没有改变的话,关闭这条连接
Q: Slave 检测到来自 Master 汇报的 本次传输偏移量 和本地的 传输偏移量 不同时怎么处理?
A: 打印日志 + 关闭这条连接
Q: Master 如何知道 Slave 是否 真正的存储 了刚才发送过去的消息?
A: Slave 存储完毕之后,通过向 Master 汇报进度来完成。相当于 TCP 的 ACK 机制。
Q: Master 宕掉
A: 无论 Maser 是主动关闭 Mater,还是 Master 因为异常而退出,Slave 都会每隔 5 秒重连一次 Master
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
你必须知道的495个C语言问题
Steve Summit / 孙云、朱群英 / 人民邮电出版社 / 2009-2 / 45.00元
“本书是Summit以及C FAQ在线列表的许多参与者多年心血的结晶,是C语言界最为珍贵的财富之一。我向所有C语言程序员推荐本书。” ——Francis Glassborow,著名C/C++专家,ACCU(C/C++用户协会)前主席 “本书清晰阐明了Kernighan与Ritchie《The C programming Language》一书中许多简略的地方,而且精彩地总结了C语言编程......一起来看看 《你必须知道的495个C语言问题》 这本书的介绍吧!