ZooKeeper - Data Sync

栏目: 编程工具 · 发布时间: 7年前

内容简介:ZooKeeper - Data Sync

以下描述中用 zk 代指 ZooKeeper ,源码解释均基于 ZooKeeper 3.4.6

写在前面

好吧,这又是一篇:

不加全局理论抽象,局部解读具体逻辑细节的 文章

然而,我目前的能力只能解决这些非常具体的问题。其实总结出来的文章并不能够给其它同学带来什么收益,真的想懂细节,确实只有 “自己研读或者谷歌”

但是,我读完了代码,我会想:

  • 我能不能帮忙别人来加快理解这个问题呢?
  • 我这个理解是不是正确的呢,能不能 publish 出来让大家拍一拍呢?
  • 想给自己一个交代。我始终相信,写文章才能把头脑中可能忽略的细节都收集起来,才能发现思考时会遗漏的问题

当然,我非常认可需要做 全局理论抽象 ,这是我目前的 恐慌区 ,需要努力去跨越,自勉。

背景

继上次的 Session 问题 后,好学的 小冷 同学又认真地研究了下 ZooKeeper Cluster 的原理,问了我下面 5 个问题:

  • ZooKeeper 集群在发生 Leader 切换的时候,所有的 Follower 会选择新的 Leader 进行全量的数据同步吗?
  • 如果一次写入,由于丢包,导致某条日志没有写入,会怎么样呢?
  • 扩容的时候加了个节点,这时候新的写入会不会同步到这个新节点呢?
  • Follower 会主动向 Leaderping 包么?
  • 集群在选举的时候, 四字命令 都会返回 This ZooKeeper instance is not currently serving requests ,什么时候会变回正常可服务状态?

这几个问题我都不太确定,于是,踏上了新一轮的 啃码之旅

问题A

ZooKeeper 集群在发生 Leader 切换的时候,所有的 Follower 会选择新的 Leader 进行全量的数据同步吗?

这篇 帖子 里面的解释是我比较认同:

1、SNAP-全量同步

条件:peerLastZxid<minCommittedLog

说明:证明二者数据差异太大,follower数据过于陈旧,leader发送快照SNAP指令给follower全量同步数据,即leader将所有数据全量同步到follower

2、DIFF-增量同步

条件:minCommittedLog<=peerLastZxid<=maxCommittedLog

说明:证明二者数据差异不大,follower上有一些leader上已经提交的提议proposal未同步,此时需要增量提交这些提议即可

3、TRUNC-仅回滚同步

条件:peerLastZxid>minCommittedLog

说明:证明follower上有些提议proposal并未在leader上提交,follower需要回滚到zxid为minCommittedLog对应的事务操作

4、TRUNC+DIFF-回滚+增量同步

条件:minCommittedLog<=peerLastZxid<=maxCommittedLog且特殊场景leader a已经将事务truncA提交到本地事务日志中,但没有成功发起proposal协议进行投票就宕机了;然后集群中剔除原leader a重新选举出新leader b,又提交了若干新的提议proposal,然后原leader a重新服务又加入到集群中,不管是否被选举为新leader。

说明:此时a,b都有一些对方未提交的事务,若b是leader, a需要先回滚truncA然后增量同步新leader a上的数据

对应的代码在 LearnerHandler.run 中:

// packetToSend默认为Leader.SNAP
try {
    rl.lock();
    final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
    final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
    LOG.info("Synchronizing with Follower sid: " + sid
            +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
            +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
            +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));

    // ZKDatabase.committedLog是仅存在于内存中的结构,虽然Follower或者Leader退出时,
    // 会调用ZKDatabase.clear清空内存中的数据,但是FastLeaderElection.getInitLastLoggedZxid
    // 最终会调用ZKDatabase.loadDataBase重新加载数据到ZKDatabase.committedLog等内存结构中
    // 当然,ZKDatabase.loadDataBase是把日志文件中的内容加载加这个结构中
    // 如果加载之前,对应的zk节点刚好是新生成的snapshot,就会导致ZKDatabase.committedLog为空
    LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

    if (proposals.size() != 0) {
        LOG.debug("proposal size is {}", proposals.size());
        if ((maxCommittedLog >= peerLastZxid)
                && (minCommittedLog <= peerLastZxid)) {
            // 如果Follower的zxid在[minCommittedLog, maxCommittedLog],说明可能可以通过重新应用Follower上没有的事务日志来恢复数据    
            LOG.debug("Sending proposals to follower");

            // as we look through proposals, this variable keeps track of previous
            // proposal Id.
            long prevProposalZxid = minCommittedLog;

            // Keep track of whether we are about to send the first packet.
            // Before sending the first packet, we have to tell the learner
            // whether to expect a trunc or a diff
            boolean firstPacket=true;

            // If we are here, we can use committedLog to sync with
            // follower. Then we only need to decide whether to
            // send trunc or not
            packetToSend = Leader.DIFF;
            zxidToSend = maxCommittedLog;

            for (Proposal propose: proposals) {
                // 遍历ZKDatabase.committedLog中的事务日志,跳过那些已经在Follower上应用过的日志     
                // skip the proposals the peer already has
                if (propose.packet.getZxid() <= peerLastZxid) {
                    prevProposalZxid = propose.packet.getZxid();
                    continue;
                } else {
                    // If we are sending the first packet, figure out whether to trunc
                    // in case the follower has some proposals that the leader doesn't
                    if (firstPacket) {
                        firstPacket = false;
                        // Does the peer have some proposals that the leader hasn't seen yet
                        if (prevProposalZxid < peerLastZxid) {
                            // send a trunc message before sending the diff
                            // 如果出现这种情况,我们需要先在Follower上应用Leader.TRUNC,让其回滚到prevProposalZxid的位置                
                            packetToSend = Leader.TRUNC;
                            zxidToSend = prevProposalZxid;
                            updates = zxidToSend;
                        }
                    }
                    // 下面这几行语句是针对Follower上没有的事务日志构造出PROPOSAL和COMMIT请求,放到队列中
                    // 可以注意到,这个时候LearnerHandler的发送线程是还没有启动的,所以对应的FOLLOWER肯定是先响应Leader.DIFF或者Leader.TRUNC请求的
                    queuePacket(propose.packet);
                    QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                            null, null);
                    queuePacket(qcommit);
                }
            }
        } else if (peerLastZxid > maxCommittedLog) {
            // 如果Follower的zxid比当前的Leader还要大,发送Leader.TRUNC让Follower的事务日志回滚到当前Leader的maxCommittedLog
            // 和Leader.SNAP一样,Leader.TRUNC会导致对应的Follower调用ZKDatabase.loadDataBase重新加载数据到内存中
            LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(updates));

            packetToSend = Leader.TRUNC;
            zxidToSend = maxCommittedLog;
            updates = zxidToSend;
        } else {
            // 进入这块代码的条件是peerLastZxid<minCommittedLog,使用默认的packetToSend:Leader.SNAP
            // 这个条件有一个case:在zk集群中加入一个新的节点,这时候如果Leader        的事务日志中有内容,就会进入这个情况
            LOG.warn("Unhandled proposal scenario");
        }
    } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
        // 如果ZKDatabase.committedLog为空,而且对应的Follower和当前Leader内存中的数据是一致的,只发送一个Leader.DIFF的请求
        // 而不进行数据的同步,甚至也没有对应的PROPOSAL/COMMIT请求
        // The leader may recently take a snapshot, so the committedLog
        // is empty. We don't need to send snapshot if the follow
        // is already sync with in-memory db.
        LOG.debug("committedLog is empty but leader and follower "
                + "are in sync, zxid=0x{}",
                Long.toHexString(peerLastZxid));
        packetToSend = Leader.DIFF;
        zxidToSend = peerLastZxid;
    } else {
        // 如果ZKDatabase.committedLog为空,但是对应的Follower和当前Leader内存中的数据不一致
        // 这时候会使用默认的packetToSend:Leader.SNAP
        // just let the state transfer happen
        LOG.debug("proposals is empty");
    }

    LOG.info("Sending " + Leader.getPacketType(packetToSend));
    leaderLastZxid = leader.startForwarding(this, updates);

} finally {
    rl.unlock();
}

问题B

如果一次写入,由于丢包,导致某条日志没有写入,会怎么样呢?

先引入上次的 Session 问题 中使用过的一张图:

ZooKeeper - Data Sync

Leader 会向 Follower 发送 ProposalCommit 请求,其中, Follower 收到 Proposal 请求之后,会写入日志;收到 Commit 请求之后,会更改内存中的 DataTree

问题中提到的 日志没有写入 ,也就是发送到某个 FollowerProposal 请求被丢弃了, 对应的 Follower 会是怎样一个逻辑呢?

回答这个问题,最直观的方法就是模拟场景,在实验中将 Proposal 请求丢弃,观察对应的 FollowerLeader 的表现。

那么,问题来了,如何模拟呢?

Jepsen 中,使用的是 iptables 来模拟网络故障(周期性地丢包、网络分区等)。在我的模拟环境中, zk 集群都是部署在本地,使用 iptables 来操作会比较繁琐,而且我的需求是精确地丢弃掉 Proposal 请求,而不是 FollowerLeader 之间发送的所有的请求。

最终还是使用了 Byteman ,对应的脚本如下:

RULE trace zk.skip_proposal_packet
CLASS org.apache.zookeeper.server.quorum.Follower
METHOD processPacket
AT ENTRY
IF $1.getType() == 2
DO
  traceln("*** drop PROPOSAL packet");
  return;
ENDRULE

使用 Byteman 做故障场景模拟并不是我的原创, Cassandra 中使用了 Byteman 来做故障场景注入。这种方法的优点是可以完成代码级别在精确错误注入;缺点也很明显,需要待注入服务是运行在 JVM 之上的。

进行了场景模拟之后,发现被测的丢弃 Proposal 请求的 Follower 进入了 LOOKING 状态,然后重新加入了集群。原因是 Leader 主动断开了和 Proposal 的连接。

那么,为什么 Follower 丢弃 Proposal 请求会导致 Leader 主动断开了和 Proposal 的连接呢?

这个逻辑和 LearnerHandler$SyncLimitCheck 有关, Leader 会定时去调用 LearnerHandler.pingFollower 发送 Leader.PING 请求,逻辑如下:

public void ping() {
    long id;
    if (syncLimitCheck.check(System.nanoTime())) {
        synchronized(leader) {
            id = leader.lastProposed;
        }
        QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
        queuePacket(ping);
    } else {
        LOG.warn("Closing connection to peer due to transaction timeout.");
        shutdown();
    }
}

如果 Leader 发送出去的 Leader.PROPOSAL 请求在一段时间内(这个时间由 conf/zoo.cfg 中的 syncLimit 决定)没有收到对应的 ACK ,就会导致 syncLimitCheck.check 失败,从而调用 LearnerHandler.shutdown 关闭到这个 Follower 的连接,并停止对应的发送、接收请求的线程。

Follower 这边,由于 Leader 连接关闭,调用 Learner.readPacket 时会抛出异常,退出 Follower.followLeader 方法,重新进入 LOOKING 状态。

综上,我们知道了,发送到某个 FollowerProposal 请求被丢弃,会导致对应的 Follower 重新进入 LOOKING 状态。

那么,如果被丢弃的请求是 Commit 请求呢?

同样使用 Byteman 进行了模拟,由于 Commit 请求是不需要返回 ACKLeader 的,所以,如果模拟时有两个写入请求 ReqAReqB ,如果两个请求对应的 Commit 请求都丢弃了,这个时候其实对系统并没有什么影响,但是连接到对应 Follower 上的客户端看到的数据就是 stale 的。

如果丢弃 ReqA 对应的 Commit 请求之后就撤销故障场景, ReqB 对应的 Commit 请求正常执行会是什么情况呢?对应的逻辑在 FollowerZooKeeperServer.commit 中:

// 收到Leader发送的Leader.COMMIT请求之后,会调用这个方法
public void commit(long zxid) {
    // FollowerZooKeeperServer.logRequest中会添加条目到pendingTxns中
    // 这个field保存了所有调用了syncProcessor.processRequest,但是没有收到
    // Commit请求的Request
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    // 如果收到的Commit请求中的zxid和pendingTxns中第一个请求的zxid是不一样的
    // 打印Error级别的日志并退出 java 进程
    // 在我们的模拟情况中,pendingTxns中第一个请求是ReqA
    // 所以当ReqB对应的Commit请求被Follower收到时,会进入到这个逻辑
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    // 如果是匹配的,就从pendingTxns中移除这个请求
    // 并执行commitProcessor.commit
    Request request = pendingTxns.remove();
    commitProcessor.commit(request);
}

是的,对应的 Follower 进程退出了。

问题C

扩容的时候加了个节点,这时候新的写入会不会同步到这个新节点呢?

会的。虽然这时候 Leaderconf/zoo.cfg 里面还没有新加入节点的信息,但是 Leader 会为这个节点创建相应的 LearnerHandler ,对应的逻辑在 Leader$LearnerCnxAcceptor.run 中:

Socket s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
LearnerHandler fh = new LearnerHandler(s, Leader.this);
fh.start();

新加入的节点也会经历如下的阶段:

ZooKeeper - Data Sync

上图中的步骤是在 Learner.registerWithLeaderLearner.syncWithLeader 中完成的,也是新加入节点从 Leader 中同步数据的步骤。再看看 Leader 是如何把增量的数据同步到 Follower 的。

LeaderFollower 发送请求的方法 Leader.sendPacket 实现如下:

void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}

新加入节点的 LearnerHandler 是在 LearnerHandler.run 中通过调用 Leader.startForwarding 加入到 Leader.forwardingFollowers 中的,加入之后 Leader 就会开始同步数据到新的节点了。

那么,在计算 QuorumVerifier.containsQuorum 的时候,会涉及到新加入的节点么?

答案是,某种程度上,会。

验证 Ping 或者 Proposal 是否达到大多数的逻辑是在 QuorumPeer.quorumConfig 中实现的。

QuorumPeer.quorumConfig 这个 field 对应的类型是 QuorumVerifier 这个 接口 ,这个 接口 有两个实现: QuorumMajQuorumHierarchical ,我们的部署比较简单,没有 weight 相关的配置,所以使用的实现都是 QuorumMaj

QuorumMaj 中有一个 fieldhalf ,标志着这个集群里面半数的值(对于 3节点 的集群, half1 ;对于 4节点 的集群, half2 ;依此类推);对应的 containsQuorum 方法实现也非常简单粗暴:

public boolean containsQuorum(HashSet<Long> set){
    return (set.size() > half);
}

由于 QuorumPeer.setQuorumVerifier 只有在节点启动的时候才会被调用,所以 QuorumMaj.half 的值在节点启动之后就不会改变。

如果在一个 3节点zk0zk1zk2 )的集群中,扩容一个新节点 zk3 。在没有启动集群原有 3节点 的情况下, Leader 中的 QuorumMaj.half 会一直为 1 ,只是这时候, containsQuorum 方法的输入可能是一个大小为 4 的集合。

可以理解为 Leader 中判断 Proposal 是否达到大多数的标准是没有变化的,但是输入产生了变化。

问题D

Follower 会主动向 Leaderping 包么?

不会, FollowerLeader 发的 Leader.PING 包只是 response ,并没有线程会定期向 Leader 来发。那么,这时候会有个问题,如果 Follower 长时间没有收到 Leader 发的 Leader.PING 请求会怎么样呢?

依然是使用 Byteman Leader 发送给 FollowerLeader.PING 请求给丢弃掉,对应的脚本如下:

RULE trace zk.skip_ping
CLASS org.apache.zookeeper.server.quorum.LearnerHandler
METHOD ping
AT ENTRY
IF $0.sid == 2
DO
  traceln("*** drop ping packet to sid: 2");
  return;
ENDRULE

可以看到对应的 Follower 会不断进入 LOOKING 状态,连上 Leader 之后相隔 10s 就会有如下日志,错误为 Read timed out

2017-06-12 15:19:34,963 [myid:2] - INFO - Snapshotting: 0x100000000 to data/version-2/snapshot.100000000
2017-06-12 15:19:44,973 [myid:2] - WARN - Exception when following the leader
Exception when following the leader
java.net.SocketTimeoutException: Read timed out
  ...
  at org.apache.zookeeper.server.quorum.Learner.readPacket(Learner.java:153)
  at org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:85)
  at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:786)

为什么是 10s 这个时间呢?

原因在于 Leader.syncWithLeader 这个方法中,在收到 Leader.UPTODATE 后,会调用:

sock.setSoTimeout(self.tickTime * self.syncLimit);

conf/zoo.cfg 中,我们目前的配置为:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

因此,和 Leader 建立的 socket 的读写超时时间为 2000ms * 5 = 10s

不止 Follower 会超时断连, Leader 的日志显示 Leader 也会出现读超时:

2017-06-12 15:31:15,988 [myid:1] - INFO - Received NEWLEADER-ACK message from 2
2017-06-12 15:31:25,994 [myid:1] - ERROR - Unexpected exception causing shutdown while sock still open
java.net.SocketTimeoutException: Read timed out
  ...
  at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103)
  at org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:546)

然后,关闭对应的 LearnerHandler

问题E

集群在选举的时候, 四字命令 都会返回 This ZooKeeper instance is not currently serving requests ,什么时候会变回正常可服务状态?

先看一下 四字命令 返回 ZK_NOT_SERVING 的逻辑,以 mntr 这个四字命令为例,对应的代码在 NIOServerCnxn$MonitorCommand.commandRun 中:

private class MonitorCommand extends CommandThread {

    MonitorCommand(PrintWriter pw) {
        super(pw);
    }

    @Override
    public void commandRun() {
        if(zkServer == null) {
            pw.println(ZK_NOT_SERVING);
            return;
        }
        ...
    }
    ...
}

NIOServerCnxn 这个对象是每个连接都会创建一个的,创建 NIOServerCnxn 对象的逻辑在 NIOServerCnxnFactory.createConnection 中:

protected NIOServerCnxn createConnection(SocketChannel sock,
        SelectionKey sk) throws IOException {
    return new NIOServerCnxn(zkServer, sock, sk, this);
}

NIOServerCnxn 构造方法参数里面使用的是 ServerCnxnFactory.zkServer ,那么, ServerCnxnFactory.zkServer 是在什么时候设置的呢?分别看下 FollowerLeader 中对应的逻辑。

Follower

Follower 会在收到 Leader 发送的 Leader.UPTODATE 之后去设置 ,对应的逻辑在 Learner.syncWithLeader 中,调用 ServerCnxnFactory.setZooKeeperServer 来设置 ServerCnxnFactory.zkServer

case Leader.UPTODATE:
    if (!snapshotTaken) { // true for the pre v1.0 case
        zk.takeSnapshot();
        self.setCurrentEpoch(newEpoch);
    }
    self.cnxnFactory.setZooKeeperServer(zk);
    break outerLoop;

Leader

Leader 会在 Leader.leaderwaitForNewLeaderAck 之后去设置 ,对应的逻辑如下:

try {
    waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
} catch (InterruptedException e) {
    shutdown("Waiting for a quorum of followers, only synced with sids: [ "
            + getSidSetString(newLeaderProposal.ackSet) + " ]");
    HashSet<Long> followerSet = new HashSet<Long>();
    for (LearnerHandler f : learners)
        followerSet.add(f.getSid());

    if (self.getQuorumVerifier().containsQuorum(followerSet)) {
        LOG.warn("Enough followers present. "
                + "Perhaps the initTicks need to be increased.");
    }
    Thread.sleep(self.tickTime);
    self.tick++;
    return;
}

startZkServer();
...
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
    self.cnxnFactory.setZooKeeperServer(zk);
}

代码里面可以看到,调用 ServerCnxnFactory.setZooKeeperServer 的前提是 zookeeper.leaderServes 这个属性是设置为 true 的(默认为 true )。这个属性的含义是, Leader 节点是否接受客户端的请求。

总结

其实这些细节在实际维护中应用到的比较少,维护中会遇到的问题可能是“我的 snap 太多了,应该怎么清理”、“我想给某个包加个自定义的日志级别怎么办”、“我的 Curator 报这个错是什么意思”。虽然如此,我比较认可的观点仍然是:

维护开源产品不了解源码,或者没有找到看的有效入口,是很被动的,缺少定位解决问题的根本手段

有了源码定位以及相关 工具 的经验,遇到问题才不会轻易 、不会轻易 炸毛 、不会轻易 甩锅 。当这些代码不再是坨翔而是 My precious 时,解决问题就变成一种愉悦的体验了。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

金融数量分析

金融数量分析

郑志勇 / 北京航空航天大学出版社 / 2014-7-1 / CNY 58.00

《金融数量分析——基于MATLAB编程(第3版)》一书中的案例均来源于作者的工作实际,并充分体现“案例的实用性、程序的可模仿性”,程序中附有详细的注释。例如,投资组合管理、KMV模型计算、期权定价模型与数值方法、风险价值VaR的计算等案例程序,读者可以直接使用或根据需要在源代码的基础上修改、完善。 本书共23章。前两章分别对金融市场的基本概况与MATLAB的基础知识进行概述;接下来为20个金......一起来看看 《金融数量分析》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具