2021SC@SDUSC
目录
一、简述
snapshot是很多存储系统和数据库系统都支持的功能。一个snapshot是一个全部文件系统或者某个目录在某一时刻的镜像。
二、基础原理
实现数据文件镜像最简单粗暴的方式是加锁拷贝(之所以需要加锁,是因为镜像得到的数据必须是某一时刻完全一致的数据),拷贝的这段时间不允许对原数据进行任何形式的更新删除,仅提供只读操作,拷贝完成之后再释放锁。这种方式涉及数据的实际拷贝,数据量大的情况下必然会花费大量时间,长时间的加锁拷贝必然导致客户端长时间不能更新删除,这是生产线上不能容忍的。
snapshot机制并不会拷贝数据,可以理解为它是原数据的一份指针。在HBase这种LSM类型系统结构下是比较容易理解的,我们知道HBase数据文件一旦落到磁盘之后就不再允许更新删除等原地修改操作,如果想更新删除的话可以追加写入新文件(HBase中根本没有更新接口,删除命令也是追加写入)。这种机制下实现某个表的snapshot只需要给当前表的所有文件分别新建一个引用(指针),其他新写入的数据重新创建一个新文件写入即可。
snapshot流程主要涉及3个步骤:
- 加一把全局锁,此时不允许任何的数据写入更新以及删除
- 将Memstore中的缓存数据flush到文件中(可选)
- 为所有HFile文件分别新建引用指针,这些指针元数据就是snapshot
三、实现
Snapshot的过程类似于两阶段提交,大体过程是,HMaster收到snapshot命令后,作为coordinator,然后从meta region中取出Photo表的region和对应的region server的信息,这些region server就作为两阶段提交的participant,prepare阶段就相当于对region server本地的Photo表的region做快照存入HDFS的临时目录,commit阶段其实就是HMaster把临时目录改成正确的目录。期间,HMaster和region server的数据共享通过ZK来完成。
snapshot的命令:
hbase> snapshot 'sync_stage:Photo', 'PhotoSnapshot'
对sync_stage这个namespace下的Photo表做一次snapshot(表只有一个column family,叫做PHOTO),snapshot名字叫做PhotoSnapshot
public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
.build();
// if the table is enabled, then have the RS run actually the snapshot work
TableName snapshotTable = TableName.valueOf(snapshot.getTable());
AssignmentManager assignmentMgr = master.getAssignmentManager();
if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) {
snapshotEnabledTable(snapshot);
}
else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) {
snapshotDisabledTable(snapshot);
} else {
throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
}
}
清空之前完成的备份和恢复的任务 cleanupSentinels();
并设置snapshot的版本(assignmentMgr.getZKTable().isEnabledTable(snapshotTable))
根据表的状态选择snapshot的类型。
启用的表:
private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
throws HBaseSnapshotException {
// snapshot准备工作
prepareToTakeSnapshot(snapshot);
// new一个handler
EnabledTableSnapshotHandler handler =
new EnabledTableSnapshotHandler(snapshot, master, this);
//通过handler线程来备份
snapshotTable(snapshot, handler);
}
进行准备,并通过handler线程备份
handler.prepare();
this.executorService.submit(handler);
this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
EnabledTableSnapshotHandler
是继承TakeSnapshotHandler
的,任务入口函数在TakeSnapshotHandler
的process()方法,prepare
方法和process
方法都一样,区别在于snapshotRegions方法被重写了。
看prepare方法还是检查表的定义文件在不在
SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
monitor.rethrowException();
List> regionsAndLocations =
MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
snapshotTable, false);
snapshotRegions(regionsAndLocations);
Set serverNames = new HashSet();
for (Pair p : regionsAndLocations) {
if (p != null && p.getFirst() != null && p.getSecond() != null) {
HRegionInfo hri = p.getFirst();
if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
serverNames.add(p.getSecond().toString());
}
}
status.setStatus("Verifying snapshot: " + snapshot.getName());
verifier.verifySnapshot(this.workingDir, serverNames);
completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
写一个.snapshotinfo文件到工作目录下,把表的定义信息写一份到工作目录下,即.tabledesc文件。接下来查找和表相关的Region Server和机器,开始备份,并检验snapshot的结果。确认没问题了,就把临时目录rename到正式目录。
Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
try {
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
for (Pair region : regions) {
HRegionInfo regionInfo = region.getFirst();
if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
LOG.info("Take disabled snapshot of offline region=" + regionInfo);
snapshotDisabledRegion(regionInfo);
}
}
}
这是EnabledTableSnapshotHandler方法
里的,Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
等待完成 proc.waitForCompleted()`,并备份split过的region
Procedure proc = createProcedure(fed, procName, procArgs,expectedMembers);
if (!this.submitProcedure(proc)) {
LOG.error("Failed to submit procedure '" + procName + "'");
return null;
}
进入ProcedureCoordinator
的startProcedure
,先创建Procedure,然后提交它
final public Void call() {
try {
sendGlobalBarrierStart();
waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
sendGlobalBarrierReached();
waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
} finally {
sendGlobalBarrierComplete();
completedLatch.countDown();
}}
call()
是在acquired节点下面建立实例节点。waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
等待所有的rs回复,sendGlobalBarrierReached();
在reached节点下面建立实例节点,waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
等待所有的rs回复
final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List nodeNames)
throws IOException, IllegalArgumentException {
String procName = proc.getName();
String abortNode = zkProc.getAbortZNode(procName);
try {
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
abort(abortNode);
}
} catch (KeeperException e) {throw new IOException("Failed while watching abort node:" + abortNode, e);
}
String acquire = zkProc.getAcquiredBarrierNode(procName);try {
byte[] data = ProtobufUtil.prependPBMagic(info);
ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
for (String node : nodeNames) {
String znode = ZKUtil.joinZNode(acquire, node);if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
coordinator.memberAcquiredBarrier(procName, node);
}
}
} catch (KeeperException e) {
throw new IOException("Failed while creating acquire node:" + acquire, e);
}
}
1、首先是检查abortNode(每个procName在zk下面都有一个对应的节点,比如snapshot,然后在procName下面又分了acquired、reached、abort三个节点。检查abort节点下面有没有当前的实例。)
2、在acquired节点为该实例创建节点,创建完成之后,在该实例节点下面监控各个Region Server的节点。如果发现已经有了,就更新Procedure
中的acquiringMembers
列表和inBarrierMembers
,把节点从acquiringMembers
中删除,然后添加到inBarrierMembers列表当中。
3、到这一步服务端的工作就停下来了,等到所有RS接受到指令之后在acquired节点下创建节点。
4、收到所有RS的回复之后,它才会开始在reached节点创建实例节点,然后继续等待。
5、RS完成任务之后,在reached的实例节点下面创建相应的节点,然后回复。
6、在确定所有的RS都完成工作之后,清理zk当中的相应proName节点。
注意:在这个过程当中,有任务的错误,都会在abort节点下面建立该实例的节点,RS上面的子过程一旦发现abort存在该节点的实例,就会取消该过程。
public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
throws KeeperException {
this.zkController = new ZKProcedureUtil(watcher, procType) {
@Override
public void nodeCreated(String path) {
if (!isInProcedurePath(path)) {
return;
}
String parent = ZKUtil.getParent(path);
// if its the end barrier, the procedure can be completed
if (isReachedNode(parent)) {
receivedReachedGlobalBarrier(path);
return;
} else if (isAbortNode(parent)) {
abort(path);
return;
} else if (isAcquiredNode(parent)) {
startNewSubprocedure(path);
} else {
LOG.debug("Ignoring created notification for node:" + path);
}
}
};
}
Snapshot在Region Server是由RegionServerSnapshotManager
类里面的ProcedureMemberRpcs
负责监测snapshot下面的节点变化,当发现acquired下面有实例之后,启动新任务。
public Subprocedure createSubprocedure(String opName, byte[] data) {
return builder.buildSubprocedure(opName, data);
}
当检测节点增加后,会调用
ProcedureMember的以上方法来创建
SubProcedure,这里的builder是SnapshotSubprocedureBuilder
,它的buildSubprocedure()
会创建FlushSnapshotSubprocedure
类型的subprocedure
,FlushSnapshotSubprocedure
有一个名为regions的成员变量,这里会进行初始化,从region server的online regions列表中检查是否有被snapshot表的region,如果有,则初始化regions,否则regions为空。同样,这个subprocedure
会提交给内部的线程池处理.FlushSnapshotSubprocedure
继承于Subprocedure
,它是一个callable,入口函数是call。
final public Void call() {
try {
waitForReachedGlobalBarrier();
//...
} catch (Exception e) {
} finally {
releasedLocalBarrier.countDown();
}
}
acquireBarrier();
在acquired的实例节点下面建立rs的节点 rpcs.sendMemberAcquired(this);
,等待reached的实例节点的建立
接下来insideBarrier();
、rpcs.sendMemberCompleted(this);
可以看出,只有reached相应节点建立,region server才可以往下走进行实际的snapshot操作,而reached节点的建立只有HMaster看到所有的相关的region server都已经acquire了任务后才会去建立,这就达到了同步的目的。
for (HRegion region : regions) {
taskManager.submitTask(new RegionSnapshotTask(region));
}
insideBarrier的实现在FlushSnapshotSubprocedure这个类里面,调用了flushSnapshot(),这个方法给每个region都开一个线程去提交。
public void addRegionToSnapshot(SnapshotDescription desc,
ForeignExceptionSnare exnSnare) throws IOException {
Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
this.fs.getFileSystem(), snapshotDir, getRegionInfo());
for (Store store : stores.values()) {
Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
List storeFiles = new ArrayList(store.getStorefiles());
for (int i = 0; i < sz; i++) {
StoreFile storeFile = storeFiles.get(i);
Path file = storeFile.getPath();
Path referenceFile = new Path(dstStoreDir, file.getName());
boolean success = true;
if (storeFile.isReference()) {
storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
} else {
success = fs.getFileSystem().createNewFile(referenceFile);
}
if (!success) {
throw new IOException("Failed to create reference file:" + referenceFile);
}
}
}
}
- 在工作目录创建region目录和写入region的信息
- 为hfile创建引用
2.1. 分列族为store创建引用目录,每个store属于不同的列族
2.2. 遍历hfile,然后创建引用int sz = storeFiles.size();
把旧的引用文件的内容写入到新的引用文件当中,getFileSystem()
创建一个空的引用文件
在工作目录在.hbase-snapshot/.tmps/snapshotName/region/familyName/下面给hfile创建引用文件。在创建引用文件的时候,还要先判断一下这个所谓的hfile是不是真的hfile,还是它本身就是一个引用文件了。
如果已经是引用文件的话,把旧的引用文件里面的内容写入到新的引用文件当中。
如果是一个正常的hfile的话,就创建一个空的引用文件即可,以后我们可以通过它的名字找到它在snapshot下面相应的文件。
此时,每个RS的工作已完成。
被禁用的表:
备份禁用的表,区别是snapshotRegions
方法,但是方法除了做一些准备工作之外,就是snapshotDisabledRegion
,与上文类似。
四、功能
1、全量/增量备份:任何数据库都需要有备份的功能来实现数据的高可靠性,snapshot可以非常方便的实现表的在线备份功能,并且对在线业务请求影响非常小。使用备份数据,用户可以在异常发生的情况下快速回滚到指定快照点。增量备份会在全量备份的基础上使用binlog进行周期性的增量备份。
2.、数据迁移:可以使用ExportSnapshot功能将快照导出到另一个集群,实现数据的迁移