zookeeper-数据初始化分析

栏目: 编程工具 · 发布时间: 5年前

内容简介:在前两篇关于通过调用追踪我们发现下面我们将针对整个过程进行分析:

在前两篇关于 zookeeper 的分析文章中,我们熟悉了其选举投票及集群间数据同步的过程,本文将针对 zookeeper 启动前的一个重要处理过程进行分析;也即是 数据初始化

通过调用追踪我们发现 zookeeper 最终会执行 ZKDatabase 的方法 loadDataBase 来完成数据初始化

public long loadDataBase() throws IOException {
    PlayBackListener listener=new PlayBackListener(){
        public void onTxnLoaded(TxnHeader hdr,Record txn){
            Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
                    null, null);
            r.txn = txn;
            r.hdr = hdr;
            r.zxid = hdr.getZxid();
            // 将事务日志包装成 proposal 集群模式下会同步给follower
            addCommittedProposal(r);
        }
    };
    
    long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
    initialized = true;
    return zxid;
}
复制代码
public long restore(DataTree dt, Map<Long, Integer> sessions, 
            PlayBackListener listener) throws IOException {
    // 快照文件加载至内存
    snapLog.deserialize(dt, sessions);
    
    // ......
}
复制代码

下面我们将针对整个过程进行分析:

加载快照文件

FileSnap 通过反序列化快照文件,将数据写入内存数据库 DataTree

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
    // 查找有效的快照文件
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    
    // 省略
}
复制代码

deserialize 实现可知, zookeeper 在处理快照文件时首先需要查找指定个数的有效快照文件; 若不存在快照文件则返回 -1 。

查找有效的快照文件

private List<File> findNValidSnapshots(int n) throws IOException {
    // 查找 snapDir 目录下 snapshot 为前缀的快照文件;并按照 zxid 大小降序排列
    List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
    int count = 0;
    List<File> list = new ArrayList<File>();
    for (File f : files) {
        try {
            // 判断当前快照文件是否为有效文件
            if (Util.isValidSnapshot(f)) {
                list.add(f);
                count++;
                // 当有效快照文件个数 == n 时退出循环
                if (count == n) {
                    break;
                }
            }
        } catch (IOException e) {
            LOG.info("invalid snapshot " + f, e);
        }
    }
    return list;
}
复制代码
public static boolean isValidSnapshot(File f) throws IOException {
    if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1)
        return false;

    // Check for a valid snapshot
    // 随机读文件
    RandomAccessFile raf = new RandomAccessFile(f, "r");
    try {
        // including the header and the last / bytes
        // the snapshot should be atleast 10 bytes
    	// 文件内容长度至少为 10 字节
        if (raf.length() < 10) {
            return false;
        }
        // 指针移动到文件末尾 5 个字节处
        raf.seek(raf.length() - 5);
        byte bytes[] = new byte[5];
        int readlen = 0;
        int l;
        // 读最后 5 字节至 bytes 字节数组中
        while(readlen < 5 &&
              (l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) {
            readlen += l;
        }
        if (readlen != bytes.length) {
            LOG.info("Invalid snapshot " + f
                    + " too short, len = " + readlen);
            return false;
        }
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        int len = bb.getInt();
        byte b = bb.get();
        // 前 4 字节为整数 1,最后一个字节为 / 则为有效快照文件
        if (len != 1 || b != '/') {
            LOG.info("Invalid snapshot " + f + " len = " + len
                    + " byte = " + (b & 0xff));
            return false;
        }
    } finally {
        raf.close();
    }

    return true;
}
复制代码

从上述代码实现可知, zookeepersnap file 具有以下特性:

  • 快照文件内容长度至少为 10 字节
  • 快照文件末尾 5 个字节内容为: 整形数字 1 + 字符 /
可以在后续生成快照文件时进行验证
复制代码

解析快照文件

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
    // 查找有效的快照文件
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
            LOG.info("Reading snapshot " + snap);
            snapIS = new BufferedInputStream(new FileInputStream(snap));
            crcIn = new CheckedInputStream(snapIS, new Adler32());
            // 反序列化文件
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // 将文件内容写入 dataTree
            deserialize(dt,sessions, ia);
            // 文件校验和判断
            // 若 checkSum 不一致则继续解析下一个快照文件
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot :  " + snap);
            }

            // 实际上当最近的一个快照文件为有效文件的时候就会退出循环
            foundValid = true;
            break;
        } catch(IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        } finally {
            if (snapIS != null) 
                snapIS.close();
            if (crcIn != null) 
                crcIn.close();
        } 
    }

    // 没有一个有效的快照文件,则启动失败
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    // 记录最近的 zxid
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
    return dt.lastProcessedZxid;
}
复制代码

zookeeper 通过遍历 snap file 并反序列化解析 snap file 将其写入内存数据库 dataTree , 在该过程中会对 check sum 进行检查以确定 snap file 的正确性(一般来讲当 snap file 正确性通过后,只会解析最新的 snap file );若没有一个正确的 snap file 则抛出异常说明启动失败,反之 dataTree 将会记录 lastProcessedZxid

加载事务日志文件

下面我们继续对 FileTxSnapLogrestore 方法进行分析, zookeeper 在完成快照文件的处理之后,会加载事务日志文件并处理

public long restore(DataTree dt, Map<Long, Integer> sessions, 
            PlayBackListener listener) throws IOException {
    // 快照文件加载至内存
    snapLog.deserialize(dt, sessions);

    // 构造 FileTxnLog 实例
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    // 通过logDir zxid 构造 FileTxnIterator 实例并执行初始化动作
    // 加载比快照数据新的日志文件并查找最接近快照的事务日志记录
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    // 省略
}
复制代码

在执行

TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
复制代码

时会构建 FileTxnIterator 实例并执行 init 动作,代码如下:

public FileTxnIterator(File logDir, long zxid) throws IOException {
  this.logDir = logDir;
  this.zxid = zxid;
  init();
}
复制代码

接下来我们看下其 init 中如何处理事务日志文件

void init() throws IOException {
    storedFiles = new ArrayList<File>();
    // 查找 logDir 目录下的所有事务日志记录文件,并按 zxid 降序排列
    List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);

    // storedFiles 用来记录 zxid 大于快照文件 zxid 的事务日志文件
    for (File f: files) {
        if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
            storedFiles.add(f);
        }
        // add the last logfile that is less than the zxid
        else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
            storedFiles.add(f);
            break;
        }
    }
    // 因为一个事务日志文件中记录了多条事务日志,而事务日志文件名的后缀是当前文件的第一条事务记录的zxid
    // 通过判断 hdr.getZxid < zxid 查找最接近快照的事务记录
    goToNextLog();
    if (!next())
        return;
    while (hdr.getZxid() < zxid) {
        if (!next())
            return;
    }
}
复制代码

init 的实现可以看出 zookeeper 对事物日志文件的处理流程如下:

hdr.getZxid() < zxid

处理快照之后的事务

上文中通过加载事务日志文件查找到快照之后所提交的事务记录,下面就看下如何处理这些事务记录的

public long restore(DataTree dt, Map<Long, Integer> sessions, 
            PlayBackListener listener) throws IOException {
    // 快照文件加载至内存
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    // 加载比快照数据新的日志文件
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to 
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs 
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // 处理事务日志 写入内存数据库
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            // listener 回调将事务包装为 proposal 集群模式下同步至 follower
            listener.onTxnLoaded(hdr, itr.getTxn());
            // 继续下一条事务日志
            if (!itr.next()) 
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
复制代码

restore 的后续实现我们可以看出, zookeeper 在完成事务日志文件加载之后,会依次处理日志文件中每条事务记录:

  • 按事务类型处理事务记录,写入内存数据库
  • listener 回调事务加载事件;将事务包装为 proposal 存储到 committedLog 列表中,并分别记录 maxCommittedLogminCommittedLog (参见上文中的 PlayBackListener

小结

通过上文的分析,可以概括下 zookeeper 数据初始化的流程如下:

  • 加载快照文件,获取快照最新的 zxid
  • 加载事务日志文件,获取快照之后的事务记录
  • 处理快照之后的事务记录,写入内存

以上所述就是小编给大家介绍的《zookeeper-数据初始化分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Trading and Exchanges

Trading and Exchanges

Larry Harris / Oxford University Press, USA / 2002-10-24 / USD 95.00

This book is about trading, the people who trade securities and contracts, the marketplaces where they trade, and the rules that govern it. Readers will learn about investors, brokers, dealers, arbit......一起来看看 《Trading and Exchanges》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码