直接进入主题吧,上代码。
public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
// 清空之前完成的备份和恢复的任务
cleanupSentinels();
// 设置snapshot的版本
snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
.build();
// if the table is enabled, then have the RS run actually the snapshot work
TableName snapshotTable = TableName.valueOf(snapshot.getTable());
AssignmentManager assignmentMgr = master.getAssignmentManager();
//根据表的状态选择snapshot的类型
if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) {
snapshotEnabledTable(snapshot);
}
// 被禁用的表走这个方法
else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) {
snapshotDisabledTable(snapshot);
} else {
throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
}
}
从代码上看得出来,启用的表和被禁用的表走的是两个不同的方法。
Snapshot启用的表
先看snapshotEnabledTable方法吧,看看在线的表是怎么备份的。
private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
throws HBaseSnapshotException {
// snapshot准备工作
prepareToTakeSnapshot(snapshot);
// new一个handler
EnabledTableSnapshotHandler handler =
new EnabledTableSnapshotHandler(snapshot, master, this);
//通过handler线程来备份
snapshotTable(snapshot, handler);
}
这里就两步,先去看看snapshot前的准备工作吧,F3进入prepareToTakeSnapshot方法。这个方法里面也没干啥,就是检查一下是否可以对这个表做备份或者恢复的操作,然后就会重建这个工作目录,这个工作目录在.hbase-snapshot/.tmps下面,每个snapshot都有自己的目录。
在snapshotTable里面把就线程提交一下,让handler来处理了。
handler.prepare();
this.executorService.submit(handler);
this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
这些都不是重点,咱到handler那边去看看吧,EnabledTableSnapshotHandler是继承TakeSnapshotHandler的,prepare方法和process方法都一样,区别在于snapshotRegions方法被重写了。
看prepare方法还是检查表的定义文件在不在,我们直接进入process方法吧。
// 把snapshot的信息写入到工作目录
SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
// 开一个线程去复制表信息文件
new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
monitor.rethrowException();
//查找该表相关的region和位置
List<Pair<HRegionInfo, ServerName>> regionsAndLocations =
MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
snapshotTable, false);
// 开始snapshot
snapshotRegions(regionsAndLocations);
// 获取serverNames列表,后面的校验snapshot用到
Set<String> serverNames = new HashSet<String>();
for (Pair<HRegionInfo, ServerName> p : regionsAndLocations) {
if (p != null && p.getFirst() != null && p.getSecond() != null) {
HRegionInfo hri = p.getFirst();
if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
serverNames.add(p.getSecond().toString());
}
}
// 检查snapshot是否合格
status.setStatus("Verifying snapshot: " + snapshot.getName());
verifier.verifySnapshot(this.workingDir, serverNames);
// 备份完毕之后,把临时目录转移到正式的目录
completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
1、写一个.snapshotinfo文件到工作目录下
2、把表的定义信息写一份到工作目录下,即.tabledesc文件
3、查找和表相关的Region Server和机器
4、开始备份
5、检验snapshot的结果
6、确认没问题了,就把临时目录rename到正式目录
我们直接到备份这一步去看吧,方法在EnabledTableSnapshotHandler里面,重写了。
// 用分布式事务来备份在线的,太强悍了
Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
try {
// 等待完成
proc.waitForCompleted();
// 备份split过的region
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
for (Pair<HRegionInfo, ServerName> region : regions) {
HRegionInfo regionInfo = region.getFirst();
if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
LOG.info("Take disabled snapshot of offline region=" + regionInfo);
snapshotDisabledRegion(regionInfo);
}
}
}
这里用到一个分布式事务,这里被我叫做分布式事务,我也不知道它是不是事务,但是Procedure这个词我真的不好翻译,叫过程也不合适。
分布式事务
我们进入ProcedureCoordinator的startProcedure看看吧。
Procedure proc = createProcedure(fed, procName, procArgs,expectedMembers);
if (!this.submitProcedure(proc)) {
LOG.error("Failed to submit procedure '" + procName + "'");
return null;
}
先创建Procedure,然后提交它,这块没什么特别的,继续深入进去submitProcedure方法也找不到什么有用的信息,我们得回到Procedure类里面去,它是一个Callable的类,奥秘就在call方法里面。
final public Void call() {
try {
//在acquired节点下面建立实例节点
sendGlobalBarrierStart();
// 等待所有的rs回复
waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
//在reached节点下面建立实例节点
sendGlobalBarrierReached();
//等待所有的rs回复
waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
} finally {
sendGlobalBarrierComplete();
completedLatch.countDown();
}
}
从sendGlobalBarrierStart开始看吧,里面就一句话。
coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
再追杀下去。
final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
throws IOException, IllegalArgumentException {
String procName = proc.getName();
// 获取abort节点的名称
String abortNode = zkProc.getAbortZNode(procName);
try {
// 如果存在abort节点,就广播错误,中断该过程
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
abort(abortNode);
}
} catch (KeeperException e) {throw new IOException("Failed while watching abort node:" + abortNode, e);
}
// 获得acquire节点名称
String acquire = zkProc.getAcquiredBarrierNode(procName);
try {
// 创建acquire节点,info信息是Snapshot的信息,包括表名
byte[] data = ProtobufUtil.prependPBMagic(info);
ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
// 监控acquire下面的节点,发现指定的节点,就报告给coordinator
for (String node : nodeNames) {
String znode = ZKUtil.joinZNode(acquire, node);if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
coordinator.memberAcquiredBarrier(procName, node);
}
}
} catch (KeeperException e) {
throw new IOException("Failed while creating acquire node:" + acquire, e);
}
}
1、首先是检查abortNode ,什么是abortNode ?每个procName在zk下面都有一个对应的节点,比如snapshot,然后在procName下面又分了acquired、reached、abort三个节点。检查abort节点下面有没有当前的实例。
2、在acquired节点为该实例创建节点,创建实例节点的时候,把SnapshotDescription的信息(在EnabledTableSnapshotHandler类里面通过this.snapshot.toByteArray()传进去的)放了进去,创建完成之后,在该实例节点下面监控各个Region Server的节点。如果发现已经有了,就更新Procedure中的acquiringMembers列表和inBarrierMembers,把节点从
acquiringMembers中删除,然后添加到inBarrierMembers列表当中。
3、到这一步服务端的工作就停下来了,等到所有RS接收到指令之后通过实例节点当中保存的表信息找到相应的region创建子过程,子过程在acquired节点下创建节点。
4、收到所有RS的回复之后,它才会开始在reached节点创建实例节点,然后继续等待。
5、RS完成任务之后,在reached的实例节点下面创建相应的节点,然后回复。
6、在确定所有的RS都完成工作之后,清理zk当中的相应proName节点。
注意:在这个过程当中,有任务的错误,都会在abort节点下面建立该实例的节点,RS上面的子过程一旦发现abort存在该节点的实例,就会取消该过程。
Snapshot这块在Region Server是由RegionServerSnapshotManager类里面的ProcedureMemberRpcs负责监测snapshot下面的节点变化,当发现acquired下面有实例之后,启动新任务。
public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
throws KeeperException {
this.zkController = new ZKProcedureUtil(watcher, procType) {
@Override
public void nodeCreated(String path) {
if (!isInProcedurePath(path)) {
return;
}
String parent = ZKUtil.getParent(path);
// if its the end barrier, the procedure can be completed
if (isReachedNode(parent)) {
receivedReachedGlobalBarrier(path);
return;
} else if (isAbortNode(parent)) {
abort(path);
return;
} else if (isAcquiredNode(parent)) {
startNewSubprocedure(path);
} else {
LOG.debug("Ignoring created notification for node:" + path);
}
}
};
}
这块折叠起来,不是咱们的重点,让大家看看而已。我们直接进入Subprocedure这个类里面看看吧。
final public Void call() {
try {
// 目前是什么也没干
acquireBarrier();
// 在acquired的实例节点下面建立rs的节点
rpcs.sendMemberAcquired(this);
// 等待reached的实例节点的建立
waitForReachedGlobalBarrier();
// 干活
insideBarrier();
// 活干完了
rpcs.sendMemberCompleted(this);
} catch (Exception e) {
} finally {
releasedLocalBarrier.countDown();
}
}
insideBarrier的实现在FlushSnapshotSubprocedure这个类里面,调用了flushSnapshot(),这个方法给每个region都开一个线程去提交。for (HRegion region : regions) {
taskManager.submitTask(new RegionSnapshotTask(region));
}
Snapshot在线的region
我们接下来看看RegionSnapshotTask的call方法。
public Void call() throws Exception {
// 上锁,暂时不让读了
region.startRegionOperation();
try {
region.flushcache();
region.addRegionToSnapshot(snapshot, monitor);
} finally {
LOG.debug("Closing region operation on " + region);
region.closeRegionOperation();
}
return null;
}
}
在对region操作之前,先上锁,不让读了。然后就flushCache,这个方法很大,也好难懂哦,不过我们还是要迎接困难上,我折叠起来吧,想看的就看,不想看的就看我下面的写的步骤吧。MultiVersionConsistencyControl.WriteEntry w = null;
this.updatesLock.writeLock().lock();
long flushsize = this.memstoreSize.get();
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
long flushSeqId = -1L;
//先flush日志,再flush memstore到文件
try {
// Record the mvcc for all transactions in progress.
w = mvcc.beginMemstoreInsert();
mvcc.advanceMemstore(w);
if (wal != null) {
//准备flush日志,进入等待flush的队列,这个startSeqId很重要,在恢复的时候就靠它了,它之前的日志就是已经flush过了,不用恢复
Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
if (startSeqId == null) {
return false;
}
flushSeqId = startSeqId.longValue();
} else {
flushSeqId = myseqid;
}
for (Store s : stores.values()) {
storeFlushCtxs.add(s.createFlushContext(flushSeqId));
}
// 给MemStore做个snapshot,它的内部是两个队列,实际是从一个经常访问的队列放到另外一个不常访问的队列,那个队列名叫snapshot
for (StoreFlushContext flush : storeFlushCtxs) {
flush.prepare();
}
} finally {
this.updatesLock.writeLock().unlock();
}
// 同步未flush的日志到硬盘上
if (wal != null && !shouldSyncLog()) {
wal.sync();
}
// 等待日志同步完毕
mvcc.waitForRead(w);
boolean compactionRequested = false;
try {//把memstore中的keyvalues全部flush到storefile保存在临时目录当中,把flushSeqId追加到storefile里
for (StoreFlushContext flush : storeFlushCtxs) {
flush.flushCache(status);
}
// 把之前生成在临时目录的文件转移到正式目录
for (StoreFlushContext flush : storeFlushCtxs) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
compactionRequested = true;
}
}
storeFlushCtxs.clear();
// flush之后,就要减掉相应的memstore的大小
this.addAndGetGlobalMemstoreSize(-flushsize);
1、获取WAL日志的flushId(要写入到hfile当中,以后恢复的时候,要拿日志的flushId和hfile的flushId对比,小于hfile的flushId的就不用恢复了)
2、给MemStore的做snapshot,从kvset集合转移到snapshot集合
3、同步日志,写入到硬盘
4、把MemStore的的snapshot集合当中的内容写入到hfile当中,MemStore当中保存的是KeyValue的集合,写入其实就是一个循环,调用StoreFile.Writer的append方法追加,具体的可以看我的那篇博客《非mapreduce生成Hfile,然后导入hbase当中》
5、上一步的生成的文件是保存在临时目录中的,转移到正式的目录当中
6、更新MemStore当中的大小
好,我们继续看addRegionToSnapshot方法,好累啊,尼玛,这么多步骤。
public void addRegionToSnapshot(SnapshotDescription desc,
ForeignExceptionSnare exnSnare) throws IOException {// 获取工作目录
Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
// 1. 在工作目录创建region目录和写入region的信息
HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
this.fs.getFileSystem(), snapshotDir, getRegionInfo());
// 2. 为hfile创建引用
for (Store store : stores.values()) {
// 2.1. 分列族为store创建引用目录,每个store属于不同的列族
Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());// 2.2. 遍历hfile,然后创建引用
int sz = storeFiles.size();
for (int i = 0; i < sz; i++) {
StoreFile storeFile = storeFiles.get(i);
Path file = storeFile.getPath();
Path referenceFile = new Path(dstStoreDir, file.getName());
boolean success = true;
if (storeFile.isReference()) {
// 把旧的引用文件的内容写入到新的引用文件当中
storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
} else {
// 创建一个空的引用文件
success = fs.getFileSystem().createNewFile(referenceFile);
}
if (!success) {
throw new IOException("Failed to create reference file:" + referenceFile);
}
}
}
}
在工作目录在.hbase-snapshot/.tmps/snapshotName/region/familyName/下面给hfile创建引用文件。在创建引用文件的时候,还要先判断一下这个所谓的hfile是不是真的hfile,还是它本身就是一个引用文件了。
如果已经是引用文件的话,把旧的引用文件里面的内容写入到新的引用文件当中。
如果是一个正常的hfile的话,就创建一个空的引用文件即可,以后我们可以通过它的名字找到它在snapshot下面相应的文件。
okay,到这里,每个RS的工作都完成了。
备份split过的region
完成执行分布式事务,就是备份split过的region了,把之前的代码再贴一次吧,折叠起来,需要的自己看。
if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
LOG.info("Take disabled snapshot of offline region=" + regionInfo);
snapshotDisabledRegion(regionInfo);
}
}
protected void snapshotDisabledRegion(final HRegionInfo regionInfo)
throws IOException {
// 创建新的region目录和region信息
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
workingDir, regionInfo);
// 把region下的recovered.edits目录的文件复制snapshot的对应region下面
Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
Path snapshotRegionDir = regionFs.getRegionDir();
new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call();
// 给每个列族的下面的文件创建引用,所谓引用就是一个同名的空文件
new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call();
}
备份启用的表,现在已经结束了,但是备份禁用的表吧,前面说了区别是snapshotRegions方法,但是方法除了做一些准备工作之外,就是snapshotDisabledRegion。。。。所以snapshot到这里就完了,下面我们再回顾一遍吧。
1、进行snapshot之前的准备,创建目录,复制一些必要的信息文件等。
2、对于启用的表,启动分布式事务,RS接到任务,flush掉WAL日志和MemStore的数据,写入文件。
3、为hfile创建引用文件,这里的引用文件居然是空的文件,而且名字一样,它不是真的备份hfile,这是什么回事呢?这个要到下一章,从snapshot中恢复,才能弄明白了,这个和hbase的归档文件机制有关系,hbase删除文件的时候,不是直接删除,而是把它先放入archive文件夹内。