首先看一下 raft node 之间传递的基本消息(比如 leader 选举,AppendLog)类型 Message protobuf 定义

message Message {
	optional MessageType type        = 1 ;
	optional uint64      to          = 2 ;
	optional uint64      from        = 3 ;
  
  // 整个消息发出去时,所处的任期
	optional uint64      term        = 4 ;

	// logTerm is generally used for appending Raft logs to followers. For example,
	// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
	// index=101, and the term of entry at index 100 is 5.
	// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
	// entries from its leader as it already has an entry with term 5 at index 100.
	// 该消息携带的第一条Entry记录的的Term值
  optional uint64      logTerm     = 5 ;

// 索引值,该索引值和消息的类型有关,不同的消息类型代表的含义不同
	optional uint64      index       = 6 ;

	repeated Entry       entries     = 7 ;

// 已经提交的日志的索引值,用来向别人同步日志的提交信息。
	optional uint64      commit      = 8 ;

// 在传输快照时,该字段保存了快照数据
	optional Snapshot    snapshot    = 9 ;

	optional bool        reject      = 10;
	optional uint64      rejectHint  = 11;
	optional bytes       context     = 12;
}

message Entry {
	optional uint64     Term  = 2; 

// Index:当前这个entry在整个raft日志中的位置索引,
// 有了Term和Index之后,一个`log entry`就能被唯一标识。
	optional uint64     Index = 3; 
	optional EntryType  Type  = 1;
	optional bytes      Data  = 4;
}

raftexample 目录提供了一个如何使用 raft library 的例子, 首先从 main.go 来看如何启动一个 raft node。

func newRaftNode(...) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {

	commitC := make(chan *commit)
	errorC := make(chan error)

	rc := &raftNode{
		proposeC:    proposeC,
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
	....
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}

从阅读参考资料的过程中已经得知,etcd raft 的实现是需要调用者配合的,因此newRaftNode的返回的这几个 channel 就是给上层的应用通过 channel 和 raftNode 进行交互。

其他细节先省略,我们直接看 startRaft()

func (rc *raftNode) startRaft() {
	// 创建 WAL 实例,然后加载快照并回放 WAL 日志
	rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
	oldwal := wal.Exist(rc.waldir)
	// 首先会读取快照数据,
	// 在快照数据中记录了该快照包含的最后一条 Entry 记录的 Term 值 和 索引值。
	// 然后根据 Term 值 和 索引值确定读取 WAL 日志文件的位置, 并进行日志记录的读取。
	rc.wal = rc.replayWAL()

	// signal replay has finished
	rc.snapshotterReady <- rc.snapshotter

...
	// 初始化底层的 etcd-raft 模块,这里会根据 WAL 日志的回放情况,
	// 判断当前节点是首次启动还是重新启动
	if oldwal || rc.join {
		rc.node = raft.RestartNode(c)
	} else {
		// 初次启动
		rc.node = raft.StartNode(c, rpeers)
	}

	// 创建 Transport 实例并启动,他负责 raft 节点之间的网络通信服务
	rc.transport = &rafthttp.Transport{
	...
	}

	rc.transport.Start()
	for i := range rc.peers {
		if i+1 != rc.id {
			// 建立与集群中其他各个节点的连接
			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
		}
	}

	// 启动一个goroutine,其中会监听当前节点与集群中其他节点之间的网络连接
	go rc.serveRaft()
	// 启动后台 goroutine 处理上层应用与底层 etcd-raft 模块的交互
	go rc.serveChannels()
}

snapshot 以及上层应用交互的部分暂时先略过。

rc.serveChannels() 的作用是处理上层应用于 raft 库的交互,在这个函数中设置了 ticker := time.NewTicker(100 * time.*Millisecond*) 控制了 raft.node 结构中的 RawNode 的 tick(),进而控制了 raft 中的 tick 函数指针。

再来理一理这些结构体之间的关系:

  1. raftexample.raftNode 是上层应用对一个 raft 节点的封装,它除了有上面提到的 propose channel 等channel以外,还有一个 raft.Node 成员。
  2. raft.Node是 raft 库中的一个抽象接口,而 raft.node 结构体实现了这个接口。
  3. raft.node(注意小写) 是一个私有结构体,封装了一堆 channel,以及最重要的 RawNode。
  4. raft.RawNode 包含了最重要的 raft.raft 实现。

所以,在 StartNode() 中我们看到,先 new 一个 RawNode,然后再封装成一个 raft.Node 返回给应用层 raftexample.raftNode 。

有关 raft node 启动和选举 leader 是在 StartNode()

func StartNode(c *Config, peers []Peer) Node {

// 初始化RawNode 和 raft
	rn, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}

// 3 个 node 的 term 都设置为 1, role 都为 follower
	err = rn.Bootstrap(peers)
	if err != nil {
		c.Logger.Warningf("error occurred during starting a new node: %v", err)
	}

	n := newNode(rn)

// run() 是一个无限循环函数,follower 开始竞选 leader
	go n.run()
	return &n
}

值得注意的是 n.run() 是 raft.node 结构体的无限循环,用于处理应用层发来的消息,而 raft 状态机是 raft.Step()

在 raft.go 中有一组函数用来设置 follower, candidate, leader 的 step 函数。

func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

func (r *raft) becomeCandidate() {
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	r.Vote = r.id
	r.state = StateCandidate
	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

func (r *raft) becomeLeader() {
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
}

他们的调用关系是这样的: n.run() 调用 r.Step(),r.Step() 中调用每个 role 各自的 step 函数

开始选举

raftexample binary 启动之后如何开始选举的呢?

我们知道一开始大家都是 follower,而且应用层 rc.serveChannels() 有个 100 ms 触发的 timer, 通过控制 raft.node 结构中的 RawNode 的 tick(),进而控制 raft 中的 tick 函数指针。

而 follower, candidate, leader 的 tick 指针在上面的 3 个函数中都被赋值给了 tickElection() 或者 tickHeartbeat(),所以选举就是从 tickElection() 开始的。

func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
			r.logger.Debugf("error occurred during election: %v", err)
		}
	}
}

func (r *raft) Step(m pb.Message) error {
	switch m.Type {
	case pb.MsgHup:
			r.hup(campaignElection)
	}
}

func (r *raft) campaign(t CampaignType) {
   for _, id := range ids {
		r.send(pb.Message{
             Term: term, To: id, 
             Type: voteMsg, 
             Index: r.raftLog.lastIndex(), 
             LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

可以看到,candidate 发起的 投票请求 所包含的信息与论文 3.4 的 RequestVote RPC 所示一样。

raft-request-vote

那么,其他节点处理投票请求的地方在哪呢? 在 raft.Step(),而不是在具体的 r.step() 函数指针指向的 follower, leader 各自的 step 函数。

由此可见,raft.Step() 首先对收到的 message 做一些基本的检查,比如比较消息的 Term 和 节点的 term 大小关系,然后做一些基本的操作,最后再调用 各个role 自己的 step 函数。

func (r *raft) Step(m pb.Message) error {
  case pb.MsgVote, pb.MsgPreVote:
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {

			r.send(pb.Message{To: m.From, 
             Term: m.Term, 
             Type: voteRespMsgType(m.Type)})

			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From

			  // r.lead leader 的 id 不是在回复投票的时候设置的
        // 而是在收到 append,heartbeat 的时候顺便设置的。
			}

最后,candidate 就在 raft.stepCandidate()函数中调用 becomeLeader() 切换成 leader,并且开始发送 append 信息。

参考资料

  1. https://www.cnblogs.com/ricklz/p/15155095.html