MySQL的after_sync半同步与raft 保证一致性的方式有些类似。
after_sync是master在sync binlog后等待从库把事务写到relay log后的ack,拿到ack后,在commit,然后在返回给客户端提交成功的信息。
raft中的日志有commit和applied 两个列表,commited 代表日志写入了文件,applied代表日志应用到了状态机。
raft也是leader先写日志,然后在等待大部分的节点信息都写入了日志,提交了,然后leader在提交,提交日志后,leader在应用到状态机,然后在返回给客户端提交成功的信息, 给其他节点提交信息,其他节点应用日志到状态机,其他节点网络慢的情况下,leader会不停重试传输。
针对leader1宕机的几种状态下的故障。参考
https://www.cnblogs.com/mindwind/p/5231986.html
https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
https://mit-public-courses-cn-translatio.gitbook.io/mit6-824/lecture-06-raft1/6.6-ying-yong-ceng-jie-kou
针对上面的场景有几个疑问
一:场景3中,新主会间接提交之前的日志,客户端在重试,不是重复执行了吗?状态机执行了2次命令。
这个状态机不会重复执行,因为之前的leader已经挂了,还没有apply log,所以不会发送apply 的请求出去。
二:如果leader apply log,返回给客户端确认,但是follower没有收到apply的信号,leader就挂了,虽然新主上有commit的日志,但是不会apply,怎么办?
这个是会应用的,新的leader,在接受用户请求之前会执行
r.dispatchLogs([]*logFuture{noop})
分发一个noop日志
func (r *Raft) processLogs(index uint64, future *logFuture) {
// Reject logs we've applied already
lastApplied := r.getLastApplied()
if index <= lastApplied {
r.logger.Printf("[WARN] raft: Skipping application of old log: %d", index)
return
}
// Apply all the preceding logs
for idx := r.getLastApplied() + 1; idx <= index; idx++ {
// Get the log, either from the future or from our log store
if future != nil && future.log.Index == idx {
r.processLog(&future.log, future, false)
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", idx, err)
panic(err)
}
r.processLog(l, nil, false)
}
// Update the lastApplied index and term
r.setLastApplied(idx)
}
在进行for idx := r.getLastApplied() + 1; idx <= index; idx++ 这个判断的时候
lastapplied一定比index小,index就是lastindex,已提交的日志在lastindex前,这样之前leader提交状态的日志会被applied。
raft实现了cap中的cp ,没有保证a ,a代表的是用户的请求一定有响应,在出现脑裂的情况下,如果一个leader的请求没有被大多数节点接受,那么就没有办法提交,没法给客户响应,如果3个节点中有2个节点挂掉,就剩一个节点,其实这个时候请求过来后,判断不是主,不会执行,也会返回给客户端not leader,所以这里的响应,是指的不会处理请求,进行业务逻辑处理。
在raft代码中,我们可以看到是在fsm apply log后才向客户端发送的响应
case commitEntry := <-r.fsmCommitCh:
// Apply the log if a command
var resp interface{}
if commitEntry.log.Type == LogCommand {
start := time.Now()
resp = r.fsm.Apply(commitEntry.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}
// Update the indexes
lastIndex = commitEntry.log.Index
lastTerm = commitEntry.log.Term
// Invoke the future if given
if commitEntry.future != nil {
commitEntry.future.response = resp
commitEntry.future.respond(nil)
}
follower的幂等实现,就是判断entries的第一个index,是否小于等于当前follower的最后一个logindex,如果是,把这之间的删除掉
// Process any new entries
if n := len(a.Entries); n > 0 {
start := time.Now()
first := a.Entries[0]
last := a.Entries[n-1]
// Delete any conflicting entries
lastLogIdx, _ := r.getLastLog()
if first.Index <= lastLogIdx {
r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", first.Index, lastLogIdx)
if err := r.logs.DeleteRange(first.Index, lastLogIdx); err != nil {
r.logger.Printf("[ERR] raft: Failed to clear log suffix: %v", err)
return
}
}