SyncRequestProcessor
作为一个ZooKeeper中的一个关键线程(ZooKeeperCriticalThread
),是ZooKeeper请求处理链中的事务日志记录处理器,其主要用来将事务请求记录到事务日志文件中去,同时还会触发ZooKeeper进行数据快照。
数据结构
-
LinkedBlockingQueue<Request> queuedRequests
:上一个RequestProcessor
调用nextProcessor.processRequest(request)
将request排入该队列中等待处理。 -
Thread snapInProcess
:负责快照线程,保证数据快照过程不影响ZooKeeper的主流程,需创建一个单独的异步线程来进行数据快照。 -
LinkedList<Request> toFlush
:在持久化过程中,使用组提交(Group Commits)来优化磁盘I/O操作。想象一个场景:当客户端有大量的事务请求,如果每次写请求都同步到磁盘,那么性能就会产生问题。所以设置该链表来暂存需要持久化到磁盘的Request。 -
int snapCount
:默认为100000,表示ZooKeeper每隔snapCount
次事务日志记录后进行一个数据快照。
toFlush
以及flush时机
toFlush
队列可用于存储请求,可能是读也可能是写。
ZooKeeper专门使用线程SyncRequestProcessor
来处理请求,所以这个线程必须合理的工作,否则会对整体的性能造成影响。如果客户端请求为读请求就没必要进行flush了,但如果是写请求,就必须把请求写入log,这个写入未必能保证真的同步到磁盘。所以合适的时机将缓存的事务日志刷入到磁盘是必须的。
从程序的设计应该能看到作者出于这个考虑选择了两个时机来做这件事情:
- 如果没有请求的时候(即较空闲的时候)
- 如果一直繁忙,则toFlush队列到达了一定数量(1000),就会批量同步
注意点
-
数据快照
每进行一次事务日志记录之后,ZooKeeper都会检测当前是否需要进行数据快照。理论上进行
snapCount
次事务操作后就会开始数据快照,但是考虑到数据快照对于ZooKeeper所在机器的整体性能影响,需要尽量避免ZooKeeper集群中所有机器在同一时刻进行数据快照。因此ZooKeeper在具体的实现中,并不是严格按照这个策略执行,而是采取“过半随机”策略,即符合如下条件就进行数据快照:logCount > (snapCount / 2 + randRoll)
其中
logCount
代表了当前已经记录的事务日志数量,randRoll
为1 ~ snapCount/2
之间的随机数,因此上面的条件就相当于:如果我们配置的snapCount
为100000
,那么ZooKeeper会在50000 ~ 100000
次事务日志记录后进行一次数据快照。 -
事务日志文件切换
当满足上述条件时,ZooKeeper就要开始进行数据快照了。首先是进行事务日志文件的切换。所谓的事务日志文件切换时指当前的事务日志已经“写满”,需要重新创建一个新的事务日志。即每当进行一次数据快照,重新创建一个事务日志文件。
源码
int logCount = 0;
int randRoll = r.nextInt(snapCount/2); // 产生0~snapCount/2之间的随机数
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take(); // toFlush为空,不需要flush,没有数据则直接阻塞掉
} else {
si = queuedRequests.poll(); // 没有数据直接返回,有则拿出
if (si == null) { // 如果queuedRequests中没有数据,但toFlush不空,则表明ZooKeeper现在比较空闲,可以进行flush
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
if (zks.getZKDatabase().append(si)) { // 如果si是事务请求
logCount++;
if (logCount > (snapCount / 2 + randRoll)) { // 满足条件,需要进行数据快照
randRoll = r.nextInt(snapCount/2);
zks.getZKDatabase().rollLog(); // 切换事务日志文件
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") { // 创建数据快照异步线程
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start(); // 保证数据快照过程不影响ZooKeeper的主流程,创建一个单独的异步线程来进行数据快照
}
logCount = 0;
}
} else if (toFlush.isEmpty()) { // 如果是非事务请求(读操作)且toFlush为空
// 说明近一段时间读多写少,直接响应,此处优化为了读比较频繁操作
// 为何读写不分开???
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue; // nextProcessor已经对该si处理过了,不用添加到toFlush中
}
toFlush.add(si);
if (toFlush.size() > 1000) { // 超过1000,直接flush
flush(toFlush);
}
}
}
参考
- zookeeper储存之实现分析
- 从Paxos到ZooKeeper分布式一致性原理与实践书籍