内容简介:在前两篇关于通过调用追踪我们发现下面我们将针对整个过程进行分析:
在前两篇关于 zookeeper
的分析文章中,我们熟悉了其选举投票及集群间数据同步的过程,本文将针对 zookeeper
启动前的一个重要处理过程进行分析;也即是 数据初始化
。
通过调用追踪我们发现 zookeeper
最终会执行 ZKDatabase
的方法 loadDataBase
来完成数据初始化
public long loadDataBase() throws IOException { PlayBackListener listener=new PlayBackListener(){ public void onTxnLoaded(TxnHeader hdr,Record txn){ Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(), null, null); r.txn = txn; r.hdr = hdr; r.zxid = hdr.getZxid(); // 将事务日志包装成 proposal 集群模式下会同步给follower addCommittedProposal(r); } }; long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener); initialized = true; return zxid; } 复制代码
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 快照文件加载至内存 snapLog.deserialize(dt, sessions); // ...... } 复制代码
下面我们将针对整个过程进行分析:
加载快照文件
FileSnap
通过反序列化快照文件,将数据写入内存数据库 DataTree
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // 查找有效的快照文件 List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } // 省略 } 复制代码
从 deserialize
实现可知, zookeeper
在处理快照文件时首先需要查找指定个数的有效快照文件; 若不存在快照文件则返回 -1 。
查找有效的快照文件
private List<File> findNValidSnapshots(int n) throws IOException { // 查找 snapDir 目录下 snapshot 为前缀的快照文件;并按照 zxid 大小降序排列 List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false); int count = 0; List<File> list = new ArrayList<File>(); for (File f : files) { try { // 判断当前快照文件是否为有效文件 if (Util.isValidSnapshot(f)) { list.add(f); count++; // 当有效快照文件个数 == n 时退出循环 if (count == n) { break; } } } catch (IOException e) { LOG.info("invalid snapshot " + f, e); } } return list; } 复制代码
public static boolean isValidSnapshot(File f) throws IOException { if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1) return false; // Check for a valid snapshot // 随机读文件 RandomAccessFile raf = new RandomAccessFile(f, "r"); try { // including the header and the last / bytes // the snapshot should be atleast 10 bytes // 文件内容长度至少为 10 字节 if (raf.length() < 10) { return false; } // 指针移动到文件末尾 5 个字节处 raf.seek(raf.length() - 5); byte bytes[] = new byte[5]; int readlen = 0; int l; // 读最后 5 字节至 bytes 字节数组中 while(readlen < 5 && (l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) { readlen += l; } if (readlen != bytes.length) { LOG.info("Invalid snapshot " + f + " too short, len = " + readlen); return false; } ByteBuffer bb = ByteBuffer.wrap(bytes); int len = bb.getInt(); byte b = bb.get(); // 前 4 字节为整数 1,最后一个字节为 / 则为有效快照文件 if (len != 1 || b != '/') { LOG.info("Invalid snapshot " + f + " len = " + len + " byte = " + (b & 0xff)); return false; } } finally { raf.close(); } return true; } 复制代码
从上述代码实现可知, zookeeper
的 snap file
具有以下特性:
- 快照文件内容长度至少为 10 字节
-
快照文件末尾 5 个字节内容为: 整形数字
1
+ 字符/
可以在后续生成快照文件时进行验证 复制代码
解析快照文件
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // 查找有效的快照文件 List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; for (int i = 0; i < snapList.size(); i++) { snap = snapList.get(i); InputStream snapIS = null; CheckedInputStream crcIn = null; try { LOG.info("Reading snapshot " + snap); snapIS = new BufferedInputStream(new FileInputStream(snap)); crcIn = new CheckedInputStream(snapIS, new Adler32()); // 反序列化文件 InputArchive ia = BinaryInputArchive.getArchive(crcIn); // 将文件内容写入 dataTree deserialize(dt,sessions, ia); // 文件校验和判断 // 若 checkSum 不一致则继续解析下一个快照文件 long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } // 实际上当最近的一个快照文件为有效文件的时候就会退出循环 foundValid = true; break; } catch(IOException e) { LOG.warn("problem reading snap file " + snap, e); } finally { if (snapIS != null) snapIS.close(); if (crcIn != null) crcIn.close(); } } // 没有一个有效的快照文件,则启动失败 if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } // 记录最近的 zxid dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot"); return dt.lastProcessedZxid; } 复制代码
zookeeper
通过遍历 snap file
并反序列化解析 snap file
将其写入内存数据库 dataTree
, 在该过程中会对 check sum
进行检查以确定 snap file
的正确性(一般来讲当 snap file
正确性通过后,只会解析最新的 snap file
);若没有一个正确的 snap file
则抛出异常说明启动失败,反之 dataTree
将会记录 lastProcessedZxid
。
加载事务日志文件
下面我们继续对 FileTxSnapLog
的 restore
方法进行分析, zookeeper
在完成快照文件的处理之后,会加载事务日志文件并处理
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 快照文件加载至内存 snapLog.deserialize(dt, sessions); // 构造 FileTxnLog 实例 FileTxnLog txnLog = new FileTxnLog(dataDir); // 通过logDir zxid 构造 FileTxnIterator 实例并执行初始化动作 // 加载比快照数据新的日志文件并查找最接近快照的事务日志记录 TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; // 省略 } 复制代码
在执行
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); 复制代码
时会构建 FileTxnIterator
实例并执行 init
动作,代码如下:
public FileTxnIterator(File logDir, long zxid) throws IOException { this.logDir = logDir; this.zxid = zxid; init(); } 复制代码
接下来我们看下其 init
中如何处理事务日志文件
void init() throws IOException { storedFiles = new ArrayList<File>(); // 查找 logDir 目录下的所有事务日志记录文件,并按 zxid 降序排列 List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false); // storedFiles 用来记录 zxid 大于快照文件 zxid 的事务日志文件 for (File f: files) { if (Util.getZxidFromName(f.getName(), "log") >= zxid) { storedFiles.add(f); } // add the last logfile that is less than the zxid else if (Util.getZxidFromName(f.getName(), "log") < zxid) { storedFiles.add(f); break; } } // 因为一个事务日志文件中记录了多条事务日志,而事务日志文件名的后缀是当前文件的第一条事务记录的zxid // 通过判断 hdr.getZxid < zxid 查找最接近快照的事务记录 goToNextLog(); if (!next()) return; while (hdr.getZxid() < zxid) { if (!next()) return; } } 复制代码
从 init
的实现可以看出 zookeeper
对事物日志文件的处理流程如下:
hdr.getZxid() < zxid
处理快照之后的事务
上文中通过加载事务日志文件查找到快照之后所提交的事务记录,下面就看下如何处理这些事务记录的
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 快照文件加载至内存 snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); // 加载比快照数据新的日志文件 TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(higestZxid) > {}(next log) for type {}", new Object[] { highestZxid, hdr.getZxid(), hdr.getType() }); } else { highestZxid = hdr.getZxid(); } try { // 处理事务日志 写入内存数据库 processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } // listener 回调将事务包装为 proposal 集群模式下同步至 follower listener.onTxnLoaded(hdr, itr.getTxn()); // 继续下一条事务日志 if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; } 复制代码
从 restore
的后续实现我们可以看出, zookeeper
在完成事务日志文件加载之后,会依次处理日志文件中每条事务记录:
- 按事务类型处理事务记录,写入内存数据库
-
listener
回调事务加载事件;将事务包装为proposal
存储到committedLog
列表中,并分别记录maxCommittedLog
,minCommittedLog
(参见上文中的PlayBackListener
)
小结
通过上文的分析,可以概括下 zookeeper
数据初始化的流程如下:
- 加载快照文件,获取快照最新的 zxid
- 加载事务日志文件,获取快照之后的事务记录
- 处理快照之后的事务记录,写入内存
以上所述就是小编给大家介绍的《zookeeper-数据初始化分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Kratos 初始化源码分析
- Flutter框架分析(二)-- 初始化
- Spring对象创建初始化分析
- Swoole 源码分析——Server 模块之初始化
- Drone 源码分析之数据库初始化
- Spring源码分析之IoC容器初始化
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Trading and Exchanges
Larry Harris / Oxford University Press, USA / 2002-10-24 / USD 95.00
This book is about trading, the people who trade securities and contracts, the marketplaces where they trade, and the rules that govern it. Readers will learn about investors, brokers, dealers, arbit......一起来看看 《Trading and Exchanges》 这本书的介绍吧!