peer结构体是Peer接口的实现,peer是远程raft节点的代表,本地raft节点通过peer给远程raft节点发送消息
每个peer有两个发送消息的基本机制:stream和pipeline
stream是长连接,它始终可以传输消息。除了通常的stream,还有优化的stream用于leader发送msgApp消息,因为msgApp占了大部分的消息
pipeline是一系列客户端发送请求到远程节点,在传输完成后会立即关闭连接,主要负责数据量较大、发送频率较低的消息。如MsgSnap消息等。
peer结构体及字段的含义如下:
type peer struct {
lg *zap.Logger
localID types.ID //当前节点ID
// id of the remote raft peer node
id types.ID //该peer实例对应的节点ID,对端ID
r Raft //Raft接口,在Raft接口实现的底层封装了etcd-raft模块
status *peerStatus //peer的状态
/*
每个节点可能提供了多个URL供其他节点正常访问,当其中一个访问失败时,我们应该可以尝试访问另一个。
urlPicker提供的主要功能就是在这些URL之间进行切换
*/
picker *urlPicker
msgAppV2Writer *streamWriter
writer *streamWriter //负责向Stream消息通道中写消息
pipeline *pipeline //pipeline消息通道
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages 负责发送快照数据
msgAppV2Reader *streamReader
msgAppReader *streamReader //负责从Stream消息通道中读消息
recvc chan raftpb.Message //从Stream消息通道中读取到消息之后,会通过该通道将消息交给Raft接口,然后由它返回给底层etcd-raft模块进行处理
propc chan raftpb.Message //从Stream消息通道中读取到MsgProp类型的消息之后,会通过该通道将MsgApp消息交给Raft接口,然后由它返回给底层的etcd-raft模块进行处理
mu sync.Mutex
paused bool //是否暂停向其他节点发送消息
cancel context.CancelFunc // cancel pending works in go routine created by peer. context的取消函数
stopc chan struct{}
}
以上就是peer结构体,peer实现读写分离的方式和对端进行交互。
一、peer的创建
peer的创建过程主要在startPeer函数中
func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
//省略日志处理
status := newPeerStatus(t.Logger, t.ID, peerID) //创建节点的状态信息 status
picker := newURLPicker(urls) //根据节点提供的URL创建urlPicker
errorc := t.ErrorC
r := t.Raft //底层的Raft状态及
pipeline := &pipeline{ //创建pipeline实例
peerID: peerID,
tr: t,
picker: picker,
status: status,
followerStats: fs,
raft: r,
errorc: errorc,
}
pipeline.start() //启动pipeline
p := &peer{ //创建peer实例
lg: t.Logger,
localID: t.ID,
id: peerID,
r: r,
status: status,
picker: picker,
msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r), //创建并启动streamWriter
writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
pipeline: pipeline,
snapSender: newSnapshotSender(t, picker, peerID, status),
recvc: make(chan raftpb.Message, recvBufSize), //创建recvc通道
propc: make(chan raftpb.Message, maxPendingProposals), //创建propc通道
stopc: make(chan struct{}),
}
//启动单独的goroutine,它负责将recvc通道中读取消息,该通道中的消息就是从对端节点发送过来的消息,
// 然后将读取到的消息交给底层的Raft状态机进行处理
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
go func() {
for {
select {
case mm := <-p.recvc: //从recvc通道中获取连接上读取到的消息
if err := r.Process(ctx, mm); err != nil { //将Message交给底层Raft状态机处理
if t.Logger != nil {
t.Logger.Warn("failed to process Raft message", zap.Error(err))
} else {
plog.Warningf("failed to process raft message (%v)", err)
}
}
case <-p.stopc:
return
}
}
}()
// r.Process might block for processing proposal when there is no leader.
// Thus propc must be put into a separate routine with recvc to avoid blocking
// processing other raft messages.
//在底层的Raft状态机处理MsgProp类型的消息时,可能会阻塞,所以启动单独的goroutine来处理
go func() {
for {
select {
case mm := <-p.propc: //从propc通道中获取MsgProp类型的Message
if err := r.Process(ctx, mm); err != nil { //将Message交给底层的Raft状态机来处理
plog.Warningf("failed to process raft message (%v)", err)
}
case <-p.stopc:
return
}
}
}()
//创建并启动streamReader实例,主要负责从Stream消息通道上读取消息
p.msgAppV2Reader = &streamReader{
lg: t.Logger,
peerID: peerID,
typ: streamTypeMsgAppV2,
tr: t,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(t.DialRetryFrequency, 1),
}
p.msgAppReader = &streamReader{
lg: t.Logger,
peerID: peerID,
typ: streamTypeMessage,
tr: t,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(t.DialRetryFrequency, 1),
}
p.msgAppV2Reader.start() //V2版本
p.msgAppReader.start()
return p
}
status用于记录当且peer的可用性状态
picker从URL列表中获取可用URL的机制
pipeline的创建过程我们前面已经详细介绍,这里就不在累述。
streamWriter和streamReader 上一节也详细介绍过了。
这里看到开启两个协程分别获取p.recvc通道和p.propc通道中的值,并将获取的值通过Process方法 交给底层的raft模块。recvc和propc通道中的值是上节我们讲streamReader的时候从对端节点读取到的消息。
二、向对端发送消息
peer的send方法主要用于向对端发送消息。
func (p *peer) send(m raftpb.Message) {
p.mu.Lock()
paused := p.paused
p.mu.Unlock()
if paused { //如果暂停则返回
return
}
//根据消息类型选择一个通道去发送消息
writec, name := p.pick(m)
select {
case writec <- m: //将消息发送到writec通道中,等待发送
default:
//如果出现阻塞,则将消息报告给底层Raft状态机,这里会根据消息类型选择合适的报告方法
p.r.ReportUnreachable(m.To)
if isMsgSnap(m) { //如果消息类型的MsgSnap则通知底层的raft-node,当前快照数据发送失败
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
//其他处理省略
}
}
send主要作用是向对端节点发送消息,pick方法根据消息类型和协议版本选择一个通道发送消息。该通道主要是pipeline和stream两类消息,pipeline通道通过HTTP POST发送给对端,stream通道和对端维护的长连接。
如果出现阻塞,则将消息报告给底层的raft状态机。
pick方法
根据消息类型和版本获取对应的通道发送对端节点
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
var ok bool
// Considering MsgSnap may have a big size, e.g., 1G, and will block
// stream for a long time, only use one of the N pipelines to send MsgSnap.
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
return writec, streamAppV2
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
return p.pipeline.msgc, pipelineMsg
}
1.如果该消息是MsgSnap快照消息,则选择pipeline.msgc通道,返回的channel名为pipeline
2.如果该消息是MsgApp消息,则选择msgAppV2Writer的msgc通道,channel名为streamMsgAppV2
3.最后选择writer的msgc通道,channel名为streamMsg
如果Stream通道不可用,则使用Pipeline消息通道发送所有类型的消息
peer中的方法都比较简单这里就不在叙述。