内容简介:随着 Apache Pulsar 成为 Apache 的顶级开源项目,其存储层的解决方案 Apache BookKeeper 再次受到业界广泛关注。BookKeeper 在 Pulsar 之前也有很多成功的应用,比如使用 BookKeeper 实现了 HDFS NameNode 的 HA 机制(可能大部分公司使用的还是 Quorum Journal Manage 方案)、Twitter 开源的 DistributedLog 系统(可参考这里先对 BookKeeper 的基本概念做一下介绍,下图是 Book
随着 Apache Pulsar 成为 Apache 的顶级开源项目,其存储层的解决方案 Apache BookKeeper 再次受到业界广泛关注。BookKeeper 在 Pulsar 之前也有很多成功的应用,比如使用 BookKeeper 实现了 HDFS NameNode 的 HA 机制(可能大部分公司使用的还是 Quorum Journal Manage 方案)、Twitter 开源的 DistributedLog 系统(可参考 Twitter开源分布式高性能日志复制服务 ),BookKeeper 作为一个高扩展、强容错、低延迟的存储服务(A scalable, fault-tolerant, and low-latency storage service optimized for real-time workloads),它相当于把底层的存储层系统服务化(BookKeeper 是更底层的存储服务,类似于 Kafka 的存储层)。这样可以使得依赖于 BookKeeper 实现的分布式存储系统(包括分布式消息队列)在设计时可以只关注其应用层和功能层的内容,存储层比较难解决的问题像一致性、容错等,BookKeeper 已经实现了,从这个层面看,BookKeeper 确实解决业内的一些问题,而且 BookKeeper (Ledger 化,Ledger 相当于 Kafka segment)天生适合云上部署,未来还是有很大潜力的。近段对 BookKeeper 做了一些相应的调研,做了一些总结,本文将会主要从集群部署和使用角度来介绍一下 Apache BookKeeper,后面准备再写一篇文章来深入讲述其架构设计及实现原理。
BookKeeper 简介
这里先对 BookKeeper 的基本概念做一下介绍,下图是 BookKeeper 的架构图(图片来自 Introduction to Apache BookKeeper ):
在 BookKeeper 中节点(Server)被称作 Bookie(类似于 Kafka 中 Broker,HDFS 中的 DN,但是 BookKeeper 没有 Master 节点,它是典型 Slave/Slave 架构),数据在 Bookie 上以 Ledger 的形式存储(类似 Kafka 中的 Segment,HDFS 中的 Block), BookKeeper 相关的基本概念如下:
- Cluster: 所有的 Bookie 组成一个集群(连接到同一个 zk 地址的 Bookie 集合);
- Bookie:BookKeeper 的存储节点,也即 Server 节点;
- Ledger:Ledger 是对一个 log 文件的抽象,它本质上是由一系列 Entry (类似与 Kafka 每条 msg)组成的,client 在向 BookKeeper 写数据时也是往 Ledger 中写的;
- Entry:entry 本质上就是一条数据,它会有一个 id 做标识;
- Journal: Write ahead log,数据是先写到 Journal 中,这个也是 BookKeeper 读写分离实现机制的一部分,后续会详细分析;
- Ensemble: Set of Bookies across which a ledger is striped,一个 Ledger 所涉及的 Bookie 集合,初始化 Ledger 时,需要指定这个 Ledger 可以在几台 Bookie 上存储;
- Write Quorum Size: Number of replicas,要写入的副本数;
- Ack Quorum Size: Number of responses needed before client’s write is satisfied,当这么多副本写入成功后才会向 client 返回成功,比如副本数设置了 3,这个设置了2,client 会同时向三副本写入数据,当收到两个成功响应后,会认为数据已经写入成功;
- LAC: Last Add Confirmed,Ledger 中已经确认的最近一条数据的 entry id。
BookKeeper 集群搭建
关于 BookKeeper 集群的搭建可以参考 Apache BookKeeper Manual deployment 这篇文章。
集群搭建前准备
BookKeeper 集群搭建需要:
- ZooKeeper 集群;
- 一些 Bookie 节点(在集群的模式下最好是选取三台);
- JDK 版本要求是 JDK8;
这里先看下 BookKeeper 的目录结构,跟其他分布式系统也类似,命令在 bin 目录下,配置文件在 conf 目录下,lib 是其依赖的相关 jar 包,如下所示:
[matt@XXX2 bookkeeper]$ ll total 64 drwxr-xr-x 2 matt matt 4096 Sep 20 18:35 bin drwxr-xr-x 2 matt matt 4096 Sep 20 18:35 conf drwxrwxr-x 9 matt matt 4096 Oct 9 21:41 deps drwxrwxr-x 2 matt matt 12288 Oct 9 21:41 lib -rw-r--r-- 1 matt matt 24184 Sep 20 18:35 LICENSE -rw-r--r-- 1 matt matt 5114 Sep 20 18:35 NOTICE -rw-r--r-- 1 matt matt 4267 Sep 20 18:35 README.md
bin 目录下提供了 BookKeeper 相应的操作命令,这里用的命令主要是 bin/bookkeeper*
( bookkeeper-daemon.sh
可以让 Bookie 进程在后台自动运行),可以在 bin/common.sh
配置一些通用的配置(比如 JAVA_HOME),关于 bookkeeper 命令的使用方法见 bookkeeper cli
[matt@XXX2 bookkeeper]$ ll bin/ total 56 -rwxr-xr-x 1 matt matt 2319 Sep 20 18:35 bkctl -rwxr-xr-x 1 matt matt 5874 Sep 20 18:35 bookkeeper -rwxr-xr-x 1 matt matt 2869 Sep 20 18:35 bookkeeper-cluster.sh -rwxr-xr-x 1 matt matt 4590 Sep 20 18:35 bookkeeper-daemon.sh -rwxr-xr-x 1 matt matt 7785 Sep 20 18:35 common.sh -rwxr-xr-x 1 matt matt 4575 Sep 20 18:35 dlog -rwxr-xr-x 1 matt matt 1738 Sep 20 18:35 standalone -rwxr-xr-x 1 matt matt 5128 Sep 20 18:35 standalone.docker-compose -rwxr-xr-x 1 matt matt 1854 Sep 20 18:35 standalone.process
在 bookkeper 命令中,又提供了 shell 的相关命令,这里提供的命令非常丰富,可以参考 BookKeeper Shell ,如下所示:
[matt@XXX2 bookkeeper]$ bin/bookkeeper shell Usage: bookkeeper shell [-localbookie [<host:port>]] [-ledgeridformat <hex/long/uuid>] [-entryformat <hex/string>] [-conf configuration] <command> where command is one of: autorecovery [-enable|-disable] bookieformat [-nonInteractive] [-force] [-deleteCookie] bookieinfo bookiesanity [-entries N] [-timeout N] convert-to-db-storage convert-to-interleaved-storage decommissionbookie [-bookieid <bookieaddress>] deleteledger -ledgerid <ledgerid> [-force] help [COMMAND] initbookie initnewcluster lastmark ledger [-m] <ledger_id> ledgermetadata -ledgerid <ledgerid> listbookies [-readwrite|-readonly] [-hostnames] listfilesondisc [-journal|-entrylog|-index] listledgers [-meta] [-bookieid <bookieaddress>] listunderreplicated [[-missingreplica <bookieaddress>] [-excludingmissingreplica <bookieaddress>]] [-printmissingreplica] [-printreplicationworkerid] lostbookierecoverydelay [-get|-set <value>] metaformat [-nonInteractive] [-force] nukeexistingcluster -zkledgersrootpath <zkledgersrootpath> [-instanceid <instanceid> | -force] readjournal [-dir] [-msg] <journal_id | journal_file_name> readledger [-bookie <address:port>] [-msg] -ledgerid <ledgerid> [-firstentryid <firstentryid> [-lastentryid <lastentryid>]] [-force-recovery] readlog [-msg] <entry_log_id | entry_log_file_name> [-ledgerid <ledgerid> [-entryid <entryid>]] [-startpos <startEntryLogBytePos> [-endpos <endEntryLogBytePos>]] readlogmetadata <entry_log_id | entry_log_file_name> rebuild-db-ledger-locations-index recover [-deleteCookie] <bookieSrc[:bookieSrc]> simpletest [-ensemble N] [-writeQuorum N] [-ackQuorum N] [-numEntries N] triggeraudit updatecookie [-bookieId <hostname|ip>] [-expandstorage] [-list] [-delete <force>] updateledgers -bookieId <hostname|ip> [-updatespersec N] [-limit N] [-verbose true/false] [-printprogress N] whatisinstanceid whoisauditor
conf 目录下是关于 BookKeeper 的相关配置,如下所示,主要配置在 bk_server.conf
中,这里可以提供的配置非常多,具体可配置的参数可以参考 BookKeeper Config ,
[matt@XXX2 bookkeeper]$ ll conf/ total 84 -rw-r--r-- 1 matt matt 1804 Sep 20 18:35 bk_cli_env.sh -rw-r--r-- 1 matt matt 2448 Sep 20 18:35 bkenv.sh -rwxr-xr-x 1 matt matt 42269 Sep 20 18:35 bk_server.conf -rw-r--r-- 1 matt matt 1211 Sep 20 18:35 jaas_example.conf -rw-r--r-- 1 matt matt 2311 Sep 20 18:35 log4j.cli.properties -rw-r--r-- 1 matt matt 2881 Sep 20 18:35 log4j.properties -rw-r--r-- 1 matt matt 1810 Sep 20 18:35 log4j.shell.properties -rw-r--r-- 1 matt matt 1117 Sep 20 18:35 nettyenv.sh -rwxr-xr-x 1 matt matt 1300 Sep 20 18:35 standalone.conf -rw-r--r-- 1 matt matt 3275 Sep 20 18:35 zookeeper.conf -rw-r--r-- 1 matt matt 843 Sep 20 18:35 zookeeper.conf.dynamic
集群搭建
在 Apache BookKeeper Releases 中下载 BookKeeper 最新的安装包(这里以 bookkeeper-server-4.8.0-bin.tar.gz 为例)。
将安装包在指定目录下解压后,启动的操作分为以下几步:
- 修改相关配置(
zkServers
、bookiePort
、journalDir
、ledgerDir
等); - 在相应的机器上启动 Bookie 进程(使用
./bin/bookkeeper-daemon.sh start bookie
启动 Bookie); - 当所有的 Bookie 启动完成后,随便选择一台,初始化集群 meta 信息(使用
bookkeeper-server/bin/bookkeeper shell metaformat
命令初始化集群的 meta 信息,这里只需要初始化一次)。
如果启动成功的话(如果有异常日志,即使 Bookie 进程存在,也可能没有启动成功),启动正常的情况下,在日志中,可以看到类似下面的信息:
2018-10-15 11:24:49,549 - INFO [main:ComponentStarter@81] - Started component bookie-server.
Admin REST API
BookKeeper 服务提供了相应的 Rest API,可供管理员使用,具体可以参考 BookKeeper Admin REST API ,如果想要使用这个功能,首先需要 Bookie 服务将 bk_server.conf 中的 httpServerEnabled
配置设置为 true ,相关的配置参考 Http server settings 。
安装时踩的坑
在搭建 BookKeeper 集群中,并没有想象中那么顺畅,遇到了一些小问题,记录如下:
问题1:修改配置后重新启动失败
在使用 ./bin/bookkeeper-daemon.sh stop bookie
命令关闭 Bookie 进程,当关闭完 Bookie 进程后,再次启动时,发现无法启动,报出了下面的错误:
2018-10-13 21:05:40,674 - ERROR [main:Main@221] - Failed to build bookie server org.apache.bookkeeper.bookie.BookieException$InvalidCookieException: instanceId 406a08e5-911e-4ab6-b97b-40e4a56279a8 is not matching with null at org.apache.bookkeeper.bookie.Cookie.verifyInternal(Cookie.java:142) at org.apache.bookkeeper.bookie.Cookie.verify(Cookie.java:147) at org.apache.bookkeeper.bookie.Bookie.verifyAndGetMissingDirs(Bookie.java:381) at org.apache.bookkeeper.bookie.Bookie.checkEnvironmentWithStorageExpansion(Bookie.java:444) at org.apache.bookkeeper.bookie.Bookie.checkEnvironment(Bookie.java:262) at org.apache.bookkeeper.bookie.Bookie.<init>(Bookie.java:646) at org.apache.bookkeeper.proto.BookieServer.newBookie(BookieServer.java:133) at org.apache.bookkeeper.proto.BookieServer.<init>(BookieServer.java:102) at org.apache.bookkeeper.server.service.BookieService.<init>(BookieService.java:43) at org.apache.bookkeeper.server.Main.buildBookieServer(Main.java:299) at org.apache.bookkeeper.server.Main.doMain(Main.java:219) at org.apache.bookkeeper.server.Main.main(Main.java:201)
大概的意思就是说现在 zk 上的 instanceId 是 406a08e5-911e-4ab6-b97b-40e4a56279a8
,而期望的 instanceId 是 null,索引因为验证失败导致进程无法启动,instanceId 是搭建集群第三步(初始化集群 meta 信息的地方)中初始化的。此时如果我们启动测试的 client 程序,会抛出以下异常,这是因为目前集群只有2台 Bookie 处在可用状态,而 ensSize 默认是 3,writeQuorumSize 是 2,ackQuorumSize 是2。在 client 的测试程序中,新建一个 Ledger 时,由于集群当前可用的 Bookie 为2,不满足相应的条件,所以抛出了一下的异常:
org.apache.bookkeeper.client.BKException$BKNotEnoughBookiesException: Not enough non-faulty bookies available at org.apache.bookkeeper.client.SyncCallbackUtils.finish(SyncCallbackUtils.java:83) at org.apache.bookkeeper.client.SyncCallbackUtils$SyncCreateCallback.createComplete(SyncCallbackUtils.java:106) at org.apache.bookkeeper.client.LedgerCreateOp.createComplete(LedgerCreateOp.java:238) at org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:142) at org.apache.bookkeeper.client.BookKeeper.asyncCreateLedger(BookKeeper.java:891) at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:975) at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:930) at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:911) at com.matt.test.bookkeeper.ledger.LedgerTest.createLedgerSync(LedgerTest.java:110) at com.matt.test.bookkeeper.ledger.LedgerTest.main(LedgerTest.java:25) Exception in thread "main" java.lang.NullPointerException at com.matt.test.bookkeeper.ledger.LedgerTest.main(LedgerTest.java:26)
关于这个 BookieException$InvalidCookieException 异常,google 了一下并没有找到相应的解决办法,所以就直接看了相应的代码,抛出异常的代码如下:
private void verifyInternal(Cookie c,boolean checkIfSuperSet)throws BookieException.InvalidCookieException { String errMsg; if (c.layoutVersion < 3 && c.layoutVersion != layoutVersion) { errMsg = "Cookie is of too old version " + c.layoutVersion; LOG.error(errMsg); throw new BookieException.InvalidCookieException(errMsg); } else if (!(c.layoutVersion >= 3 && c.bookieHost.equals(bookieHost) && c.journalDirs.equals(journalDirs) && verifyLedgerDirs(c, checkIfSuperSet))) { errMsg = "Cookie [" + this + "] is not matching with [" + c + "]"; throw new BookieException.InvalidCookieException(errMsg); } else if ((instanceId == null && c.instanceId != null) || (instanceId != null && !instanceId.equals(c.instanceId))) { // instanceId should be same in both cookies errMsg = "instanceId " + instanceId + " is not matching with " + c.instanceId; throw new BookieException.InvalidCookieException(errMsg); // 由于 instanceId 不匹配,抛出了相应的异常 } }
这里可以看到的是从 zk 上拿到的 instanceId 是 406a08e5-911e-4ab6-b97b-40e4a56279a8
,而 Cookie 实例 c 中的 instanceId 为 null,那么 这个 Cookie 是如何初始化的呢?往上追一下代码,发现是在初始化 Bookie 时,会检查一下相应的运行环境,此时会从 journalDirectories 和 ledgerDirectories 中 current/VERSION
中初始化相应的 Cookie 对象,由于这个台机器之前启动过,所以这个文件已经创建了,文件的内容如下:
[matt@XXX2 bookkeeper]$ cat /tmp/bk-data/current/VERSION 4 bookieHost: "XXX:3181" journalDir: "/tmp/bk-txn" ledgerDirs: "1\t/tmp/bk-data" [matt@XXX2 bookkeeper]$ cat /tmp/bk-txn/current/VERSION 4 bookieHost: "XXX:3181" journalDir: "/tmp/bk-txn" ledgerDirs: "1\t/tmp/bk-data"
Cookie 从文件加载相应文件,并初始化对象的实现方法如下:
/** * Read cookie from registration manager for a given bookie <i>address</i>. * * @param rm registration manager * @param address bookie address * @return versioned cookie object * @throws BookieException when fail to read cookie */ public static Versioned<Cookie> readFromRegistrationManager(RegistrationManager rm, BookieSocketAddress address) throws BookieException { Versioned<byte[]> cookieData = rm.readCookie(address.toString()); try { try (BufferedReader reader = new BufferedReader( new StringReader(new String(cookieData.getValue(), UTF_8)))) { Builder builder = parse(reader); Cookie cookie = builder.build(); return new Versioned<Cookie>(cookie, cookieData.getVersion()); } } catch (IOException ioe) { throw new InvalidCookieException(ioe); } } private static Builder parse(BufferedReader reader)throws IOException { Builder cBuilder = Cookie.newBuilder(); int layoutVersion = 0; String line = reader.readLine(); if (null == line) { throw new EOFException("Exception in parsing cookie"); } try { layoutVersion = Integer.parseInt(line.trim()); cBuilder.setLayoutVersion(layoutVersion); } catch (NumberFormatException e) { throw new IOException("Invalid string '" + line.trim() + "', cannot parse cookie."); } if (layoutVersion == 3) { cBuilder.setBookieHost(reader.readLine()); cBuilder.setJournalDirs(reader.readLine()); cBuilder.setLedgerDirs(reader.readLine()); } else if (layoutVersion >= 4) { //这里的版本默认为 4 CookieFormat.Builder cfBuilder = CookieFormat.newBuilder(); TextFormat.merge(reader, cfBuilder); CookieFormat data = cfBuilder.build(); cBuilder.setBookieHost(data.getBookieHost()); cBuilder.setJournalDirs(data.getJournalDir()); cBuilder.setLedgerDirs(data.getLedgerDirs()); // Since InstanceId is optional if (null != data.getInstanceId() && !data.getInstanceId().isEmpty()) { //如果文件中没有 instanceId 字段,这里就不会初始化到 Cookie 中 cBuilder.setInstanceId(data.getInstanceId()); } } return cBuilder; }
解决的方法很简单,在 current/VERSION
文件中添加相应的 instanceId 字段后,Bookie 便可启动成功。但是这里还需要考虑的问题是:
- instanceId 在这里的作用是什么?instanceId 是在集群初始化时设置的,关于这个值的含义,我推测它的目的是对节点的上线做一个简单的认证,也就是说如果打算在集群中新添加一台 Bookie,需要知道当前的 instanceId 值,这样才能加入到这个集群中;
- Bookie 服务的启动流程是什么样的?这里就需要看下代码的具体实现,追一下 Bookie 的启动流程了。
BookKeeper API 使用
关于 BookKeeper API,总共提供了以下三种 API:
- The ledger API is a lower-level API that enables you to interact with ledgers directly,第一种是一种较为底层的 API 接口,直接与 Ledger 交互,见 The Ledger API ;
- The Ledger Advanced API is an advanced extension to Ledger API to provide more flexibilities to applications,第二种较高级的 API,提供了一些较高级的功能,见 The Advanced Ledger API ;
- The DistributedLog API is a higher-level API that provides convenient abstractions,这种是关于 DistributedLog 的一些操作 API,见 DistributedLog 。
在这节,我们主要看下第一种的实现,会简单讲述一下第二种,第三种这里不再介绍。
The Ledger API
关于 Ledger API 基本操作主要有以下几种:
- 创建 Ledger;
- 向 Ledger 写入数据(Entry);
- 关闭 Ledger,Ledger 关闭后数据就不能再写入,Ledger 一旦关闭它的数据就是不可变的;
- 从 Ledger 中读取数据;
- 删除 Ledger。
当然实现上述操作的前提是,需要先初始化一个 BookKeeper Client,下面开始慢慢讲述。
初始化 BookKeeper Client
BK Client 的初始化需要指定 zk 地址,BK Client 通过 zk 来连接到 BK 集群,具体实现如下:
// 第一种初始化 BookKeeper Client 的方法 try { String connectionString = zkAddr; // For a single-node, local ZooKeeper cluster BookKeeper bkClient = new BookKeeper(connectionString); logger.info("BookKeeper client init success."); } catch (InterruptedException | IOException | BKException e) { e.printStackTrace(); throw new RuntimeException( "There is an exception throw while creating the BookKeeper client."); } // 第二种初始化 BookKeeper Client 的方法 try { ClientConfiguration config = new ClientConfiguration(); config.setZkServers(zkAddr); config.setAddEntryTimeout(2000); BookKeeper bkClient = new BookKeeper(config); logger.info("BookKeeper client init success."); } catch (InterruptedException | IOException | BKException e) { e.printStackTrace(); throw new RuntimeException( "There is an exception throw while creating the BookKeeper client."); }
新建一个 Ledger
Ledger 的创建有两种,一种是同步创建,一种是异步创建(创建时需要指定相应的 password),其实现分别如下:
/** * create the ledger, default ensemble size is 3, write quorum size is 2, ack quorum size is 2 * * @param pw password * @return LedgerHandle */ public LedgerHandle createLedgerSync(String pw){ byte[] password = pw.getBytes(); try { LedgerHandle handle = bkClient.createLedger(BookKeeper.DigestType.MAC, password); return handle; } catch (BKException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } /** * create the ledger * * @param pw password */ public void createLedgerAsync(String pw){ class LedgerCreationCallbackimplements AsyncCallback.CreateCallback{ public void createComplete(int returnCode, LedgerHandle handle, Object ctx){ System.out.println("Ledger successfully created"); logger.info("Ledger successfully created async."); } } bkClient.asyncCreateLedger( 3, // ensSize 2, // writeQuorumSize and ackQuorumSize BookKeeper.DigestType.MAC, pw.getBytes(), new LedgerCreationCallback(), "some context" ); }
新建好 Ledger 之后,会返回一个 LedgerHandle 实例,对于 Ledger 的操作都是通过这个实例对象完成的,也可以通过 LedgerHandle.getId()
方法获取 Ledger 的 id,有了这个 id 就可以映射到具体的 Ledger,当需要读取数据时,通过 ledger id 初始化相应的 LedgerHandle 实例即可。
向 Ledger 写入数据
有了 Ledger 对应的 LedgerHandle 实例之后,可以通过 addEntry()
方法直接向 Ledger 写数据,如下所示:
public long addEntry(LedgerHandle ledgerHandle, String msg){ try { return ledgerHandle.addEntry(msg.getBytes()); } catch (InterruptedException e) { e.printStackTrace(); } catch (BKException e) { e.printStackTrace(); } return -1; }
从 Ledger 读取数据
从 Ledger 读取数据时,也是通过 LedgerHandle 实例的方法实现,提供了以下三种方法:
- 指定读取的 entry.id 范围消费;
- 从某一个 entry.id 一直读取到 LAC (LastAddConfirmed,该 Ledger 中最近的已经确认的数据)位置;
- 从某一个 entry.id 一直读取到 lastEntryIdExpectedToRead 位置,该位置可以比 LAC 大,前提是需要该值已经有对应的数据;
方法实现如下:
/** * read entry from startId to endId * * @param ledgerHandle the ledger * @param startId start entry id * @param endId end entry id * @return the entries, if occur exception, return null */ public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle,int startId, int endId){ try { return ledgerHandle.readEntries(startId, endId); } catch (InterruptedException e) { e.printStackTrace(); } catch (BKException e) { e.printStackTrace(); } return null; } /** * read entry from 0 to the LAC * * @param ledgerHandle the ledger * @return the entries, if occur exception, return null */ public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle){ try { return ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed()); } catch (InterruptedException e) { e.printStackTrace(); } catch (BKException e) { e.printStackTrace(); } return null; } /** * read entry form 0 to lastEntryIdExpectedToRead which can larger than the LastAddConfirmed range * * @param ledgerHandle the handle * @param lastEntryIdExpectedToRead the last entry id * @return the entries, if occur exception, return null */ public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle, long lastEntryIdExpectedToRead) { try { return ledgerHandle.readUnconfirmedEntries(0, lastEntryIdExpectedToRead); } catch (InterruptedException e) { e.printStackTrace(); } catch (BKException e) { e.printStackTrace(); } return null; }
删除 Ledger
Ledger 的删除实现也很简洁,如下所示:
/** * delete the ledger * * @param ledgerId the ledger id * @return if occur exception, return false */ public boolean deleteLedger(long ledgerId){ try { bkClient.deleteLedger(ledgerId); return true; } catch (Exception e) { e.printStackTrace(); } return false; }
The Ledger Advanced API
Ledger 的 Advanced API 在用法上与上面的实现差异不大,它向应用提供了更大的灵活性,比如:在创建 Ledger 时,应用可以指定 LedgerId,写入 Entry 时,应用也可以指定相应的 EntryID。
新建 Ledger
在新建 Ledger 这部分,Advanced API 可以指定 LedgerId 创建相应的 Ledger,如下面示例的第三种实现。
假设当前 BK 集群的 LedgerId 已经到了5,这时候在新建 Ledger 时如果不指定 LedgerId,下一个被使用的 LedgerId 就是6,如果应用指定了 7,新建的 Leader 的 id 将会是设置的 7,id 6 会等待下次再被使用。
/** * create the ledger * * @param password pw * @return LedgerHandleAdv */ public LedgerHandleAdv createLedger(String password){ byte[] passwd = password.getBytes(); try { LedgerHandleAdv handle = (LedgerHandleAdv) bkClient.createLedgerAdv( 3, 3, 2, // replica settings BookKeeper.DigestType.CRC32, passwd); return handle; } catch (InterruptedException e) { e.printStackTrace(); } catch (BKException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return null; } /** * create the ledger async * * @param password */ public void createLedgerAsync(String password){ class LedgerCreationCallbackimplements AsyncCallback.CreateCallback{ public void createComplete(int returnCode, LedgerHandle handle, Object ctx){ System.out.println("Ledger successfully created"); } } bkClient.asyncCreateLedgerAdv( 3, // ensemble size 3, // write quorum size 2, // ack quorum size BookKeeper.DigestType.CRC32, password.getBytes(), new LedgerCreationCallback(), "some context", null); } /** * create the ledger on special ledgerId * * @param password pw * @param ledgerId the ledger id, if the ledger id exist, it will return BKLedgerExistException * @return LedgerHandleAdv */ public LedgerHandleAdv createLedger(String password,long ledgerId){ byte[] passwd = password.getBytes(); try { LedgerHandleAdv handle = (LedgerHandleAdv) bkClient.createLedgerAdv( ledgerId, 3, 3, 2, // replica settings BookKeeper.DigestType.CRC32, passwd, null); return handle; } catch (InterruptedException e) { e.printStackTrace(); } catch (BKException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return null; }
向 Ledger 添加 Entry
向 Ledger 添加 Entry API 中,最吸引我的是可以指定 EntryId 写入(熟悉 Kafka 的同学知道,向 Kafka 写入数据是可以指定 Partition,但是不能指定 offset,如果可以指定 offset 写入,那么在做容灾时就可以实现 topic 的完全同步,下游可以根据 commit offset 随时切换数据源),其示例如下(注意,Advanced API 在写数据时是强制要指定 entryId 的):
/** * add the msg to the ledger on the special entryId * * @param ledgerHandleAdv ledgerHandleAdv * @param entryId the entry id * @param msg msg */ public void addEntry(LedgerHandleAdv ledgerHandleAdv,long entryId, String msg){ try { ledgerHandleAdv.addEntry(entryId, msg.getBytes()); } catch (InterruptedException e) { e.printStackTrace(); } catch (BKException e) { e.printStackTrace(); } }
关于这个 API,社区官方文档有如下介绍:
- The entry id has to be non-negative.
- Clients are okay to add entries out of order.
- However, the entries are only acknowledged in a monotonic order starting from 0.
首先,说下我对上面的理解:entry.id 要求是非负的,client 在添加 entry 时可以乱序,但是 entry 只有 0 开始单调顺序增加时才会被 ack。最开始,我以为是只要 entry.id 单调递增就可以,跑了一个测试用例,第一个 entry 的 id 设置为 0,第二个设置为 2,然后程序直接 hang 在那里了,相应日志信息为:
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:662 ] - [ DEBUG ] Got Add response from bookie:XXX.230:3181 rc:EOK, ledger:8:entry:0 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:663 ] - [ DEBUG ] Got Add response from bookie:XXX.247:3181 rc:EOK, ledger:8:entry:0 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:663 ] - [ DEBUG ] Submit callback (lid:8, eid: 0). rc:0 2018-10-19 16:58:34 [ main:663 ] - [ DEBUG ] Adding entry [50, 32, 109, 97, 116, 116, 32, 116, 101, 115, 116] 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.247:3181 rc:EOK, ledger:8:entry:2 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.230:3181 rc:EOK, ledger:8:entry:2 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.146:3181 rc:EOK, ledger:8:entry:0 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:681 ] - [ DEBUG ] Got Add response from bookie:XXX.146:3181 rc:EOK, ledger:8:entry:2 2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:681 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1 2018-10-19 16:58:37 [ main-SendThread(zk01:2181):3702 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms 2018-10-19 16:58:40 [ main-SendThread(zk01:2181):7039 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms 2018-10-19 16:58:43 [ main-SendThread(zk01:2181):10374 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms 2018-10-19 16:58:47 [ main-SendThread(zk01:2181):13710 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms 2018-10-19 16:58:50 [ main-SendThread(zk01:2181):17043 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
可以看到有这样的异常日志 Head of the queue entryId: 2 is not the expected value: 1
,期望的 entry id 是 1,这里是 2,乱序了,导致程序直接 hang 住(hang 住的原因推测是这个 Entry 没有被 ack),该异常信息出现地方如下:
void sendAddSuccessCallbacks(){ // Start from the head of the queue and proceed while there are // entries that have had all their responses come back PendingAddOp pendingAddOp; while ((pendingAddOp = pendingAddOps.peek()) != null && blockAddCompletions.get() == 0) { if (!pendingAddOp.completed) { if (LOG.isDebugEnabled()) { LOG.debug("pending add not completed: {}", pendingAddOp); } return; } // Check if it is the next entry in the sequence. if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) { if (LOG.isDebugEnabled()) { LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId, pendingAddsSequenceHead + 1); } return; } pendingAddOps.remove(); explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed); pendingAddsSequenceHead = pendingAddOp.entryId; if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) { this.lastAddConfirmed = pendingAddsSequenceHead; } pendingAddOp.submitCallback(BKException.Code.OK); } }
如果 entry id 出现了乱序,会导致这个 add 操作没有正常处理。但是如果这里强制要求 entry.id 从 0,而还有序,那么这个 API 跟前面的 API 有什么区别?这点没有搞懂,也向社区发一封邮件咨询,还在等待社区的响应。
以上所述就是小编给大家介绍的《BookKeeper 集群搭建及使用》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Elasticsearch 集群搭建和集群原理
- Zookeeper学习系列【二】Zookeeper 集群章节之集群搭建
- Spark集群环境搭建
- Zookeeper搭建集群
- FastDFS集群搭建
- Zookeeper集群环境搭建
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python高效开发实战——Django、Tornado、Flask、Twisted(第2版)
刘长龙 / 电子工业出版社 / 2019-1 / 99
也许你听说过全栈工程师,他们善于设计系统架构,精通数据库建模、通用网络协议、后端并发处理、前端界面设计,在学术研究或工程项目上能独当一面。通过对Python 3及相关Web框架的学习和实践,你就可以成为这样的全能型人才。 《Python高效开发实战——Django、Tornado、Flask、Twisted(第2版)》分为3篇:上篇是Python基础,带领初学者实践Python开发环境,掌握......一起来看看 《Python高效开发实战——Django、Tornado、Flask、Twisted(第2版)》 这本书的介绍吧!
MD5 加密
MD5 加密工具
Markdown 在线编辑器
Markdown 在线编辑器