2021SC@SDUSC HBase(十一)项目代码分析——snapshot

2021SC@SDUSC

目录

一、简述

snapshot是很多存储系统和数据库系统都支持的功能。一个snapshot是一个全部文件系统或者某个目录在某一时刻的镜像。

二、基础原理

实现数据文件镜像最简单粗暴的方式是加锁拷贝(之所以需要加锁,是因为镜像得到的数据必须是某一时刻完全一致的数据),拷贝的这段时间不允许对原数据进行任何形式的更新删除,仅提供只读操作,拷贝完成之后再释放锁。这种方式涉及数据的实际拷贝,数据量大的情况下必然会花费大量时间,长时间的加锁拷贝必然导致客户端长时间不能更新删除,这是生产线上不能容忍的。
snapshot机制并不会拷贝数据,可以理解为它是原数据的一份指针。在HBase这种LSM类型系统结构下是比较容易理解的,我们知道HBase数据文件一旦落到磁盘之后就不再允许更新删除等原地修改操作,如果想更新删除的话可以追加写入新文件(HBase中根本没有更新接口,删除命令也是追加写入)。这种机制下实现某个表的snapshot只需要给当前表的所有文件分别新建一个引用(指针),其他新写入的数据重新创建一个新文件写入即可。
2021SC@SDUSC HBase(十一)项目代码分析——snapshot
snapshot流程主要涉及3个步骤:

  1. 加一把全局锁,此时不允许任何的数据写入更新以及删除
  2. 将Memstore中的缓存数据flush到文件中(可选)
  3. 为所有HFile文件分别新建引用指针,这些指针元数据就是snapshot

三、实现

Snapshot的过程类似于两阶段提交,大体过程是,HMaster收到snapshot命令后,作为coordinator,然后从meta region中取出Photo表的region和对应的region server的信息,这些region server就作为两阶段提交的participant,prepare阶段就相当于对region server本地的Photo表的region做快照存入HDFS的临时目录,commit阶段其实就是HMaster把临时目录改成正确的目录。期间,HMaster和region server的数据共享通过ZK来完成。
2021SC@SDUSC HBase(十一)项目代码分析——snapshot

snapshot的命令:

hbase> snapshot 'sync_stage:Photo', 'PhotoSnapshot' 

对sync_stage这个namespace下的Photo表做一次snapshot(表只有一个column family,叫做PHOTO),snapshot名字叫做PhotoSnapshot

public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
.build();
// if the table is enabled, then have the RS run actually the snapshot work
TableName snapshotTable = TableName.valueOf(snapshot.getTable());
AssignmentManager assignmentMgr = master.getAssignmentManager();
if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) {
snapshotEnabledTable(snapshot);
}
else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) {
snapshotDisabledTable(snapshot);
} else {
throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
}
}

清空之前完成的备份和恢复的任务 cleanupSentinels();并设置snapshot的版本
(assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) 根据表的状态选择snapshot的类型。

启用的表:

private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
throws HBaseSnapshotException {
// snapshot准备工作
prepareToTakeSnapshot(snapshot);
// new一个handler
EnabledTableSnapshotHandler handler =
new EnabledTableSnapshotHandler(snapshot, master, this);
//通过handler线程来备份
snapshotTable(snapshot, handler);
}

进行准备,并通过handler线程备份

handler.prepare();
this.executorService.submit(handler);
this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);

EnabledTableSnapshotHandler是继承TakeSnapshotHandler的,任务入口函数在TakeSnapshotHandler的process()方法,prepare方法和process方法都一样,区别在于snapshotRegions方法被重写了。
看prepare方法还是检查表的定义文件在不在

SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
monitor.rethrowException();
List> regionsAndLocations =
MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
snapshotTable, false);
snapshotRegions(regionsAndLocations);
Set serverNames = new HashSet();
for (Pair p : regionsAndLocations) {
if (p != null && p.getFirst() != null && p.getSecond() != null) {
HRegionInfo hri = p.getFirst();
if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
serverNames.add(p.getSecond().toString());
}
}
status.setStatus("Verifying snapshot: " + snapshot.getName());
verifier.verifySnapshot(this.workingDir, serverNames);
completeSnapshot(this.snapshotDir, this.workingDir, this.fs);

写一个.snapshotinfo文件到工作目录下,把表的定义信息写一份到工作目录下,即.tabledesc文件。接下来查找和表相关的Region Server和机器,开始备份,并检验snapshot的结果。确认没问题了,就把临时目录rename到正式目录。

Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
try {
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
for (Pair region : regions) {
HRegionInfo regionInfo = region.getFirst();
if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
LOG.info("Take disabled snapshot of offline region=" + regionInfo);
snapshotDisabledRegion(regionInfo);
}
}
}

这是EnabledTableSnapshotHandler方法里的,Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);等待完成 proc.waitForCompleted()`,并备份split过的region

Procedure proc = createProcedure(fed, procName, procArgs,expectedMembers);
if (!this.submitProcedure(proc)) {
LOG.error("Failed to submit procedure '" + procName + "'");
return null;
}

进入ProcedureCoordinatorstartProcedure,先创建Procedure,然后提交它

final public Void call() {
try {
sendGlobalBarrierStart();
waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
sendGlobalBarrierReached();
waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
} finally {
sendGlobalBarrierComplete();
completedLatch.countDown();
}}

call()是在acquired节点下面建立实例节点。waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");等待所有的rs回复,sendGlobalBarrierReached();在reached节点下面建立实例节点,waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");等待所有的rs回复

final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List nodeNames)
throws IOException, IllegalArgumentException {
String procName = proc.getName();
String abortNode = zkProc.getAbortZNode(procName);
try {
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
abort(abortNode);
}
} catch (KeeperException e) {throw new IOException("Failed while watching abort node:" + abortNode, e);
}
String acquire = zkProc.getAcquiredBarrierNode(procName);try {
byte[] data = ProtobufUtil.prependPBMagic(info);
ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
for (String node : nodeNames) {
String znode = ZKUtil.joinZNode(acquire, node);if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
coordinator.memberAcquiredBarrier(procName, node);
}
}
} catch (KeeperException e) {
throw new IOException("Failed while creating acquire node:" + acquire, e);
}
}

1、首先是检查abortNode(每个procName在zk下面都有一个对应的节点,比如snapshot,然后在procName下面又分了acquired、reached、abort三个节点。检查abort节点下面有没有当前的实例。)
2、在acquired节点为该实例创建节点,创建完成之后,在该实例节点下面监控各个Region Server的节点。如果发现已经有了,就更新Procedure中的acquiringMembers列表和inBarrierMembers,把节点从acquiringMembers中删除,然后添加到inBarrierMembers列表当中。
3、到这一步服务端的工作就停下来了,等到所有RS接受到指令之后在acquired节点下创建节点。
4、收到所有RS的回复之后,它才会开始在reached节点创建实例节点,然后继续等待。
5、RS完成任务之后,在reached的实例节点下面创建相应的节点,然后回复。
6、在确定所有的RS都完成工作之后,清理zk当中的相应proName节点。
注意:在这个过程当中,有任务的错误,都会在abort节点下面建立该实例的节点,RS上面的子过程一旦发现abort存在该节点的实例,就会取消该过程。

public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
throws KeeperException {
this.zkController = new ZKProcedureUtil(watcher, procType) {
@Override
public void nodeCreated(String path) {
if (!isInProcedurePath(path)) {
return;
}
String parent = ZKUtil.getParent(path);
// if its the end barrier, the procedure can be completed
if (isReachedNode(parent)) {
receivedReachedGlobalBarrier(path);
return;
} else if (isAbortNode(parent)) {
abort(path);
return;
} else if (isAcquiredNode(parent)) {
startNewSubprocedure(path);
} else {
LOG.debug("Ignoring created notification for node:" + path);
}
}
};
}

Snapshot在Region Server是由RegionServerSnapshotManager类里面的ProcedureMemberRpcs负责监测snapshot下面的节点变化,当发现acquired下面有实例之后,启动新任务。

public Subprocedure createSubprocedure(String opName, byte[] data) {
    return builder.buildSubprocedure(opName, data);
  }

当检测节点增加后,会调用ProcedureMember的以上方法来创建SubProcedure,这里的builder是SnapshotSubprocedureBuilder,它的buildSubprocedure()会创建FlushSnapshotSubprocedure类型的subprocedureFlushSnapshotSubprocedure有一个名为regions的成员变量,这里会进行初始化,从region server的online regions列表中检查是否有被snapshot表的region,如果有,则初始化regions,否则regions为空。同样,这个subprocedure会提交给内部的线程池处理.FlushSnapshotSubprocedure继承于Subprocedure,它是一个callable,入口函数是call。

final public Void call() {
try {
waitForReachedGlobalBarrier();
//...
} catch (Exception e) {
} finally {
releasedLocalBarrier.countDown();
}
}

acquireBarrier();在acquired的实例节点下面建立rs的节点 rpcs.sendMemberAcquired(this);,等待reached的实例节点的建立
接下来insideBarrier();rpcs.sendMemberCompleted(this);
可以看出,只有reached相应节点建立,region server才可以往下走进行实际的snapshot操作,而reached节点的建立只有HMaster看到所有的相关的region server都已经acquire了任务后才会去建立,这就达到了同步的目的。

for (HRegion region : regions) {
taskManager.submitTask(new RegionSnapshotTask(region));
}

insideBarrier的实现在FlushSnapshotSubprocedure这个类里面,调用了flushSnapshot(),这个方法给每个region都开一个线程去提交。

public void addRegionToSnapshot(SnapshotDescription desc,
ForeignExceptionSnare exnSnare) throws IOException {
Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
this.fs.getFileSystem(), snapshotDir, getRegionInfo());
for (Store store : stores.values()) {
Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
List storeFiles = new ArrayList(store.getStorefiles());
for (int i = 0; i < sz; i++) {
StoreFile storeFile = storeFiles.get(i);
Path file = storeFile.getPath();
Path referenceFile = new Path(dstStoreDir, file.getName());
boolean success = true;
if (storeFile.isReference()) {
storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
} else {
  success = fs.getFileSystem().createNewFile(referenceFile);
}
if (!success) {
throw new IOException("Failed to create reference file:" + referenceFile);
}
}
}
}
  1. 在工作目录创建region目录和写入region的信息
  2. 为hfile创建引用
    2.1. 分列族为store创建引用目录,每个store属于不同的列族
    2.2. 遍历hfile,然后创建引用 int sz = storeFiles.size();
    把旧的引用文件的内容写入到新的引用文件当中,getFileSystem()创建一个空的引用文件
    在工作目录在.hbase-snapshot/.tmps/snapshotName/region/familyName/下面给hfile创建引用文件。在创建引用文件的时候,还要先判断一下这个所谓的hfile是不是真的hfile,还是它本身就是一个引用文件了。
    如果已经是引用文件的话,把旧的引用文件里面的内容写入到新的引用文件当中。
    如果是一个正常的hfile的话,就创建一个空的引用文件即可,以后我们可以通过它的名字找到它在snapshot下面相应的文件。
    此时,每个RS的工作已完成。

被禁用的表:

备份禁用的表,区别是snapshotRegions方法,但是方法除了做一些准备工作之外,就是snapshotDisabledRegion,与上文类似。

四、功能

1、全量/增量备份:任何数据库都需要有备份的功能来实现数据的高可靠性,snapshot可以非常方便的实现表的在线备份功能,并且对在线业务请求影响非常小。使用备份数据,用户可以在异常发生的情况下快速回滚到指定快照点。增量备份会在全量备份的基础上使用binlog进行周期性的增量备份。
2.、数据迁移:可以使用ExportSnapshot功能将快照导出到另一个集群,实现数据的迁移

上一篇:快照(snapshot)和订阅(subscribe)的效果示意


下一篇:error: RPC failed; HTTP 400 curl 22 The requested URL returned error: 400 Fa...