MultiVersionConcurrencyControl.java,版本 0.94.1
MultiVersionConsistencyControl 管理 memstore 中的读写一致性。该类实现了一种机制,达到如下的目的:
- 提供接口让 reader 知道可以忽略哪些元素项
- 提供一个新的 WriteNumber 给 writer
- 将写更新提供给 reader(通过原子事务)
1 变量
主要包含两个变量:
- memstoreRead 前面文章中提到的 ReadPoint
- memstoreWrite 前面文章中提到的 WriteNumber
变量描述private volatile long
,私有,长整形,volatile 标记,线程间可见。
readWriters 私有 object 常量,加锁时候用。
writeQueue 写操作队列,默认构造函数初始化的LinkedList,元素类型为 WriterEntry。注意,LinkedList 这个双向链表数据结构,适合随机增、删,但不是线程安全的。
perThreadReadPoint 线程局部变量,保存当前线程获取的 readPoint,static final
,ThreadLocal,用 Long.MAX_VALUE初始化。
FIXED_SIZE 一个 MVCC 对象占用内存空间的大小,包括:一个锁对象 readWriters,两个 long 型变量 memstoreRead 和 memstoreWrite,两个应用对象的地址:writeQueue 和 perThreadReadPoint。
2 内嵌类
WriteEntry 每个 writer 持有一个,保存 writeNumber,并用一个 completed 标记是否写完。
实际上就是对理论中 writeNumber 的包装:通过 writeNumber 对应到一个writer,通过 completed 标记 writer 是否写完。
3 方法
3.1 构造方法
public MultiVersionConsistencyControl();
唯一的一个构造方法,无参数,将 memstoreRead 和 memstoreWrite 初始化为0。
两个变量的值一般不会为0,后续会用初始化方法,将两个变量的值设置为一个正常值。
3.2 初始化方法
public void initialize(long startPoint);
初始化 memstoreRead 和 memstoreWrite 。
在初始化的时候,会用 synchronized 对 writeQueue 加锁。
判断 memstoreWrite 是否和 memstoreRead 相同,如果两者不同,认为这个是已经用过的 MVCC 对象,不需要再做初始化;如果相同,则将 memstoreRead 和 memstoreWriet 设置为输入参数。
initialize 方法唯一一处引用在 org.apache.hadoop.hbase.regionserver.HRegion.initializeRegionInternals
方法中。
HRegion 包含一个私有的常量 mvcc ,对象声明的时候用 MVCC 的缺省构造方法初始化。HRegion 的 org.apache.hadoop.hbase.regionserver.HRegion.initialize
对 HRegion 初始化的时候,调用了 initialiazeRegionInterals
,初始化 Region 的内部对象,把 mvcc 初始化为一个合理的值。
所以,上面 MVCC 初始化方法的判断逻辑是 memstoreRead == memstoreWrite
时候才有必要进行初始化。
3.3 memstoreRead相关
在实现的时候,ReadPoint 包含两类,一个是全局的,由 memstoreRead 记录的,这个是所有线程均可见、可改的,一个是局部的, ThreadLocal 的 readPoint,每个线程有一个自己的副本,且这个不保证与全局的 readPoint 一样。在代码实现的时候,需要对这两类 readPoint 小心处理。
3.3.1 memstoreReadPoint
public long memstoreReadPoint();
返回当前 MVCC 记录的 memstoreRead。在两处被用到:
一处是 MVCC 的 resetThreadReadPoint,用 mvcc 的全局 readpoint 更新线程局部的 readPoint。
一处是在 HRegion 类中,getSmallestReadPoint 方法,比较该 Region 上所有 scanner 持有的 readPoint,以及 Region MVCC 对象的 readPoint,在这所有中找到一个最小的。
3.3.2 ThreadReadPoint
ThreadReadPoint 相关4个方法,均标记为 static 。
public static long getThreadReadPoint();
获取线程局部 readPoint,主要由 memstore scanner 调用,以确认跳过哪些值。
分别在 org.apahce.hadoop.hbase.regionserver.MemStore.MemStoreScanner.getNext
方法和 org.apahce.hadoop.hbase.regionserver.StoreFileScanner.skipKVsNewerThanReadpoint
方法中被调用。
在 getNext
方法中,获取线程当前的 readPoint,scanner 在扫描的时候,所有比 readPonit 老的值才可以被读到。
在 skipKVsNewerThanReadpoint
方法中则想法,获取线程当前的 readPoint 后,把所有比 readPoint 新的 KV 对象会被跳过。
public static void setThreadReadPoint(long readPoint);
设置线程局部 readPoint。
主要在两个地方用到:
org.apahce.hadoop.hbase.regionserver.HRegion.RegionScannerImpl
配合对象自己的 readPt 使用。
构造方法,根据隔离级别参数,如果为 READ_UNCOMMITED,将线程局部 readPoint 设置为 long.MAX_VALUE。
next()
方法, 带 synchronized 标记,在开始取数据之前,先用 readPt 更新线程局部 readPoint,然后再取值。
reseek()
方法,带 synchronized 标记,重新定位之前,用 readPt 更新线程局部的 readPoint 标记。
org.apahce.hadoop.hbase.regionserver.Store
compactStore
方法。如前面理论描述中提到的,先获取当前 region 所有读者中持有的最老的 readPoint,用 setThreadReadPoint
方法更新线程局部 readPoint。后续做 compact,以这个smallestReadPoint 为基准。
由上可见,线程局部维护的 ThreadReadPoint 不自动更新,可以随时被读取,但是在所有写操作之前,需要对线程局部的 ThreadReadPoint 更新,写入一个确定的值,以确认最新。
public static void resetThreadReadPoint();
将线程局部 readPoint 重置为0。Find Usages 工具发现该方法实际未被使用。
public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc);
用一个 MVCC 对象的 memstoreReadPoint 更新当前线程的局部 readPoint,并返回更新后的局部 readPoint。
3.4 memstoreWrite相关
涉及4个方法,入口是 beginMemstoreInsert()
和 completeMemstoreInsert()
,completeMemstoreInsert()
在实现的时候,先调用 advanceMemstore()
后调用 waitForRead()
,以下依次说明。
public WriteEntry beginMemstoreInsert();
获取一个最新的 writerNumber,核心就是干这一件事。
对 writeQueue 加锁,获取一个新的 memstoreWrite,利用新的 memstore 新建个 WriteEntry 对象,并添加到 writeQueue 链表中,最后将新建的 WriteEntry 对象返回。
该方法的调用都在 HRegion 类中:
applyFamilyMapToMemstore 传入参数包含 WriteEntry 对象,如果该对象为 null,则调用
beginMemstoreInsert()
会创建一个 WriterEntry 且已加入 writeQueue。doMiniBatchMutation 批量修改,put 或者 delete。通过
beginMemstoreInsert()
获取一个包含最新的 WriteNumber 的 WriteEntryinternalFlushcache flush 的实现方法,获取最新的 writeNumber
mutateRowsWithLocks region 内的原子修改,获取最新的 writeNumber
public void completeMemstoreInsert(WriteEntry e);
按序调用后面两个方法,没有其他。
boolean advanceMemstore(WriteEntry e);
"从头开始遍历writeQueue,移除所有已完成的WriteEntry对象,最后将memstoreRead更新为最新已完成的memstoreWrite;"
boolean advanceMemstore(WriteEntry e) {
synchronized (writeQueue) {
e.markCompleted();
long nextReadValue = -1;
boolean ranOnce=false;
while (!writeQueue.isEmpty()) {
ranOnce=true;
WriteEntry queueFirst = writeQueue.getFirst();
if (nextReadValue > 0) {
if (nextReadValue+1 != queueFirst.getWriteNumber()) {
throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+ nextReadValue + " next: " + queueFirst.getWriteNumber());
}
}
if (queueFirst.isCompleted()) {
nextReadValue = queueFirst.getWriteNumber();
writeQueue.removeFirst();
} else {
break;
}
}
if (!ranOnce) {
throw new RuntimeException("never was a first");
}
if (nextReadValue > 0) {
synchronized (readWaiters) {
memstoreRead = nextReadValue;
readWaiters.notifyAll();
}
}
if (memstoreRead >= e.getWriteNumber()) {
return true;
}
return false;
}
}
public void waitForRead(WriteEntry e);
等待全局的 readPoint 更新到当前 writer 的事务号。"阻塞当前线程,直到memstoreRead等于当前WriteEntry的memstoreWrite,至此表明当前WriteEntry之前的所有更新事务都已经完成"
正如之前 解释 HBase MVCC 实现原理的时候提到的,所有事务号小于 readPoint 的事务,被认为是"确定安全"了,相关的线程也就可以被释放了。
public void waitForRead(WriteEntry e) {
boolean interrupted = false;
synchronized (readWaiters) {
while (memstoreRead < e.getWriteNumber()) {
try {
readWaiters.wait(0);
} catch (InterruptedException ie) {
// We were interrupted... finish the loop -- i.e. cleanup --and then
// on our way out, reset the interrupt flag.
interrupted = true;
}
}
}
if (interrupted) Thread.currentThread().interrupt();
}
4 最新版本
MVCC 最新版本的代码参见:MultiVersionConcurrencyControl.java
从变量到方法都有了明显的变化,下一篇专门比较下。