BookKeeper背景
BK是一个可靠的日志流记录系统,用于将系统产生的日志(也可以是其他数据)记录在BK集群上,由BK这个第三方Storage保证数据存储的可靠和一致性。典型场景是系统写write-ahead log,即先把log写到BK上,再对log做处理,比如将log写到内存的数据结构中。BookKeeper同时适用于任何单点写入并要求保证高性能和数据不丢失(Strong Durabilty Guarantees)的场景。
BK诞生于Hadoop2.0的namenode HA。在Hadoop中,出于故障恢复的考虑,Namenode在对它的记录做修改前都会先将本条修改的日志写到磁盘上。但是这里有一个潜在问题,当Namenode发生故障时,很可能连本地磁盘也不能访问,这时之前的记录的日志也就没用了。基于上述考虑,可以将Namenode的日志信息保存在一个可靠的外部Storage中。最初业界通过NFS这样的Share Storage来实现日志同步。之所以选择NFS,一方面因为可以很方便地实现数据共享,另外一方面是因为NFS相对稳定成熟。虽然如此,NFS也有缺点不能满足HDFS的在线存储业务:网络单点及其存储节点单点。为了满足共享日志的高可用性,社区引入了BK。除此之外还有默认的HA方案:QJM。Hadoop2.0 Namenode HA的介绍可以参考我之前的博文:Hadoop2.0 Namenode HA实现方案介绍及汇总。
BookKeeper介绍
BK带有多个读写日志的server,称为 bookies。每一个bookie是一个bk的存储服务,存储了写到bk上的write-ahead日志,及其数据内容。写入的log流(称它为流是因为BK记录的是byte[])称为 ledgers,一个ledger是一个日志文件,每个日志单元叫 ledger entry,也就是bookies是存ledgers的。ledger只支持append操作,而且同时只能有一个单线程来写。ZK充当BK的元数据存储服务,在zk中会存储ledger相关的元数据,包括当前可用的bookies,ledger分布的位置等。
BK通过读写多个存储节点达到高可用性,同时为了恢复由于异常造成的多节点数据不一致性,引入了数据一致性算法。BK的可用性还体现在只要有足够多的bookies可用,整个服务就可用。实际上,一份entry的写入需要确保N份日志冗余在N个bookie上写成功,而我们需要>N个bookie提供服务。在启动BK的时候,需要指定一个ensemble值,即bookie可用的最小节点数量,还需要指定一个quorums值,即日志写入bk服务端的冗余份数。BK的可靠性体现在服务有多个备份,entry的记录也是冗余的。BK的可扩展性体现在可以增加bookie服务的定额数目,同时增加server数据可以一定程度提高吞吐量。
Ledger在BK中扮演了很重要的角色,其相关操作及其作用如下:
- CreateLedger:创建一个空的ledger,此时会在zk中存储相关元数据;
- AddEntry:添加一个记录到ledger中,如果客户端失败或者ledger已经关闭,则不能再追加entry;
- openLedger:开始读取数据前,必须先打开ledger,如果某ledger处于未关闭,不能读取相关数据,如果有异常,需先恢复;
- readEntries:读取ledger中的entry
从编码角度讲,操纵entry读写的类为LedgerHandle,LedgerHandle对应一个可以被client读写entry的ledger。下面是创建ledgerHandle并读写entry的例子。
ClientConfiguration conf = new ClientConfiguration(); conf.setZkServers("zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181"); BookKeeper client = new BookKeeper(conf); LedgerHandle lh = client.createLedger(3, 2, DigestType.CRC32, "foobar"); lh.addEntry("Hello World!".getBytes()); lh.close(); LedgerHandle lh2 = client.openLedger(1, DigestType.CRC32, "foobar"); long lastEntry = lh2.getLastAddConfirmed(); Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9); while (entries.hasMoreElements()) { byte[] bytes = entries.nextElement().getEntry(); System.out.println(new String(bytes)); }更多BK文档可以参考官网文档。
BookKeeper in HDFS
Hdfs有两个抽象类提供对EditLog的读出和写回:EditLogOutputStream(以下简称ELOS)和EditLogInputStream(以下简称ELIS)。同时还有一个JournalManager接口,负责管理EditLog的可靠存取。它的实现包括QJM(QuorumJournalManager)和BKJM(BookKeeperJournalManager)。
写日志
对于hdfs而言,主节点写的每一个日志对象为BK的entry,entry的集合组成一个ledger,每一个日志段对应一个ledger,相同日志段追加edits即为向ledger追加entry。Ledger有一个递增的ledgerId,entry也有递增的entryId,每个entryId对应一个txId。
ELOS使用write()将FSEditLogOp往外写,对应的BookKeeperEditLogOutputStream的实现为:
@Override public void write(FSEditLogOp op) throws IOException { writer.writeOp(op); if (bufCurrent.getLength() > transmissionThreshold) { transmit(); } }
BookKeeperEditLogOutputStream内部有一个buffer,每次调用write()写FSEditLogOp的时候,会由一个Writer将此次FSEditLogOp写入buffer,当buffer长度达到门槛值时,进行transmit操作:把buffer里的editLog发送到BK上,代码如下:
/** * Transmit the current buffer to bookkeeper. * Synchronised at the FSEditLog level. #write() and #setReadyToFlush() * are never called at the same time. */ private void transmit() throws IOException { if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) { throw new IOException("Trying to write to an errored stream;" + " Error code : (" + transmitResult.get() + ") " + BKException.getMessage(transmitResult.get())); } if (bufCurrent.getLength() > 0) { byte[] entry = Arrays.copyOf(bufCurrent.getData(), bufCurrent.getLength()); lh.asyncAddEntry(entry, this, null); bufCurrent.reset(); outstandingRequests.incrementAndGet(); } }
lh为BK的LedgerHandle,asyncAddEntry方法异步将entry写往一个open状态的ledger。这就是一个简单的把Editlog写往BK的过程。
BKJM简单写的代码如下:
public void testSimpleWrite() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi); bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); bkjm.finalizeLogSegment(1, 100); String zkpath = bkjm.finalizedLedgerZNode(1, 100); }
BKJM的startLogSegment(txId)将产生一个新的ledger,对应一个新的日志段,该日志段状态为接收写入日志的状态。创建ledger之前有一些校验工作
if (txId <= maxTxId.get()) { throw new IOException("We‘ve already seen " + txId + ". A new stream cannot be created with it"); } try { String existingInprogressNode = ci.read(); if (null != existingInprogressNode && zkc.exists(existingInprogressNode, false) != null) { throw new IOException("Inprogress node already exists"); } if (currentLedger != null) { // bookkeeper errored on last stream, clean up ledger currentLedger.close(); } currentLedger = bkc.createLedger(ensembleSize, quorumSize, BookKeeper.DigestType.MAC, digestpw.getBytes()); } catch (BKException bke) { throw new IOException("Error creating ledger", bke); } catch (KeeperException ke) { throw new IOException("Error in zookeeper while creating ledger", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException("Interrupted creating ledger", ie); }
Ledger的创建还对应一个新的EditLogLedgerMetadata,该类记录这个日志段的元信息,包括zkPath,ledgerId,开始和结束txId等,在读取ledger里的日志内容的时候需要这些元数据信息。
BKJM的finalizeLogSegment()将文件由正在写入日志的状态转化为不接收写日志的状态。BKJM会create ledger,delete ledger,open ledger,这里的ledger即LedgerHandler类,它对每个ledger entry进行读写操作。
写日志总体流程
ZK作为BK的元数据服务器,里面存储了哪些bookie服务是可用的,同时也记录了目前系统有哪些ledger,及其ledger相关信息,如该ledger数据存储在哪些机器上,及其该ledger起始,结束entryid等。Bookie节点存储实际的数据,及其数据的读写服务。
写操作由主节点来完成,当主节点调用setReadyToFlush操作,会调用RPC同时向N(N=quorums)个bookie节点写,flush异步等待响应。
主节点对bk的操作,其实就是对ledger的操作,在开始向bk服务写数据前,首先需要打开ledger,打开ledger就会与配置的所有bookie节点建立连接;打开连接后,数据以entry为单位以RR算法选择向N(N=quorums)个bookie节点写entry数据,并且异步地等待结果返回,有任何一个bookie写入失败,则需要重新选择一个bookie写入失败的副本。
当bookie服务端接收到写入数据后,首先会写日志,然后根据同步或者异步算法将数据同步到磁盘上。写入数据过程中,首先会写入log文件,写入的内容包含ledgerid,entryid,EntrySize,LastConfirmed,及其真实数据内容。然后在相应ledger文件中记录下entryid,及其该entry所在的日志文件,偏移量等。读日志
读日志相比写日志过程,相对简单一些。同样,读日志过程也支持高可用。BKJM通过selectInputStreams方法读出一个范围内的ELIS集合,每个ELIS是BookKeeperEditLogInputStream类,new BookKeeperEditLogInputStream需要得到一个EditLogLedgerMetadata,并打开对应的ledger。具体BookKeeperEditLogInputStream类里的内容就不详细说明了。
@Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk, boolean forReading) throws IOException { List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId, inProgressOk); try { BookKeeperEditLogInputStream elis = null; for (EditLogLedgerMetadata l : currentLedgerList) { long lastTxId = l.getLastTxId(); if (l.isInProgress()) { lastTxId = recoverLastTxId(l, false); } // Check once again, required in case of InProgress and is case of any // gap. if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) { LedgerHandle h; if (l.isInProgress()) { // we don‘t want to fence the current journal h = bkc.openLedgerNoRecovery(l.getLedgerId(), BookKeeper.DigestType.MAC, digestpw.getBytes()); } else { h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, digestpw.getBytes()); } elis = new BookKeeperEditLogInputStream(h, l); elis.skipTo(fromTxId); } else { // If mismatches then there might be some gap, so we should not check // further. return; } streams.add(elis); if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) { return; } fromTxId = elis.getLastTxId() + 1; } } catch (BKException e) { throw new IOException("Could not open ledger for " + fromTxId, e); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException("Interrupted opening ledger for " + fromTxId, ie); } }
首先选择日志文件,建立输入流。从节点触发消化日志后,首先会查询ZK,获取到主节点写入ZK的edits元数据信息(不包含inprocess状态的edits元数据),这个元数据包含日志段的startTxid,lastTxid,ledgerID,同时也会打开相应的ledger,并获取其元数据,如ledger的quorumSize,ensembleSize,lastEntryId等,同时按照txid先后顺序对ledger进行排序,放入输入流集合。需要强调的是,当打开ledger时,会检查其entry副本之间的一致性,如果不一致需恢复。
准备好输入流以后,开始消化日志,依次操作输入流集合的ledgers,读取每个ledger内的entry:
- 通过查询ledger元数据,同时通过RR算法确定该entry存储在哪几个bookies;
- 尝试从bookies集合的第一个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,转入第3步;
- 尝试从bookies集合的第二个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,依次类推,如果尝试读取完所有的bookies均失败,则该entry读取失败;
恢复
BKJM还有恢复机制,相关接口有recoverUnfinalizedSegments(),recoverLastTxId()。Bookie数据恢复检查通过定时或者人工发起,集群数据修复流程:
- 通过zk查询到ledger元数据;
- 通过元数据,查询相关bookie中存储的ledger的entry是否完整;
- 如果查询到存储在某bookie上的entry不完整,则需要进入数据恢复流程;
- 首先从bk服务端读取到ledger相关的entry,然后将其写到需要恢复entry的某bookie服务端;
- Ledger数据恢复完成后,需要更新ledger的segment相关元数据。
总结
本文首先介绍了BookKeeper的背景和使用场景,然后简单介绍了BK的主要部件及使用方法,最后粗略地分析了hadoop2.0 namenode BKJM的HA实现,介绍了EditLog写入和读出BK的过程。通过阅读hadoopBKJM部分的代码,帮助学习怎样在自己的系统里加入BookKeeper,让BK来保证日志的可靠和容灾恢复等功能。