内容简介:ZooKeeper - Data Sync
以下描述中用 zk
代指 ZooKeeper
,源码解释均基于 ZooKeeper 3.4.6
写在前面
好吧,这又是一篇:
不加全局理论抽象,局部解读具体逻辑细节的 文章
然而,我目前的能力只能解决这些非常具体的问题。其实总结出来的文章并不能够给其它同学带来什么收益,真的想懂细节,确实只有 “自己研读或者谷歌”
。
但是,我读完了代码,我会想:
- 我能不能帮忙别人来加快理解这个问题呢?
- 我这个理解是不是正确的呢,能不能
publish
出来让大家拍一拍呢? - 想给自己一个交代。我始终相信,写文章才能把头脑中可能忽略的细节都收集起来,才能发现思考时会遗漏的问题
当然,我非常认可需要做 全局理论抽象
,这是我目前的 恐慌区
,需要努力去跨越,自勉。
背景
继上次的 Session
问题 后,好学的 小冷
同学又认真地研究了下 ZooKeeper Cluster
的原理,问了我下面 5
个问题:
-
ZooKeeper
集群在发生Leader
切换的时候,所有的Follower
会选择新的Leader
进行全量的数据同步吗? - 如果一次写入,由于丢包,导致某条日志没有写入,会怎么样呢?
- 扩容的时候加了个节点,这时候新的写入会不会同步到这个新节点呢?
-
Follower
会主动向Leader
发ping
包么? - 集群在选举的时候, 四字命令 都会返回
This ZooKeeper instance is not currently serving requests
,什么时候会变回正常可服务状态?
这几个问题我都不太确定,于是,踏上了新一轮的 啃码之旅
。
问题A
ZooKeeper
集群在发生 Leader
切换的时候,所有的 Follower
会选择新的 Leader
进行全量的数据同步吗?
这篇 帖子 里面的解释是我比较认同:
1、SNAP-全量同步
条件:peerLastZxid<minCommittedLog
说明:证明二者数据差异太大,follower数据过于陈旧,leader发送快照SNAP指令给follower全量同步数据,即leader将所有数据全量同步到follower
2、DIFF-增量同步
条件:minCommittedLog<=peerLastZxid<=maxCommittedLog
说明:证明二者数据差异不大,follower上有一些leader上已经提交的提议proposal未同步,此时需要增量提交这些提议即可
3、TRUNC-仅回滚同步
条件:peerLastZxid>minCommittedLog
说明:证明follower上有些提议proposal并未在leader上提交,follower需要回滚到zxid为minCommittedLog对应的事务操作
4、TRUNC+DIFF-回滚+增量同步
条件:minCommittedLog<=peerLastZxid<=maxCommittedLog且特殊场景leader a已经将事务truncA提交到本地事务日志中,但没有成功发起proposal协议进行投票就宕机了;然后集群中剔除原leader a重新选举出新leader b,又提交了若干新的提议proposal,然后原leader a重新服务又加入到集群中,不管是否被选举为新leader。
说明:此时a,b都有一些对方未提交的事务,若b是leader, a需要先回滚truncA然后增量同步新leader a上的数据
对应的代码在 LearnerHandler.run
中:
// packetToSend默认为Leader.SNAP try { rl.lock(); final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); LOG.info("Synchronizing with Follower sid: " + sid +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog) +" minCommittedLog=0x"+Long.toHexString(minCommittedLog) +" peerLastZxid=0x"+Long.toHexString(peerLastZxid)); // ZKDatabase.committedLog是仅存在于内存中的结构,虽然Follower或者Leader退出时, // 会调用ZKDatabase.clear清空内存中的数据,但是FastLeaderElection.getInitLastLoggedZxid // 最终会调用ZKDatabase.loadDataBase重新加载数据到ZKDatabase.committedLog等内存结构中 // 当然,ZKDatabase.loadDataBase是把日志文件中的内容加载加这个结构中 // 如果加载之前,对应的zk节点刚好是新生成的snapshot,就会导致ZKDatabase.committedLog为空 LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog(); if (proposals.size() != 0) { LOG.debug("proposal size is {}", proposals.size()); if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { // 如果Follower的zxid在[minCommittedLog, maxCommittedLog],说明可能可以通过重新应用Follower上没有的事务日志来恢复数据 LOG.debug("Sending proposals to follower"); // as we look through proposals, this variable keeps track of previous // proposal Id. long prevProposalZxid = minCommittedLog; // Keep track of whether we are about to send the first packet. // Before sending the first packet, we have to tell the learner // whether to expect a trunc or a diff boolean firstPacket=true; // If we are here, we can use committedLog to sync with // follower. Then we only need to decide whether to // send trunc or not packetToSend = Leader.DIFF; zxidToSend = maxCommittedLog; for (Proposal propose: proposals) { // 遍历ZKDatabase.committedLog中的事务日志,跳过那些已经在Follower上应用过的日志 // skip the proposals the peer already has if (propose.packet.getZxid() <= peerLastZxid) { prevProposalZxid = propose.packet.getZxid(); continue; } else { // If we are sending the first packet, figure out whether to trunc // in case the follower has some proposals that the leader doesn't if (firstPacket) { firstPacket = false; // Does the peer have some proposals that the leader hasn't seen yet if (prevProposalZxid < peerLastZxid) { // send a trunc message before sending the diff // 如果出现这种情况,我们需要先在Follower上应用Leader.TRUNC,让其回滚到prevProposalZxid的位置 packetToSend = Leader.TRUNC; zxidToSend = prevProposalZxid; updates = zxidToSend; } } // 下面这几行语句是针对Follower上没有的事务日志构造出PROPOSAL和COMMIT请求,放到队列中 // 可以注意到,这个时候LearnerHandler的发送线程是还没有启动的,所以对应的FOLLOWER肯定是先响应Leader.DIFF或者Leader.TRUNC请求的 queuePacket(propose.packet); QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), null, null); queuePacket(qcommit); } } } else if (peerLastZxid > maxCommittedLog) { // 如果Follower的zxid比当前的Leader还要大,发送Leader.TRUNC让Follower的事务日志回滚到当前Leader的maxCommittedLog // 和Leader.SNAP一样,Leader.TRUNC会导致对应的Follower调用ZKDatabase.loadDataBase重新加载数据到内存中 LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}", Long.toHexString(maxCommittedLog), Long.toHexString(updates)); packetToSend = Leader.TRUNC; zxidToSend = maxCommittedLog; updates = zxidToSend; } else { // 进入这块代码的条件是peerLastZxid<minCommittedLog,使用默认的packetToSend:Leader.SNAP // 这个条件有一个case:在zk集群中加入一个新的节点,这时候如果Leader 的事务日志中有内容,就会进入这个情况 LOG.warn("Unhandled proposal scenario"); } } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) { // 如果ZKDatabase.committedLog为空,而且对应的Follower和当前Leader内存中的数据是一致的,只发送一个Leader.DIFF的请求 // 而不进行数据的同步,甚至也没有对应的PROPOSAL/COMMIT请求 // The leader may recently take a snapshot, so the committedLog // is empty. We don't need to send snapshot if the follow // is already sync with in-memory db. LOG.debug("committedLog is empty but leader and follower " + "are in sync, zxid=0x{}", Long.toHexString(peerLastZxid)); packetToSend = Leader.DIFF; zxidToSend = peerLastZxid; } else { // 如果ZKDatabase.committedLog为空,但是对应的Follower和当前Leader内存中的数据不一致 // 这时候会使用默认的packetToSend:Leader.SNAP // just let the state transfer happen LOG.debug("proposals is empty"); } LOG.info("Sending " + Leader.getPacketType(packetToSend)); leaderLastZxid = leader.startForwarding(this, updates); } finally { rl.unlock(); }
问题B
如果一次写入,由于丢包,导致某条日志没有写入,会怎么样呢?
先引入上次的 Session
问题 中使用过的一张图:
Leader
会向 Follower
发送 Proposal
和 Commit
请求,其中, Follower
收到 Proposal
请求之后,会写入日志;收到 Commit
请求之后,会更改内存中的 DataTree
。
问题中提到的 日志没有写入
,也就是发送到某个 Follower
的 Proposal
请求被丢弃了, 对应的 Follower
会是怎样一个逻辑呢?
回答这个问题,最直观的方法就是模拟场景,在实验中将 Proposal
请求丢弃,观察对应的 Follower
和 Leader
的表现。
那么,问题来了,如何模拟呢?
在 Jepsen
中,使用的是 iptables
来模拟网络故障(周期性地丢包、网络分区等)。在我的模拟环境中, zk
集群都是部署在本地,使用 iptables
来操作会比较繁琐,而且我的需求是精确地丢弃掉 Proposal
请求,而不是 Follower
和 Leader
之间发送的所有的请求。
最终还是使用了 Byteman
,对应的脚本如下:
RULE trace zk.skip_proposal_packet CLASS org.apache.zookeeper.server.quorum.Follower METHOD processPacket AT ENTRY IF $1.getType() == 2 DO traceln("*** drop PROPOSAL packet"); return; ENDRULE
使用 Byteman
做故障场景模拟并不是我的原创, Cassandra
中使用了 Byteman
来做故障场景注入。这种方法的优点是可以完成代码级别在精确错误注入;缺点也很明显,需要待注入服务是运行在 JVM
之上的。
进行了场景模拟之后,发现被测的丢弃 Proposal
请求的 Follower
进入了 LOOKING
状态,然后重新加入了集群。原因是 Leader
主动断开了和 Proposal
的连接。
那么,为什么 Follower
丢弃 Proposal
请求会导致 Leader
主动断开了和 Proposal
的连接呢?
这个逻辑和 LearnerHandler$SyncLimitCheck
有关, Leader
会定时去调用 LearnerHandler.ping
向 Follower
发送 Leader.PING
请求,逻辑如下:
public void ping() { long id; if (syncLimitCheck.check(System.nanoTime())) { synchronized(leader) { id = leader.lastProposed; } QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null); queuePacket(ping); } else { LOG.warn("Closing connection to peer due to transaction timeout."); shutdown(); } }
如果 Leader
发送出去的 Leader.PROPOSAL
请求在一段时间内(这个时间由 conf/zoo.cfg
中的 syncLimit
决定)没有收到对应的 ACK
,就会导致 syncLimitCheck.check
失败,从而调用 LearnerHandler.shutdown
关闭到这个 Follower
的连接,并停止对应的发送、接收请求的线程。
在 Follower
这边,由于 Leader
连接关闭,调用 Learner.readPacket
时会抛出异常,退出 Follower.followLeader
方法,重新进入 LOOKING
状态。
综上,我们知道了,发送到某个 Follower
的 Proposal
请求被丢弃,会导致对应的 Follower
重新进入 LOOKING
状态。
那么,如果被丢弃的请求是 Commit
请求呢?
同样使用 Byteman
进行了模拟,由于 Commit
请求是不需要返回 ACK
给 Leader
的,所以,如果模拟时有两个写入请求 ReqA
、 ReqB
,如果两个请求对应的 Commit
请求都丢弃了,这个时候其实对系统并没有什么影响,但是连接到对应 Follower
上的客户端看到的数据就是 stale
的。
如果丢弃 ReqA
对应的 Commit
请求之后就撤销故障场景, ReqB
对应的 Commit
请求正常执行会是什么情况呢?对应的逻辑在 FollowerZooKeeperServer.commit
中:
// 收到Leader发送的Leader.COMMIT请求之后,会调用这个方法 public void commit(long zxid) { // FollowerZooKeeperServer.logRequest中会添加条目到pendingTxns中 // 这个field保存了所有调用了syncProcessor.processRequest,但是没有收到 // Commit请求的Request if (pendingTxns.size() == 0) { LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn"); return; } // 如果收到的Commit请求中的zxid和pendingTxns中第一个请求的zxid是不一样的 // 打印Error级别的日志并退出 java 进程 // 在我们的模拟情况中,pendingTxns中第一个请求是ReqA // 所以当ReqB对应的Commit请求被Follower收到时,会进入到这个逻辑 long firstElementZxid = pendingTxns.element().zxid; if (firstElementZxid != zxid) { LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid)); System.exit(12); } // 如果是匹配的,就从pendingTxns中移除这个请求 // 并执行commitProcessor.commit Request request = pendingTxns.remove(); commitProcessor.commit(request); }
是的,对应的 Follower
进程退出了。
问题C
扩容的时候加了个节点,这时候新的写入会不会同步到这个新节点呢?
会的。虽然这时候 Leader
的 conf/zoo.cfg
里面还没有新加入节点的信息,但是 Leader
会为这个节点创建相应的 LearnerHandler
,对应的逻辑在 Leader$LearnerCnxAcceptor.run
中:
Socket s = ss.accept(); // start with the initLimit, once the ack is processed // in LearnerHandler switch to the syncLimit s.setSoTimeout(self.tickTime * self.initLimit); s.setTcpNoDelay(nodelay); LearnerHandler fh = new LearnerHandler(s, Leader.this); fh.start();
新加入的节点也会经历如下的阶段:
上图中的步骤是在 Learner.registerWithLeader
和 Learner.syncWithLeader
中完成的,也是新加入节点从 Leader
中同步数据的步骤。再看看 Leader
是如何把增量的数据同步到 Follower
的。
Leader
向 Follower
发送请求的方法 Leader.sendPacket
实现如下:
void sendPacket(QuorumPacket qp) { synchronized (forwardingFollowers) { for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); } } }
新加入节点的 LearnerHandler
是在 LearnerHandler.run
中通过调用 Leader.startForwarding
加入到 Leader.forwardingFollowers
中的,加入之后 Leader
就会开始同步数据到新的节点了。
那么,在计算 QuorumVerifier.containsQuorum
的时候,会涉及到新加入的节点么?
答案是,某种程度上,会。
验证 Ping
或者 Proposal
是否达到大多数的逻辑是在 QuorumPeer.quorumConfig
中实现的。
QuorumPeer.quorumConfig
这个 field
对应的类型是 QuorumVerifier
这个 接口
,这个 接口
有两个实现: QuorumMaj
和 QuorumHierarchical
,我们的部署比较简单,没有 weight
相关的配置,所以使用的实现都是 QuorumMaj
。
QuorumMaj
中有一个 field
叫 half
,标志着这个集群里面半数的值(对于 3节点
的集群, half
为 1
;对于 4节点
的集群, half
为 2
;依此类推);对应的 containsQuorum
方法实现也非常简单粗暴:
public boolean containsQuorum(HashSet<Long> set){ return (set.size() > half); }
由于 QuorumPeer.setQuorumVerifier
只有在节点启动的时候才会被调用,所以 QuorumMaj.half
的值在节点启动之后就不会改变。
如果在一个 3节点
( zk0
、 zk1
、 zk2
)的集群中,扩容一个新节点 zk3
。在没有启动集群原有 3节点
的情况下, Leader
中的 QuorumMaj.half
会一直为 1
,只是这时候, containsQuorum
方法的输入可能是一个大小为 4
的集合。
可以理解为 Leader
中判断 Proposal
是否达到大多数的标准是没有变化的,但是输入产生了变化。
问题D
Follower
会主动向 Leader
发 ping
包么?
不会, Follower
向 Leader
发的 Leader.PING
包只是 response
,并没有线程会定期向 Leader
来发。那么,这时候会有个问题,如果 Follower
长时间没有收到 Leader
发的 Leader.PING
请求会怎么样呢?
依然是使用 Byteman
将 Leader
发送给 Follower
的 Leader.PING
请求给丢弃掉,对应的脚本如下:
RULE trace zk.skip_ping CLASS org.apache.zookeeper.server.quorum.LearnerHandler METHOD ping AT ENTRY IF $0.sid == 2 DO traceln("*** drop ping packet to sid: 2"); return; ENDRULE
可以看到对应的 Follower
会不断进入 LOOKING
状态,连上 Leader
之后相隔 10s
就会有如下日志,错误为 Read timed out
:
2017-06-12 15:19:34,963 [myid:2] - INFO - Snapshotting: 0x100000000 to data/version-2/snapshot.100000000 2017-06-12 15:19:44,973 [myid:2] - WARN - Exception when following the leader Exception when following the leader java.net.SocketTimeoutException: Read timed out ... at org.apache.zookeeper.server.quorum.Learner.readPacket(Learner.java:153) at org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:85) at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:786)
为什么是 10s
这个时间呢?
原因在于 Leader.syncWithLeader
这个方法中,在收到 Leader.UPTODATE
后,会调用:
sock.setSoTimeout(self.tickTime * self.syncLimit);
在 conf/zoo.cfg
中,我们目前的配置为:
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5
因此,和 Leader
建立的 socket
的读写超时时间为 2000ms * 5 = 10s
。
不止 Follower
会超时断连, Leader
的日志显示 Leader
也会出现读超时:
2017-06-12 15:31:15,988 [myid:1] - INFO - Received NEWLEADER-ACK message from 2 2017-06-12 15:31:25,994 [myid:1] - ERROR - Unexpected exception causing shutdown while sock still open java.net.SocketTimeoutException: Read timed out ... at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103) at org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:546)
然后,关闭对应的 LearnerHandler
。
问题E
集群在选举的时候, 四字命令 都会返回 This ZooKeeper instance is not currently serving requests
,什么时候会变回正常可服务状态?
先看一下 四字命令 返回 ZK_NOT_SERVING
的逻辑,以 mntr
这个四字命令为例,对应的代码在 NIOServerCnxn$MonitorCommand.commandRun
中:
private class MonitorCommand extends CommandThread { MonitorCommand(PrintWriter pw) { super(pw); } @Override public void commandRun() { if(zkServer == null) { pw.println(ZK_NOT_SERVING); return; } ... } ... }
NIOServerCnxn
这个对象是每个连接都会创建一个的,创建 NIOServerCnxn
对象的逻辑在 NIOServerCnxnFactory.createConnection
中:
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException { return new NIOServerCnxn(zkServer, sock, sk, this); }
NIOServerCnxn
构造方法参数里面使用的是 ServerCnxnFactory.zkServer
,那么, ServerCnxnFactory.zkServer
是在什么时候设置的呢?分别看下 Follower
和 Leader
中对应的逻辑。
Follower
Follower
会在收到 Leader
发送的 Leader.UPTODATE
之后去设置 ,对应的逻辑在 Learner.syncWithLeader
中,调用 ServerCnxnFactory.setZooKeeperServer
来设置 ServerCnxnFactory.zkServer
:
case Leader.UPTODATE: if (!snapshotTaken) { // true for the pre v1.0 case zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.cnxnFactory.setZooKeeperServer(zk); break outerLoop;
Leader
Leader
会在 Leader.leader
中 waitForNewLeaderAck
之后去设置 ,对应的逻辑如下:
try { waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT); } catch (InterruptedException e) { shutdown("Waiting for a quorum of followers, only synced with sids: [ " + getSidSetString(newLeaderProposal.ackSet) + " ]"); HashSet<Long> followerSet = new HashSet<Long>(); for (LearnerHandler f : learners) followerSet.add(f.getSid()); if (self.getQuorumVerifier().containsQuorum(followerSet)) { LOG.warn("Enough followers present. " + "Perhaps the initTicks need to be increased."); } Thread.sleep(self.tickTime); self.tick++; return; } startZkServer(); ... if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) { self.cnxnFactory.setZooKeeperServer(zk); }
代码里面可以看到,调用 ServerCnxnFactory.setZooKeeperServer
的前提是 zookeeper.leaderServes
这个属性是设置为 true
的(默认为 true
)。这个属性的含义是, Leader
节点是否接受客户端的请求。
总结
其实这些细节在实际维护中应用到的比较少,维护中会遇到的问题可能是“我的 snap
太多了,应该怎么清理”、“我想给某个包加个自定义的日志级别怎么办”、“我的 Curator
报这个错是什么意思”。虽然如此,我比较认可的观点仍然是:
维护开源产品不了解源码,或者没有找到看的有效入口,是很被动的,缺少定位解决问题的根本手段
有了源码定位以及相关 工具 的经验,遇到问题才不会轻易 方
、不会轻易 炸毛
、不会轻易 甩锅
。当这些代码不再是坨翔而是 My precious
时,解决问题就变成一种愉悦的体验了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。