文章目录
前言
本文继续介绍了HRegion上Memstore flush的主体流程和主要细节,cacheFlusher如何处理flush请求。
cacheFlusher如何处理flush请求
通过如何初始化cacheFlusher部分的介绍,我们已经知道,在MemStoreFlusher内部,存在两个存储flush请求及其HRegion封装类的队列和集合,即flushQueue和regionsInQueue,而MemStoreFlusher对外提供了一个requestFlush()方法,我们大体看下这个方法:
public void requestFlush(HRegion r) {
synchronized (regionsInQueue) {// 使用synchronized关键字对regionsInQueue进行线程同步
if (!regionsInQueue.containsKey(r)) {// 如果regionsInQueue中不存在对应HRegion
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
// 将HRegion类型的r封装成FlushRegionEntry类型的fqe
// 这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部。不久它将出列被处理。
FlushRegionEntry fqe = new FlushRegionEntry(r);
// 将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合
// 将flush请求FlushRegionEntry添加到flushQueue队列
// 从这里可以看出regionsInQueue、flushQueue这两个成员变量go together
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
}
requestFlush()
requestFlush()方法的主要作用,就是添加一个flush region的请求至MemStoreFlusher内部队列。其主要逻辑如下:
1、首先需要使用synchronized关键字对regionsInQueue进行线程同步,这么做是为了防止多线程的并发;
2、然后判断regionsInQueue中是否存在对应的HRegion,如果regionsInQueue集合中不存在对应HRegion的话继续,否则直接返回;
3、既然regionsInQueue集合中不存在对应HRegion,将HRegion类型的r封装成FlushRegionEntry类型的fqe;
4、将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合;
5、将flush请求FlushRegionEntry添加到flushQueue队列。
从上述4、5步就可以看出regionsInQueue、flushQueue这两个成员变量go together,并且这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部,不久它将出列被处理。回到flushQueue的定义,flushQueue是一个存储了Region刷新缓存请求的队列,里面存储的是实现了FlushQueueEntry接口的对象,FlushQueueEntry没有定义任何行为,但是继承了java.util.concurrent.Delayed接口,故flushQueue是java中的DelayQueue,队列里存储的对象有一个过期时间的概念。
既然flush的请求已经被添加至flushQueue队列,相当于生产者已经把产品生产出来了,那么需要一个消费者,这个消费者的角色就是由FlushHandler线程来担任的。既然是线程,那么处理的逻辑肯定在其run()方法内,先看下flushQueue中存储的都是什么?
回顾下flushQueue的定义,它是一个存储了FlushQueueEntry的队列DelayQueue。我们先看下FlushQueueEntry的定义:
interface FlushQueueEntry extends Delayed {
}
是一个集成了java的Delayed接口的无任何方法的空接口,那么它的实现类就是WakeupFlushThread和FlushRegionEntry。首先来看下flushQueue对应的队列类型—Java中的DelayQueue。
DelayQueue是一个*的BlockingQueue,其内部存储的必然是实现了Delayed接口的对象。所以,FlushQueueEntry必须实现java的Delayed接口。而这种队列中的成员有一个最大特点,就是只有在其到期后才能出列,并且该队列内的成员都是有序的,从头至尾按照延迟到期时间的长短来排序。那么如何判断成员是否到期呢?对应成员对象的getDelay()方法返回一个小于等于0的值,就说明对应对象在队列中已到期,可以被取走。
既然DelayQueue中存储的成员对象都是有序的,那么实现了Delayed接口的类,必须提供compareTo()方法,用以排序,并且需要实现上述getDelay()方法,判断队内成员是否到期可以被取走。
下面开始研究下WakeupFlushThread和FlushRegionEntry。
WakeupFlushThread
首先,WakeupFlushThread非常简单,没有任何实质内容,代码如下:
static class WakeupFlushThread implements FlushQueueEntry {
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
@Override
public int compareTo(Delayed o) {
return -1;
}
@Override
public boolean equals(Object obj) {
return (this == obj);
}
它的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠。而且,其getDelay()方法返回值为0,说明其不存在延迟时间,入列后即可出列。而它的compareTo()方法返回的值是-1,说明它与其它WakeupFlushThread在队内的顺序是等价的,无前后之分,实际上WakeupFlushThread区分前后也没有意义,它本身也没有实质性的内容。
FlushRegionEntry
接下来,我们再看下FlushRegionEntry类,其定义如下:
static class FlushRegionEntry implements FlushQueueEntry {
// 待flush的HRegion
private final HRegion region;
// 创建时间
private final long createTime;
// 何时到期
private long whenToExpire;
// 重入队列次数
private int requeueCount = 0;
FlushRegionEntry(final HRegion r) {
// 待flush的HRegion
this.region = r;
// 创建时间为当前时间
this.createTime = EnvironmentEdgeManager.currentTime();
// 何时到期也为当前时间,意味着首次入队列时是没有延迟时间的,入列即可出列
this.whenToExpire = this.createTime;
}
/**
* @param maximumWait
* @return True if we have been delayed > <code>maximumWait</code> milliseconds.
*/
public boolean isMaximumWait(final long maximumWait) {
return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
}
/**
* @return Count of times {@link #requeue(long)} was called; i.e this is
* number of times we've been requeued.
*/
public int getRequeueCount() {
return this.requeueCount;
}
/**
* 类似重新入列的处理方法,重新入列次数requeueCount加1,何时到期未当前时间加参数when
*
* @param when When to expire, when to come up out of the queue.
* Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime()
* to whatever you pass.
* @return This.
*/
public FlushRegionEntry requeue(final long when) {
this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
this.requeueCount++;
return this;
}
/**
* 判断何时到期的方法
*/
@Override
public long getDelay(TimeUnit unit) {
// 何时到期减去当前时间
return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
TimeUnit.MILLISECONDS);
}
/**
* 排序比较方法,根据判断何时到期的getDelay()方法来决定顺序
*/
@Override
public int compareTo(Delayed other) {
// Delay is compared first. If there is a tie, compare region's hash code
int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
other.getDelay(TimeUnit.MILLISECONDS)).intValue();
if (ret != 0) {
return ret;
}
// 何时到期时间一直的话,根据hashCode()来排序,其实也就是根据HRegion的hashCode()方法返回值来排序
FlushQueueEntry otherEntry = (FlushQueueEntry) other;
return hashCode() - otherEntry.hashCode();
}
@Override
public String toString() {
return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
}
@Override
public int hashCode() {
int hash = (int) getDelay(TimeUnit.MILLISECONDS);
return hash ^ region.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Delayed other = (Delayed) obj;
return compareTo(other) == 0;
}
}
接下来我们看下flush请求的实际处理流程,即FlushHandler的run()方法:
它的主要处理逻辑为:
1、首先HRegionServer未停止的话,run()方法一直运行;
2、将标志位AtomicBoolean类型的wakeupPending设置为false;
3、从flushQueue队列中拉取一个FlushQueueEntry,即fqe:
3.1、如果fqe为空,或者为WakeupFlushThread:
3.1.1、如果通过isAboveLowWaterMark()方法判断全局MemStore的大小高于限制值得低水平线,调用flushOneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,降低MemStore的大小,预防OOM等异常情况的发生,并入列另一个令牌,以使该线程之后再次被唤醒;
3.2、fre不为空,且不为WakeupFlushThread的话,转化为FlushRegionEntry类型的fre:调用flushRegion()方法,并且如果结果为false的话,跳出循环;
4、如果循环结束,同时清空regionsInQueue和flushQueue(ps:又是在一起啊O(∩_∩)O~)
5、唤醒所有的等待着,使得它们能够看到close标志;
6、记录日志。
WakeupFlushThread的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠,实际上WakeupFlushThread起到的作用不仅仅是这个,在FlushHandler线程不断的poll刷新队列flushQueue中的元素时,如果获取到的是一个WakeupFlushThread,它会发起 一个检测,即RegionServer的全局MemStore大小是否超过低水平线,如果未超过,WakeupFlushThread仅仅起到了一个占位符的作用,否则,WakeupFlushThread不仅做为占位符,保证刷新线程不休眠,还按照一定策略选择该RegionServer上的一个Region刷新memstore,以缓解RegionServer内存压力。
总结
本文介绍了HRegion上Memstore flush的主体流程和主要细节,讲述了cacheFlusher如何处理flush请求的细节,关于如何选择一个HRegion进行flush以缓解MemStore压力以及后续问题将在下文中介绍。