RocketMQ 主备同步

栏目: 后端 · 发布时间: 5年前

内容简介:介绍 RocketMQ 的主备同步机制 !RocketMQ 通过 Master-Slave 主备机制,来实现整个系统的高可用,具体表现在:我们在一台机器上搭建一个 Master 一个 Slave 的环境:

介绍 RocketMQ 的主备同步机制 !

(1) 简介

RocketMQ 通过 Master-Slave 主备机制,来实现整个系统的高可用,具体表现在:

  • Master 磁盘坏掉,Slave 依然保存了一份
  • Master 宕机,不影响消费者继续消费

(2) 搭建环境

我们在一台机器上搭建一个 Master 一个 Slave 的环境:

RocketMQ 主备同步

为了能够将 Master 和 Slave 搭建在同一台计算机上,我们除了需要将 Broker 的角色设置为 SLAVE ,还需要为其指定单独的 brokerIdstorePathRootDirstorePathCommitLog

# SLAVE 角色
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

# 一个机器如果要启动多个 Broker,那么每个 Broker 的 store 根目录必须不同
messageStoreConfig.setStorePathRootDir(storePathRootDir);
# 一个机器如果要启动多个 Broker,那么每个 Broker 的 storePathCommitLog 根目录必须不同
messageStoreConfig.setStorePathCommitLog(storePathCommitLog);
# 设置 Slave 的 Master HA 地址
messageStoreConfig.setHaMasterAddress("localhost:10912");

# SLAVE 角色的 brokerId 必须大于 0
brokerConfig.setBrokerId(1);

注意 Slave 和 Master 的 brokerName 必须一致 ,即它们必须处于同一个 BrokerData 数据结构里面。实际上在做了如上的修改之后, Slave 和 Master 依旧不能同时运行在同一台机器上,因为 Slave 本身也可以称为 Master,接受来自其他 Slave 的请求,因此当运行 Slave 的时候,需要将 HAService 里面的启动 AcceptSocketService 运行的相关方法 注释掉

(3) 建立连接

当一个 Broker 在启动的时候,会调用 HAServicestart() 方法:

public class HAService {

    public void start() throws Exception {
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        
        this.groupTransferService.start();
        this.haClient.start();
    }
    
}

AcceptSocketService 服务的功能是 Master 等待接受来自其它客户端 Slave 的连接,当成功建立连接后,会将这条连接 HAConnection 放入到 connectionList 连接列表里面。而 HAClient 服务的功能是 Slave 主动发起同其它 Master 的连接。

RocketMQ 主备同步

(4) 数据传输

当启动 HAService 之后,一旦 Master 发现和 Slave 不同步,那么Master 会自动开始同步消息到 Slave,无需其它的触发机制。

RocketMQ 主备同步

(5) 消息异步传输

如果 Master Broker 的角色是 ASYNC_MASTER ,那么消息等待从 Master 同步到 Slave 的方式是 异步传输 的方式。这意味当一条消息发送到 Master Broker 的时候,Master Broker 在存储完这条消息到本地之后,并不会等待消息同步到 Slave Broker 才返回。这种方式会缩短发送消息的响应时间。

(6) 消息同步传输

如果 Master Broker 的角色是 SYNC_MASTER ,那么消息等待从 Master 同步到 Slave 的方式是 同步传输 的方式。除此之外,进入同步方式还得满足另外两个条件:

  • 消息体的 PROPERTY_WAIT_STORE_MSG_OK 属性值为 true ,即这条消息允许等待
  • Slave 相比 Master 落下的同步进度不能超过 256MB
public class CommitLog {

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();

            // 消息是否允许等待同步
            if (messageExt.isWaitStoreMsgOK()) {
                
                // Slave 是否没有落下 Master 太多
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    // 等待同步完成
                    // ...
                }
                
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

    }
    
}

其中 isSlaveOK 方法就是用来检测 Slave 和 Master 落下的同步进度是否太大的:

public class HAService {

    public boolean isSlaveOK(final long masterPutWhere) {
        boolean result = this.connectionCount.get() > 0;

        result =
            result

            && ((masterPutWhere - this.push2SlaveMaxOffset.get()) <
                this.defaultMessageStore
                .getMessageStoreConfig()
                .getHaSlaveFallbehindMax()); // 默认 256 * 1024 * 1024 = 256 MB
        
        return result;
    }
    
}

如果上面两个条件不满足的话,那么 Master 便不会再等待消息同步到 Slave 之后再返回,能尽早返回便尽早返回了。

消息等待是否同步到 Slave 是借助 CountDownLatch 来实现的。当消息需要等待的时候,便会构建一个 GroupCommitRequest ,每个请求在其内部都维护了一个 CountDownLatch ,然后通过调用 await(timeout) 方法来等待消息同步到 Slave 之后,或者超时之后自动返回。

public static class GroupCommitRequest {

    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
            return false;
        }
    }
    
}

我们再重点来看几个循环体和唤醒点:

  • GroupTransferService 服务的 是否处理请求循环体唤醒点 :
class GroupTransferService extends ServiceThread {

    public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
        // ...
        // 放入请求,唤醒
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

    public void run() {
        // 循环体
        while (!this.isStopped()) {
            try {
                // putRequest 会提前唤醒这句话
                this.waitForRunning(10);
                this.doWaitTransfer();
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

    }

}
  • HAConnection是否进行消息传输循环体唤醒点
class WriteSocketService extends ServiceThread {

    @Override
    public void run() {
        // 循环体
        while (!this.isStopped()) {
            SelectMappedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                // 传输(写入)消息
            } else {
                // 等待 100 毫秒或者提前被唤醒
                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        }
    }

}

public class CommitLog {

    public void handleHA(AppendMessageResult result,
                         PutMessageResult putMessageResult,
                         MessageExt messageExt) {
        GroupCommitRequest request =
            new GroupCommitRequest(result.getWroteOffset() +
                                   result.getWroteBytes());
        service.putRequest(request);
        // 提前唤醒 WriteSocketService
        service.getWaitNotifyObject().wakeupAll();
    }
    
}
  • Slave 汇报进度唤醒 GroupTransferService , 等待同步完成唤醒 GroupCommitRequestCountDownLatch :
class ReadSocketService extends ServiceThread {

    private boolean processReadEvent() {
        // 唤醒 GroupTransferService
        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
    }
    
}

class GroupTransferService extends ServiceThread {

    // 被唤醒
    public void notifyTransferSome() {
        this.notifyTransferObject.wakeup();
    }

    private void doWaitTransfer() {
        for (CommitLog.GroupCommitRequest req : this.requestsRead) {
            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

            // 5 次重试
            for (int i = 0; !transferOK && i < 5; i++) {
                // 等待被唤醒或者超时
                this.notifyTransferObject.waitForRunning(1000);
                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            }

            // 唤醒 GroupCommitRequest 的 CountDownLatch
            req.wakeupCustomer(transferOK);
        }
    }
    
}

public static class GroupCommitRequest {

    // 被唤醒
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

}

下图是上图一个完整的消息 唤醒链 :

RocketMQ 主备同步

(7) 主备消费

当消费者在消费的时候,如果 Master 突然宕机,那么消费者会自动切换到 Slave 机器上继续进行消费。

(8) 消费建议

RocketMQ 提供了自动从 Slave 读取老数据的功能。这个功能主要由 slaveReadEnable 这个参数控制。默认是关的(slaveReadEnable = false)。推荐把它打开,主从都要开。这个参数打开之后,在客户端消费数据时,会判断,当前读取消息的物理偏移量跟最新的位置的差值,是不是超过了内存容量的一个百分比(accessMessageInMemoryMaxRatio = 40 by default)。如果超过了,就会告诉客户端去备机上消费数据。如果采用异步主从,也就是 brokerRole 等于 ASYNC_AMSTER 的时候,你的备机 IO 打爆,其实影响不太大。但是如果你采用同步主从,那还是有影响。所以这个时候,最好挂两个备机。因为 RocketMQ 的主从同步复制, 只要一个备机响应 了确认写入就可以了,一台 IO 打爆,问题不大。

(9) 异常处理

Q: Master(Slave) 读取来自 Slave(Master) 的消息异常 ( IOExceptionread() 返回 -1 等) 的时候怎么处理?

A: 打印日志 + 关闭这条连接

Q: Master(Slave) 长时间没有收到来自 Slave(Master) 的进度汇报怎么处理?

A: 每次读取之后更新 lastReadTimestamp 或者 lastWriteTimestamp ,一旦发现在 haHousekeepingInterval 间隔内 (默认 20秒) 这个时间戳都没有改变的话,关闭这条连接

Q: Slave 检测到来自 Master 汇报的 本次传输偏移量 和本地的 传输偏移量 不同时怎么处理?

A: 打印日志 + 关闭这条连接

Q: Master 如何知道 Slave 是否 真正的存储 了刚才发送过去的消息?

A: Slave 存储完毕之后,通过向 Master 汇报进度来完成。相当于 TCP 的 ACK 机制。

Q: Master 宕掉

A: 无论 Maser 是主动关闭 Mater,还是 Master 因为异常而退出,Slave 都会每隔 5 秒重连一次 Master


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

深入解析Spring MVC与Web Flow

深入解析Spring MVC与Web Flow

Seth Ladd、Darren Davison、Steven Devijver、Colin Yates / 徐哲、沈艳 / 人民邮电出版社 / 2008-11 / 49.00元

《深入解析Spring MVCgn Web Flow》是Spring MVC 和Web Flow 两个框架的权威指南,书中包括的技巧和提示可以让你从这个灵活的框架中汲取尽可能多的信息。书中包含了一些开发良好设计和解耦的Web 应用程序的最佳实践,介绍了Spring 框架中的Spring MVC 和Spring Web Flow,以及着重介绍利用Spring 框架和Spring MVC 编写Web ......一起来看看 《深入解析Spring MVC与Web Flow》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换