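1. Overview
In an RPC exchange, every request (req) is eventually answered by a response (resp). This article walks through how the node that sent the req handles the resp when it arrives.
2. handle_peer_resp source walkthrough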
void raft_server::handle_peer_resp(ptr<resp_msg>& resp, const ptr<rpc_exception>& err)
{
if (err)
{
l_->info(sstrfmt("peer response error: %s").fmt(err->what()));
return;
}
// update peer last response time
{
read_lock(peers_lock_);
auto peer = peers_.find(resp->get_src());
if (peer != peers_.end())
{
peer->second->set_last_resp(system_clock::now());
}
else
{
l_->info(sstrfmt("Peer %d not found, ignore the message").fmt(resp->get_src()));
return;
}
}
l_->debug(lstrfmt("Receive a %s message from peer %d with Result=%d, Term=%llu, NextIndex=%llu")
.fmt(
__msg_type_str[resp->get_type()],
resp->get_src(),
resp->get_accepted() ? 1 : 0,
resp->get_term(),
resp->get_next_idx()));
{
recur_lock(lock_);
// if term is updated, no more action is required
if (update_term(resp->get_term()))
{
return;
}
// ignore the response that with lower term for safety
switch (resp->get_type())
{
case msg_type::vote_response:
handle_voting_resp(*resp);
break;
case msg_type::append_entries_response:
handle_append_entries_resp(*resp);
break;
case msg_type::install_snapshot_response:
handle_install_snapshot_resp(*resp);
break;
default:
l_->err(sstrfmt("Received an unexpected message %s for response, system exits.")
.fmt(__msg_type_str[resp->get_type()]));
ctx_->state_mgr_->system_exit(-1);
::exit(-1);
break;
}
}
}
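- 1. As with the req handlers, the resp handlers have a single entry function for all responses, which dispatches by message type through a switch-case.
- 2. Before handing the resp to a specific handler, handle_peer_resp updates the peer's last response time. A resp from a peer that is not in peers_ is logged and dropped.
- 3. If update_term(resp->get_term()) returns true, this node's term was behind and has just been updated, so no further processing of the resp is needed.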
3. handle_voting_resp source walkthrough
void raft_server::handle_voting_resp(resp_msg& resp)
{
if (resp.get_term() != state_->get_term())
{
l_->info(sstrfmt("Received an outdated vote response at term %llu v.s. current term %llu")
.fmt(resp.get_term(), state_->get_term()));
return;
}
if (election_completed_)
{
l_->info("Election completed, will ignore the voting result from this server");
return;
}
if (voted_servers_.find(resp.get_src()) != voted_servers_.end())
{
l_->info(sstrfmt("Duplicate vote from %d for term %lld").fmt(resp.get_src(), state_->get_term()));
return;
}
{
read_lock(peers_lock_);
voted_servers_.insert(resp.get_src());
if (resp.get_accepted())
{
votes_granted_ += 1;
}
if (voted_servers_.size() >= (peers_.size() + 1))
{
election_completed_ = true;
}
if (votes_granted_ > (int32)((peers_.size() + 1) / 2))
{
l_->info(sstrfmt("Server is elected as leader for term %llu").fmt(state_->get_term()));
election_completed_ = true;
become_leader();
}
}
}
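- 1. if (resp.get_term() != state_->get_term()) checks whether the terms match. Processing continues only when they are equal; otherwise the resp is treated as outdated.
- 2. if (election_completed_) checks whether the election has already finished. A candidate wins as soon as it holds more than half of the votes, so responses can still arrive after the election is over.
- 3. If the sender is already in the candidate's voted_servers_, this is a second vote from the same node. It is logged as a duplicate and ignored.
- 4. Otherwise the sender is recorded in voted_servers_. Once voted_servers_.size() >= (peers_.size() + 1), every node has answered and the election is marked as completed.
- 5. resp.get_accepted() is used to count granted votes. When votes_granted_ exceeds half of the cluster size, the candidate has won and calls become_leader().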
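The vote-counting steps above can be sketched in isolation. This is a minimal sketch, not the cornerstone code: vote_tally and on_vote are hypothetical names, and it assumes the caller also records the candidate's own vote through on_vote.

```cpp
#include <cstddef>
#include <set>

// Hypothetical stand-in for the candidate-side bookkeeping in handle_voting_resp.
struct vote_tally
{
    std::size_t cluster_size;    // corresponds to peers_.size() + 1
    std::set<int> voted_servers; // ids of servers that have already answered
    int votes_granted = 0;
    bool election_completed = false;

    explicit vote_tally(std::size_t n) : cluster_size(n) {}

    // Returns true only for the response that makes the candidate win.
    bool on_vote(int src, bool accepted)
    {
        // Late responses and duplicate votes are ignored.
        if (election_completed || !voted_servers.insert(src).second)
        {
            return false;
        }
        if (accepted)
        {
            ++votes_granted;
        }
        if (voted_servers.size() >= cluster_size)
        {
            election_completed = true; // every node has answered
        }
        if (votes_granted > static_cast<int>(cluster_size / 2))
        {
            election_completed = true; // strict majority reached
            return true;
        }
        return false;
    }
};
```

In a three-node cluster, the second granted vote wins the election, and a third response arriving afterwards is ignored.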
4. handle_append_entries_resp source walkthrough
void raft_server::handle_append_entries_resp(resp_msg& resp)
{
read_lock(peers_lock_);
peer_itor it = peers_.find(resp.get_src());
if (it == peers_.end())
{
l_->info(sstrfmt("the response is from an unkonw peer %d").fmt(resp.get_src()));
return;
}
// if there are pending logs to be synced or commit index need to be advanced, continue to send appendEntries to
// this peer
bool need_to_catchup = true;
ptr<peer> p = it->second;
if (resp.get_accepted())
{
{
auto_lock(p->get_lock());
p->set_next_log_idx(resp.get_next_idx());
p->set_matched_idx(resp.get_next_idx() - 1);
}
// try to commit with this response
std::vector<ulong> matched_indexes(peers_.size() + 1);
matched_indexes[0] = log_store_->next_slot() - 1;
int i = 1;
for (it = peers_.begin(); it != peers_.end(); ++it, i++)
{
matched_indexes[i] = it->second->get_matched_idx();
}
std::sort(matched_indexes.begin(), matched_indexes.end(), std::greater<ulong>());
commit(matched_indexes[(peers_.size() + 1) / 2]);
need_to_catchup = p->clear_pending_commit() || resp.get_next_idx() < log_store_->next_slot();
}
else
{
std::lock_guard<std::mutex> guard(p->get_lock());
if (resp.get_next_idx() > 0 && p->get_next_log_idx() > resp.get_next_idx())
{
// fast move for the peer to catch up
p->set_next_log_idx(resp.get_next_idx());
}
else if (p->get_next_log_idx() > 0)
{
p->set_next_log_idx(p->get_next_log_idx() - 1);
}
}
// This may not be a leader anymore, such as the response was sent out long time ago
// and the role was updated by UpdateTerm call
// Try to match up the logs for this peer
if (role_ == srv_role::leader && need_to_catchup)
{
request_append_entries(*p);
}
}
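- 1. First check whether the sender is still in peers_. If not, log the fact and return.
- 2. If accepted = 1, the append-entries request succeeded, so set the peer's next_idx and matched_idx from the resp.
- 3. Collect the matched_idx of every peer in peers_, together with the leader's own last log index (log_store_->next_slot() - 1), and sort them in descending order.
- 4. Take the element at position (peers_.size() + 1) / 2 and call it mid_idx. A majority of the cluster has replicated its log at least up to mid_idx, so the leader calls commit(mid_idx).
- 5. If the resp was not accepted (accepted = 0), adjust next_idx downward and keep probing: jump to the next_idx suggested by the resp when it is smaller than the current one, otherwise step back by one. (See the cornerstone msg type walkthrough for details.)
- 6. If this node is still the leader and the peer still needs to catch up (a commit is pending, or the peer's next_idx is behind the leader's next_slot()), send append-entries again. The role check is needed because the resp may answer a req sent long ago, and the role may have changed since then through update_term.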
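The next_idx adjustment for a rejected resp can be written as a small pure function. This is a sketch under assumptions: next_idx_after_reject is a hypothetical helper, and plain integers replace the peer object and its lock.

```cpp
#include <cstdint>

// Hypothetical helper mirroring the rejected branch of handle_append_entries_resp.
// current_next: the leader's next_log_idx for the peer.
// hinted_next:  resp.get_next_idx() as reported by the follower.
std::uint64_t next_idx_after_reject(std::uint64_t current_next, std::uint64_t hinted_next)
{
    if (hinted_next > 0 && current_next > hinted_next)
    {
        return hinted_next; // fast path: jump straight to the follower's hint
    }
    if (current_next > 0)
    {
        return current_next - 1; // slow path: back off by one entry
    }
    return current_next; // already at zero, nothing to decrement
}
```

The fast path saves round trips when the follower is far behind; the one-step decrement is the fallback when the hint is missing or not smaller than the current value.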
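Key point:
A log_entry is first replicated to the followers' logs. Only once more than half of the cluster has stored it does the leader commit it, after which it can be applied to the state machine. In the implementation, this is done by sorting the matched_idx of all nodes and taking the middle element.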
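The sort-and-take-the-middle rule can be tried out on its own. A minimal sketch, assuming a hypothetical function majority_matched_idx that receives the leader's last log index and the peers' matched indexes as plain integers:

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical helper: returns the highest index stored on a majority of nodes.
// peer_matched is taken by value so the caller's vector is left untouched.
std::uint64_t majority_matched_idx(std::uint64_t leader_last_idx, std::vector<std::uint64_t> peer_matched)
{
    peer_matched.push_back(leader_last_idx); // the leader counts as one replica
    std::sort(peer_matched.begin(), peer_matched.end(), std::greater<std::uint64_t>());
    // After a descending sort, the element at size / 2 is matched or exceeded
    // by size / 2 + 1 nodes, which is a strict majority.
    return peer_matched[peer_matched.size() / 2];
}
```

For a leader at index 10 with peers at 7 and 5, the sorted list is 10, 7, 5 and the result is 7: two of the three nodes hold every entry up to index 7.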
5. handle_install_snapshot_resp source walkthrough
void raft_server::handle_install_snapshot_resp(resp_msg& resp)
{
read_lock(peers_lock_);
peer_itor it = peers_.find(resp.get_src());
if (it == peers_.end())
{
l_->info(sstrfmt("the response is from an unkonw peer %d").fmt(resp.get_src()));
return;
}
// if there are pending logs to be synced or commit index need to be advanced, continue to send appendEntries to
// this peer
bool need_to_catchup = true;
ptr<peer> p = it->second;
if (resp.get_accepted())
{
std::lock_guard<std::mutex> guard(p->get_lock());
ptr<snapshot_sync_ctx> sync_ctx = p->get_snapshot_sync_ctx();
if (sync_ctx == nilptr)
{
l_->info("no snapshot sync context for this peer, drop the response");
need_to_catchup = false;
}
else
{
if (resp.get_next_idx() >= sync_ctx->get_snapshot()->size())
{
l_->debug("snapshot sync is done");
ptr<snapshot> nil_snp;
p->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
p->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
p->set_snapshot_in_sync(nil_snp);
need_to_catchup = p->clear_pending_commit() || resp.get_next_idx() < log_store_->next_slot();
}
else
{
l_->debug(sstrfmt("continue to sync snapshot at offset %llu").fmt(resp.get_next_idx()));
sync_ctx->set_offset(resp.get_next_idx());
}
}
}
else
{
l_->info("peer declines to install the snapshot, will retry");
}
// This may not be a leader anymore, such as the response was sent out long time ago
// and the role was updated by UpdateTerm call
// Try to match up the logs for this peer
if (role_ == srv_role::leader && need_to_catchup)
{
request_append_entries(*p);
}
}
- The core code is shown below; the rest is the same as above.
if (resp.get_next_idx() >= sync_ctx->get_snapshot()->size())
{
l_->debug("snapshot sync is done");
ptr<snapshot> nil_snp;
p->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
p->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
p->set_snapshot_in_sync(nil_snp);
need_to_catchup = p->clear_pending_commit() || resp.get_next_idx() < log_store_->next_slot();
}
else
{
l_->debug(sstrfmt("continue to sync snapshot at offset %llu").fmt(resp.get_next_idx()));
sync_ctx->set_offset(resp.get_next_idx());
}
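- Because this is a snapshot transfer, there is no log index to send, so resp.get_next_idx() actually carries the snapshot offset that the follower has received so far. If that offset >= sync_ctx->get_snapshot()->size(), the transfer is complete and next_idx and matched_idx are set from the snapshot's last log index. Otherwise the leader continues sending from the acknowledged offset.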
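The offset bookkeeping can be modelled with a small struct. This is a sketch, not the cornerstone types: snapshot_progress and on_accepted are hypothetical names that fold the roles of the peer and its snapshot_sync_ctx into one object.

```cpp
#include <cstdint>

// Hypothetical model of the leader-side state for one snapshot transfer.
struct snapshot_progress
{
    std::uint64_t snapshot_size; // total size of the snapshot
    std::uint64_t last_log_idx;  // last log index covered by the snapshot
    std::uint64_t offset = 0;    // next offset to send
    bool done = false;
    std::uint64_t next_log_idx = 0;
    std::uint64_t matched_idx = 0;

    snapshot_progress(std::uint64_t size, std::uint64_t last_idx)
        : snapshot_size(size), last_log_idx(last_idx)
    {
    }

    // acked_offset plays the role of resp.get_next_idx() in an accepted resp.
    void on_accepted(std::uint64_t acked_offset)
    {
        if (acked_offset >= snapshot_size)
        {
            // Transfer finished: log replication resumes right after the snapshot.
            done = true;
            next_log_idx = last_log_idx + 1;
            matched_idx = last_log_idx;
        }
        else
        {
            offset = acked_offset; // continue from what the follower has stored
        }
    }
};
```

With a snapshot of size 100 covering log index 42, an ack at offset 60 only moves the offset forward, while an ack at offset 100 marks the transfer done and sets next_log_idx to 43.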
6. Source walkthrough of the additional ext_resp handling
void raft_server::handle_ext_resp(ptr<resp_msg>& resp, const ptr<rpc_exception>& err)
{
recur_lock(lock_);
if (err)
{
handle_ext_resp_err(*err);
return;
}
l_->debug(lstrfmt("Receive an extended %s message from peer %d with Result=%d, Term=%llu, NextIndex=%llu")
.fmt(
__msg_type_str[resp->get_type()],
resp->get_src(),
resp->get_accepted() ? 1 : 0,
resp->get_term(),
resp->get_next_idx()));
switch (resp->get_type())
{
case msg_type::sync_log_response:
if (srv_to_join_)
{
// we are reusing heartbeat interval value to indicate when to stop retry
srv_to_join_->resume_hb_speed();
srv_to_join_->set_next_log_idx(resp->get_next_idx());
srv_to_join_->set_matched_idx(resp->get_next_idx() - 1);
sync_log_to_new_srv(resp->get_next_idx());
}
break;
case msg_type::join_cluster_response:
if (srv_to_join_)
{
if (resp->get_accepted())
{
l_->debug("new server confirms it will join, start syncing logs to it");
sync_log_to_new_srv(resp->get_next_idx());
}
else
{
l_->debug("new server cannot accept the invitation, give up");
}
}
else
{
l_->debug("no server to join, drop the message");
}
break;
case msg_type::leave_cluster_response:
if (!resp->get_accepted())
{
l_->debug("peer doesn't accept to stepping down, stop proceeding");
return;
}
l_->debug("peer accepted to stepping down, removing this server from cluster");
rm_srv_from_cluster(resp->get_src());
break;
case msg_type::install_snapshot_response:
{
if (!srv_to_join_)
{
l_->info("no server to join, the response must be very old.");
return;
}
if (!resp->get_accepted())
{
l_->info("peer doesn't accept the snapshot installation request");
return;
}
ptr<snapshot_sync_ctx> sync_ctx = srv_to_join_->get_snapshot_sync_ctx();
if (sync_ctx == nilptr)
{
l_->err("Bug! SnapshotSyncContext must not be null");
ctx_->state_mgr_->system_exit(-1);
::exit(-1);
return;
}
if (resp->get_next_idx() >= sync_ctx->get_snapshot()->size())
{
// snapshot is done
ptr<snapshot> nil_snap;
l_->debug("snapshot has been copied and applied to new server, continue to sync logs after snapshot");
srv_to_join_->set_snapshot_in_sync(nil_snap);
srv_to_join_->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
srv_to_join_->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
}
else
{
sync_ctx->set_offset(resp->get_next_idx());
l_->debug(sstrfmt("continue to send snapshot to new server at offset %llu").fmt(resp->get_next_idx()));
}
sync_log_to_new_srv(srv_to_join_->get_next_log_idx());
}
break;
case msg_type::prevote_response:
handle_prevote_resp(*resp);
break;
default:
l_->err(lstrfmt("received an unexpected response message type %s, for safety, stepping down")
.fmt(__msg_type_str[resp->get_type()]));
ctx_->state_mgr_->system_exit(-1);
::exit(-1);
break;
}
}
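Before walking through the code, let's lay out the call sequence:
1. The leader sends invite_srv_to_join_cluster to the new node. On receiving the request, the new node updates its own state such as role_ and leader_, and calls reconfigure to reset the cluster config. It then sends join_cluster_response to the leader.
2. On receiving that response, the leader handles it in the msg_type::join_cluster_response branch of the switch-case, which calls sync_log_to_new_srv to send a sync_log_request to the new node.
3. On receiving the sync_log_request, the new node sends sync_log_response to the leader, which handles it in the msg_type::sync_log_response branch of the switch-case.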
void raft_server::sync_log_to_new_srv(ulong start_idx)
{
// only sync committed logs
int32 gap = (int32)(quick_commit_idx_ - start_idx);
if (gap < ctx_->params_->log_sync_stop_gap_)
{
l_->info(lstrfmt("LogSync is done for server %d with log gap %d, now put the server into cluster")
.fmt(srv_to_join_->get_id(), gap));
ptr<cluster_config> new_conf = cs_new<cluster_config>(log_store_->next_slot(), config_->get_log_idx());
new_conf->get_servers().insert(
new_conf->get_servers().end(), config_->get_servers().begin(), config_->get_servers().end());
new_conf->get_servers().push_back(conf_to_add_);
bufptr new_conf_buf(new_conf->serialize());
ptr<log_entry> entry(cs_new<log_entry>(state_->get_term(), std::move(new_conf_buf), log_val_type::conf));
log_store_->append(entry);
config_changing_ = true;
request_append_entries();
return;
}
ptr<req_msg> req;
if (start_idx > 0 && start_idx < log_store_->start_index())
{
req = create_sync_snapshot_req(*srv_to_join_, start_idx, state_->get_term(), quick_commit_idx_);
}
else
{
int32 size_to_sync = std::min(gap, ctx_->params_->log_sync_batch_size_);
bufptr log_pack = log_store_->pack(start_idx, size_to_sync);
req = cs_new<req_msg>(
state_->get_term(),
msg_type::sync_log_request,
id_,
srv_to_join_->get_id(),
0L,
start_idx - 1,
quick_commit_idx_);
req->log_entries().push_back(
cs_new<log_entry>(state_->get_term(), std::move(log_pack), log_val_type::log_pack));
}
srv_to_join_->send_req(req, ex_resp_handler_);
}
- 1. The msg_type::sync_log_response case:
srv_to_join_->resume_hb_speed();
srv_to_join_->set_next_log_idx(resp->get_next_idx());
srv_to_join_->set_matched_idx(resp->get_next_idx() - 1);
sync_log_to_new_srv(resp->get_next_idx());
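After receiving the resp from the node, the leader calls resume_hb_speed() (per the code comment, the heartbeat interval is reused to decide when to stop retrying), sets next_idx and matched_idx, and then calls sync_log_to_new_srv to sync to the node again. sync_log_request keeps being sent until the new node's log is close enough to the leader's. (This resembles Redis master-replica synchronization, where writes that reach the master during the sync are kept in a buffer and sent to the replica at the end for one more round.)
From the sync_log_to_new_srv source above, log sync does not simply rely on request_append_entries. A newly joined node is far behind, so the leader distinguishes three cases. If the gap between quick_commit_idx_ and start_idx is smaller than ctx_->params_->log_sync_stop_gap_, log sync is considered done: the leader appends a new cluster config containing the new server to its log and calls request_append_entries to replicate it. Otherwise, if start_idx is greater than 0 but smaller than log_store_->start_index(), the entries the new node needs lie before the start of the leader's log store, so a snapshot request is sent. In the remaining case, up to log_sync_batch_size_ entries are packed into a log_pack and sent in a sync_log_request.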
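The three-way decision in sync_log_to_new_srv can be isolated as a pure function. A minimal sketch: sync_action and choose_sync_action are hypothetical names, and the member variables of raft_server are passed in as plain parameters.

```cpp
#include <cstdint>

// Hypothetical labels for the three outcomes of sync_log_to_new_srv.
enum class sync_action
{
    finish_and_add_to_config, // gap is small: append the new cluster config
    send_snapshot,            // start_idx lies before the leader's log store
    send_log_pack             // send a batch of entries as a log_pack
};

// quick_commit_idx: the leader's quick_commit_idx_
// start_idx:        next index the new server needs
// log_start_idx:    log_store_->start_index()
// stop_gap:         ctx_->params_->log_sync_stop_gap_
sync_action choose_sync_action(
    std::uint64_t quick_commit_idx, std::uint64_t start_idx, std::uint64_t log_start_idx, std::int32_t stop_gap)
{
    // Same arithmetic as the original: only committed logs are counted.
    std::int32_t gap = static_cast<std::int32_t>(quick_commit_idx - start_idx);
    if (gap < stop_gap)
    {
        return sync_action::finish_and_add_to_config;
    }
    if (start_idx > 0 && start_idx < log_start_idx)
    {
        return sync_action::send_snapshot;
    }
    return sync_action::send_log_pack;
}
```

Note the order of the checks: the gap test comes first, so a new server that is already nearly caught up is added to the config without any further transfer.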
- 2. The msg_type::join_cluster_response case:
case msg_type::join_cluster_response:
if (srv_to_join_)
{
if (resp->get_accepted())
{
l_->debug("new server confirms it will join, start syncing logs to it");
sync_log_to_new_srv(resp->get_next_idx());
}
else
{
l_->debug("new server cannot accept the invitation, give up");
}
}
else
{
l_->debug("no server to join, drop the message");
}
break;
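With the previous resp understood, this one is straightforward. If srv_to_join_ exists and the resp is accepted, sync_log_to_new_srv is called to start syncing data; if the invitation is declined, the leader gives up.
- 3. The msg_type::leave_cluster_response case: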
case msg_type::leave_cluster_response:
if (!resp->get_accepted())
{
l_->debug("peer doesn't accept to stepping down, stop proceeding");
return;
}
l_->debug("peer accepted to stepping down, removing this server from cluster");
rm_srv_from_cluster(resp->get_src());
break;
The key part is rm_srv_from_cluster.
void raft_server::rm_srv_from_cluster(int32 srv_id)
{
ptr<cluster_config> new_conf = cs_new<cluster_config>(log_store_->next_slot(), config_->get_log_idx());
for (cluster_config::const_srv_itor it = config_->get_servers().begin(); it != config_->get_servers().end(); ++it)
{
if ((*it)->get_id() != srv_id)
{
new_conf->get_servers().push_back(*it);
}
}
l_->info(lstrfmt("removed a server from configuration and save the configuration to log store at %llu")
.fmt(new_conf->get_log_idx()));
config_changing_ = true;
bufptr new_conf_buf(new_conf->serialize());
ptr<log_entry> entry(cs_new<log_entry>(state_->get_term(), std::move(new_conf_buf), log_val_type::conf));
log_store_->append(entry);
request_append_entries();
}
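The leader first builds a new config that omits the server to be removed, then writes this cluster change into its own log_store as a conf entry and calls request_append_entries to replicate it to the followers, so that the change takes effect on all nodes.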
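The config-rebuilding step can be shown with a reduced data model. This is only a sketch: simple_config and config_without are hypothetical, servers are reduced to integer ids, and it assumes the two constructor arguments of cluster_config are the new log index and the previous config's log index.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for cluster_config.
struct simple_config
{
    std::uint64_t log_idx;      // index at which this config is stored
    std::uint64_t prev_log_idx; // index of the config it replaces
    std::vector<int> servers;   // server ids only
};

// Builds the next config by copying every server except srv_id.
simple_config config_without(const simple_config& current, int srv_id, std::uint64_t next_slot)
{
    simple_config next{next_slot, current.log_idx, {}};
    for (int id : current.servers)
    {
        if (id != srv_id)
        {
            next.servers.push_back(id);
        }
    }
    return next;
}
```

The current config is never modified in place; the new one only becomes effective after it has been appended to the log and replicated.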
- 4. The msg_type::install_snapshot_response case:
case msg_type::install_snapshot_response:
{
if (!srv_to_join_)
{
l_->info("no server to join, the response must be very old.");
return;
}
if (!resp->get_accepted())
{
l_->info("peer doesn't accept the snapshot installation request");
return;
}
ptr<snapshot_sync_ctx> sync_ctx = srv_to_join_->get_snapshot_sync_ctx();
if (sync_ctx == nilptr)
{
l_->err("Bug! SnapshotSyncContext must not be null");
ctx_->state_mgr_->system_exit(-1);
::exit(-1);
return;
}
if (resp->get_next_idx() >= sync_ctx->get_snapshot()->size())
{
// snapshot is done
ptr<snapshot> nil_snap;
l_->debug("snapshot has been copied and applied to new server, continue to sync logs after snapshot");
srv_to_join_->set_snapshot_in_sync(nil_snap);
srv_to_join_->set_next_log_idx(sync_ctx->get_snapshot()->get_last_log_idx() + 1);
srv_to_join_->set_matched_idx(sync_ctx->get_snapshot()->get_last_log_idx());
}
else
{
sync_ctx->set_offset(resp->get_next_idx());
l_->debug(sstrfmt("continue to send snapshot to new server at offset %llu").fmt(resp->get_next_idx()));
}
sync_log_to_new_srv(srv_to_join_->get_next_log_idx());
}
break;
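(1) The snapshot is sent in segments. Without srv_to_join_ there is nowhere to track the offset, so the resp is treated as a very old one: it is logged and dropped.
(2) The if (sync_ctx == nilptr) check is similar: sync_ctx is required to track the offset. Here a missing context is treated as a bug and the process exits.
(3) As noted for handle_install_snapshot_resp, resp->get_next_idx() actually carries the snapshot offset. There are two cases: if offset >= sync_ctx->get_snapshot()->size(), the snapshot is complete and next_idx and matched_idx are updated; otherwise the offset from the resp is stored and the transfer continues from there.
(4) Afterwards sync_log_to_new_srv(srv_to_join_->get_next_log_idx()) is called. Once the snapshot is installed, this continues with finer-grained log sync. (This resembles Redis persistence, where an RDB file gives a fast bulk sync and the AOF then provides finer-grained sync.)
- 5. The msg_type::prevote_response case: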
case msg_type::prevote_response:
handle_prevote_resp(*resp);
break;
The key part is handle_prevote_resp:
void raft_server::handle_prevote_resp(resp_msg& resp)
{
if (!prevote_state_)
{
l_->info(sstrfmt("Prevote has completed, term received: %llu, current term %llu")
.fmt(resp.get_term(), state_->get_term()));
return;
}
{
read_lock(peers_lock_);
bool vote_added = prevote_state_->add_voted_server(resp.get_src());
if (!vote_added)
{
l_->info("Prevote has from %d has been processed.");
return;
}
if (resp.get_accepted())
{
prevote_state_->inc_accepted_votes();
}
if (prevote_state_->get_accepted_votes() > (int32)((peers_.size() + 1) / 2))
{
l_->info(sstrfmt("Prevote passed for term %llu").fmt(state_->get_term()));
become_candidate();
}
else if (prevote_state_->num_of_votes() >= (peers_.size() + 1))
{
l_->info(sstrfmt("Prevote failed for term %llu").fmt(state_->get_term()));
prevote_state_->reset(); // still in prevote state, just reset the prevote state
restart_election_timer(); // restart election timer for a new round of prevote
}
}
}
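If the accepted votes exceed half of the cluster, the node becomes a candidate. Otherwise, once every node has answered, the prevote state is reset and the election timer is restarted for a new round of prevote.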
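The three possible outcomes of a prevote response can be sketched as a small state object. This is a simplified model, not the library's prevote_state: prevote_tally, prevote_result and on_resp are hypothetical names, and the caller is assumed to record the node's own vote as well.

```cpp
#include <cstddef>
#include <set>

enum class prevote_result
{
    pending, // keep waiting for more responses
    passed,  // majority accepted: the real code calls become_candidate()
    failed   // everyone answered without a majority: start a new round
};

// Hypothetical tally mirroring the counting logic of handle_prevote_resp.
struct prevote_tally
{
    std::size_t cluster_size; // corresponds to peers_.size() + 1
    std::set<int> voted;      // servers whose response has been processed
    int accepted = 0;

    explicit prevote_tally(std::size_t n) : cluster_size(n) {}

    prevote_result on_resp(int src, bool ok)
    {
        if (!voted.insert(src).second)
        {
            return prevote_result::pending; // duplicate response, ignore
        }
        if (ok)
        {
            ++accepted;
        }
        if (accepted > static_cast<int>(cluster_size / 2))
        {
            return prevote_result::passed;
        }
        if (voted.size() >= cluster_size)
        {
            // Reset for the next round, as prevote_state_->reset() does.
            voted.clear();
            accepted = 0;
            return prevote_result::failed;
        }
        return prevote_result::pending;
    }
};
```

The majority check comes before the everyone-answered check, so the last response can still pass the prevote if it supplies the deciding vote.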
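7. Summary
- 1. A log_entry is first replicated to the followers' logs; only once more than half of the cluster has stored it does the leader commit it. In the implementation, this is done by sorting the matched_idx of all nodes and taking the middle element.
- 2. For snapshot reqs and resps, the idx field can be reused to carry the offset.
- 3. For syncing data to a new node, snapshots and log_packs are used to speed up the sync.
- 4. Data sync takes multiple rounds, until the remaining gap is small enough.
- 5. Because a snapshot is sent in segments, a resp that arrives when the offset can no longer be tracked (no srv_to_join_ or no sync_ctx) is either stale or indicates a bug.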