首先看一下 raft node 之间传递的基本消息(比如 leader 选举,AppendLog)类型 Message
protobuf 定义
message Message {
optional MessageType type = 1 ;
optional uint64 to = 2 ;
optional uint64 from = 3 ;
// 整个消息发出去时,所处的任期
optional uint64 term = 4 ;
// logTerm is generally used for appending Raft logs to followers. For example,
// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
// index=101, and the term of entry at index 100 is 5.
// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
// entries from its leader as it already has an entry with term 5 at index 100.
// 该消息携带的第一条Entry记录的的Term值
optional uint64 logTerm = 5 ;
// 索引值,该索引值和消息的类型有关,不同的消息类型代表的含义不同
optional uint64 index = 6 ;
repeated Entry entries = 7 ;
// 已经提交的日志的索引值,用来向别人同步日志的提交信息。
optional uint64 commit = 8 ;
// 在传输快照时,该字段保存了快照数据
optional Snapshot snapshot = 9 ;
optional bool reject = 10;
optional uint64 rejectHint = 11;
optional bytes context = 12;
}
message Entry {
optional uint64 Term = 2;
// Index:当前这个entry在整个raft日志中的位置索引,
// 有了Term和Index之后,一个`log entry`就能被唯一标识。
optional uint64 Index = 3;
optional EntryType Type = 1;
optional bytes Data = 4;
}
raftexample 目录提供了一个如何使用 raft library 的例子, 首先从 main.go 来看如何启动一个 raft node。
func newRaftNode(...) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {
commitC := make(chan *commit)
errorC := make(chan error)
rc := &raftNode{
proposeC: proposeC,
confChangeC: confChangeC,
commitC: commitC,
errorC: errorC,
id: id,
....
}
go rc.startRaft()
return commitC, errorC, rc.snapshotterReady
}
从阅读参考资料的过程中已经得知,etcd raft 的实现是需要调用者配合的,因此newRaftNode
的返回的这几个 channel 就是给上层的应用通过 channel 和 raftNode 进行交互。
其他细节先省略,我们直接看 startRaft()
func (rc *raftNode) startRaft() {
// 创建 WAL 实例,然后加载快照并回放 WAL 日志
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
oldwal := wal.Exist(rc.waldir)
// 首先会读取快照数据,
// 在快照数据中记录了该快照包含的最后一条 Entry 记录的 Term 值 和 索引值。
// 然后根据 Term 值 和 索引值确定读取 WAL 日志文件的位置, 并进行日志记录的读取。
rc.wal = rc.replayWAL()
// signal replay has finished
rc.snapshotterReady <- rc.snapshotter
...
// 初始化底层的 etcd-raft 模块,这里会根据 WAL 日志的回放情况,
// 判断当前节点是首次启动还是重新启动
if oldwal || rc.join {
rc.node = raft.RestartNode(c)
} else {
// 初次启动
rc.node = raft.StartNode(c, rpeers)
}
// 创建 Transport 实例并启动,他负责 raft 节点之间的网络通信服务
rc.transport = &rafthttp.Transport{
...
}
rc.transport.Start()
for i := range rc.peers {
if i+1 != rc.id {
// 建立与集群中其他各个节点的连接
rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
// 启动一个goroutine,其中会监听当前节点与集群中其他节点之间的网络连接
go rc.serveRaft()
// 启动后台 goroutine 处理上层应用与底层 etcd-raft 模块的交互
go rc.serveChannels()
}
snapshot 以及上层应用交互的部分暂时先略过。
rc.serveChannels() 的作用是处理上层应用于 raft 库的交互,在这个函数中设置了 ticker := time.NewTicker(100 * time.*Millisecond*)
控制了 raft.node 结构中的 RawNode 的 tick(),进而控制了 raft 中的 tick 函数指针。
再来理一理这些结构体之间的关系:
raftexample.raftNode
是上层应用对一个 raft 节点的封装,它除了有上面提到的 propose channel 等channel以外,还有一个 raft.Node 成员。raft.Node
是 raft 库中的一个抽象接口,而 raft.node 结构体实现了这个接口。raft.node
(注意小写) 是一个私有结构体,封装了一堆 channel,以及最重要的 RawNode。raft.RawNode
包含了最重要的 raft.raft 实现。
所以,在 StartNode() 中我们看到,先 new 一个 RawNode,然后再封装成一个 raft.Node 返回给应用层 raftexample.raftNode 。
有关 raft node 启动和选举 leader 是在 StartNode()
func StartNode(c *Config, peers []Peer) Node {
// 初始化RawNode 和 raft
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
// 3 个 node 的 term 都设置为 1, role 都为 follower
err = rn.Bootstrap(peers)
if err != nil {
c.Logger.Warningf("error occurred during starting a new node: %v", err)
}
n := newNode(rn)
// run() 是一个无限循环函数,follower 开始竞选 leader
go n.run()
return &n
}
值得注意的是 n.run()
是 raft.node 结构体的无限循环,用于处理应用层发来的消息,而 raft 状态机是 raft.Step()
在 raft.go 中有一组函数用来设置 follower, candidate, leader 的 step 函数。
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
func (r *raft) becomeCandidate() {
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
func (r *raft) becomeLeader() {
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
}
他们的调用关系是这样的: n.run() 调用 r.Step(),r.Step() 中调用每个 role 各自的 step 函数
开始选举
raftexample binary 启动之后如何开始选举的呢?
我们知道一开始大家都是 follower,而且应用层 rc.serveChannels() 有个 100 ms 触发的 timer, 通过控制 raft.node 结构中的 RawNode 的 tick(),进而控制 raft 中的 tick 函数指针。
而 follower, candidate, leader 的 tick 指针在上面的 3 个函数中都被赋值给了 tickElection() 或者 tickHeartbeat(),所以选举就是从 tickElection() 开始的。
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
r.logger.Debugf("error occurred during election: %v", err)
}
}
}
func (r *raft) Step(m pb.Message) error {
switch m.Type {
case pb.MsgHup:
r.hup(campaignElection)
}
}
func (r *raft) campaign(t CampaignType) {
for _, id := range ids {
r.send(pb.Message{
Term: term, To: id,
Type: voteMsg,
Index: r.raftLog.lastIndex(),
LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
可以看到,candidate 发起的 投票请求
所包含的信息与论文 3.4 的 RequestVote RPC
所示一样。
那么,其他节点处理投票请求的地方在哪呢? 在 raft.Step()
,而不是在具体的 r.step() 函数指针指向的 follower, leader 各自的 step 函数。
由此可见,raft.Step() 首先对收到的 message 做一些基本的检查,比如比较消息的 Term 和 节点的 term 大小关系,然后做一些基本的操作,最后再调用 各个role 自己的 step 函数。
func (r *raft) Step(m pb.Message) error {
case pb.MsgVote, pb.MsgPreVote:
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.send(pb.Message{To: m.From,
Term: m.Term,
Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
// r.lead leader 的 id 不是在回复投票的时候设置的
// 而是在收到 append,heartbeat 的时候顺便设置的。
}
最后,candidate 就在 raft.stepCandidate()
函数中调用 becomeLeader() 切换成 leader,并且开始发送 append 信息。