我们上一篇说了领导的选举,本篇讨论日志和心跳包的处理。

由于有了领导选举的铺垫,我们这篇文章就要容易理解一些。当然,功能还没完善,以后会有后续文章来更新功能。 (os:怕再拖文章就写不完了)

日志

在日志和心跳包处理环节,使用的最多的就是日志管理了,需要对比,需要写入写出,所以,我们需要把日志处理功能给完善下。 在之前的文章中,我们定义了日志需要实现的接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Logger interface {
	Check(prevLogIndex int64, prevLogTerm int64, index int64, term int64) error
	Append(e *Entry) error
	FresherThan(index int64, term int64) bool
	Get(index int64) *Entry
	GetEntryForRequest(index int64) (*Entry, int64, int64)
	Index() int64
	LastIndex() int64
	Term() int64
}

我们就开始实现具体的功能吧!

日志结构

首先定义日志结构体

1
2
3
4
5
6
7
8
type Log struct {
	sync.RWMutex

	index int64
	term  int64

	Entries []*Entry
}

所有的操作都有可能会同步操作,所以我们需要加锁进行维护。日志里面肯定有具体的控制历史,就是Entry内容

1
2
3
4
5
6
type Entry struct {
	CmdID int64
	Index int64
	Term  int64
	Data  []byte
}

结构内容很容易理解。

日志操作

日志的操作有些事比较容易理解的,比如

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (l *Log) Index() int64 {
	return l.index
}

func (l *Log) LastIndex() int64 {
	return l.index - 1
}

func (l *Log) Term() int64 {
	return l.term
}

func (l *Log) Get(index int64) *Entry {
	if index < 0 {
		return nil
	}
	return l.Entries[index]
}

这几个功能就是返回特定的内容,不需要解释太多。接下来我们要讲几个比较复杂的功能。

1
2
3
4
5
6
7
8
9
func (l *Log) GetEntryForRequest(index int64) (*Entry, int64, int64) {
	if index < 0 {
		return nil, -1, -1
	}
	if index < 1 {
		return l.Entries[index], -1, -1
	}
	return l.Entries[index], l.Entries[index-1].Index, l.Entries[index-1].Term
}

这个是客户端请求一个Entry的时候,我们不单单要发送这条entry,还需要prevLogIndex和prevLogTerm,所以需要返回多个变量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (l *Log) FresherThan(index int64, term int64) bool {
	if l.term > term {
		return true
	}

	if l.term < term {
		return false
	}

	return l.index > index
}

这个实现了判断新旧的功能,首先判断Term的新旧,再判断日志的新旧。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (l *Log) Check(prevLogIndex int64, prevLogTerm int64, index int64, term int64) error {
	if len(l.Entries) > 0 && index > 0 {
		if index > int64(len(l.Entries)) {
			return errors.New("behind")
		}
		lastGoodEntry := l.Entries[index-1]
		if lastGoodEntry.Term != prevLogTerm && lastGoodEntry.Index != prevLogIndex {
			return errors.New("inconsistent")
		}
	} else if index != 0 {
		return errors.New("missing")
	}
	return nil
}

这个功能稍微复杂点,主要是用来校验本地日志和远端日志是否匹配的问题。 如果远端日志序号比本地大,那么一定是本地落后了,或者假设序号一致,但是这条日志的任期和prevLogIndex和远端不一致,那就一定不连续。当然了,本地日志也有可能直接没有,那就直接缺少内容,需要进行复制操作了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (l *Log) Append(e *Entry) error {
	if e.Term != l.term {
		l.term = e.Term
	}

	if e.Index < l.index {
		l.Entries = l.Entries[:e.Index]
	}

	l.Entries = append(l.Entries, e)
	l.index = e.Index + 1
	return nil
}

有可能本地存储的日志,并不一定是全部节点公认的日志,需要将错的删除。否则,直接添加上就好了。

服务端操作

我们在上篇文章已经定义了服务端的一些功能,在这需要实现AppendEntries的功能

1
2
3
4
func (gServer *grpcServerImpl) AppendEntries(ctx context.Context, er *pb.EntryRequest) (*pb.EntryResponse, error) {
	eresp, err := gServer.node.AppendEntries(*er)
	return &eresp, err
}

节点功能

节点也像选举过程,一方面是接受别人的请求,一方面是向别人发送请求。

接收

节点这一端首先需要实现的功能是AppendEntries的功能

1
2
3
4
func (n *Node) AppendEntries(er pb.EntryRequest) (pb.EntryResponse, error) {
	n.AppendEntriesChan <- er
	return <-n.AppendEntriesResponseChan, nil
}

与选举过程非常相似,也是传递到channel,在循环里面处理。

1
2
3
case areq := <-n.AppendEntriesChan:
		aresp, _ := n.doAppendEntries(areq)
		n.AppendEntriesResponseChan <- aresp

具体的处理功能如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (n *Node) doAppendEntries(er pb.EntryRequest) (pb.EntryResponse, error) {
	if er.Term < n.Term {
		return pb.EntryResponse{n.Term, false}, nil
	}
	if n.Status != FOLLOWER {
		n.endElection()
		n.stepDown()
	}

	if er.Term > n.Term {
		n.setTerm(er.Term)
	}
	err := n.Log.Check(er.PrevLogIndex, er.PrevLogTerm, er.PrevLogIndex+1, er.Term)
	if err != nil {
		return pb.EntryResponse{Term: n.Term, Success: false}, nil
	}
	if bytes.Compare(er.Data, []byte("")) == 0 {
		return pb.EntryResponse{Term: n.Term, Success: true}, nil
	}
	e := &Entry{
		CmdID: er.LeaderCommit,
		Index: er.PrevLogIndex + 1,
		Term:  er.Term,
		Data:  er.Data,
	}
	err = n.Log.Append(e)
	return pb.EntryResponse{Term: n.Term, Success: err == nil}, nil
}

日志处理流程论文里面也详细规定了。 1. 任期小于本地任期的话,就可以直接返回错误了 2. 如果远端日志内容跟本地完全不匹配,也可以直接返回错误了。 3. 如果是心跳包的话,以上检查通过就通过即可。 4. 如果不是心跳包,添加到日志体,如果成功添加,返回通过。

还有几点需要注意的,节点收到这个消息的时候,有可能是在选举阶段,既然收到消息就证明有节点已经成功当选,本节点就不必再进行选举了,停止选举,退回Follower状态即可。

发送

什么时候会发送呢?心跳包时间到了的时候,或者,收到客户端发来执行命令请求的时候。

1
2
3
4
5
6
7
8
followerTimer := time.NewTicker(200 * time.Millisecond)
...
case <-followerTimer.C:
	n.updateFollowers()
	continue
case creq := <-n.CommandChan:
	n.doCommand(creq)
	n.updateFollowers()

这两个过程都会触发向别人发送的功能。 先看客户端请求执行一定指令的时候触发的功能吧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (n *Node) doCommand(cr CommandRequest) {
	if n.Status != LEADER {
		cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: false}
	}

	e := &Entry{
		CmdID: cr.ID,
		Index: n.Log.Index(),
		Term:  n.Term,
		Data:  cr.Body,
	}

	err := n.Log.Append(e)
	if err != nil {
		cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: false}
	}

	cr.State = Logged
	cr.ReplicationCount++
	n.Uncommitted[cr.ID] = &cr
}

收到别热的请求,首先得保证自己是Leader,然后构建日志体,写入到日志List中。为了保证日志能够最终统计是否可以被全部执行,需要记录已经执行了多少次。 然后Leader执行完后,就更新他的Follower

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (n *Node) updateFollowers() {
	var er *pb.EntryRequest
	if n.Status != LEADER {
		return
	}
	for _, peer := range n.Cluster {
		if n.Log.LastIndex() < peer.NextIndex {
			// heartbeat
			_, prevLogIndex, prevLogTerm := n.Log.GetEntryForRequest(n.Log.LastIndex())

			er = &pb.EntryRequest{
				LeaderCommit: -1,
				Term:         n.Term,
				LeaderID:     n.ID,
				PrevLogIndex: prevLogIndex,
				PrevLogTerm:  prevLogTerm,
				Data:         []byte("")}
		} else {
			entry, prevLogIndex, prevLogTerm := n.Log.GetEntryForRequest(peer.NextIndex)
			er = &pb.EntryRequest{
				LeaderCommit: entry.CmdID,
				Term:         n.Term,
				LeaderID:     n.ID,
				PrevLogIndex: prevLogIndex,
				PrevLogTerm:  prevLogTerm,
				Data:         entry.Data}
		}
		_, err := n.Client.AppendEntries(peer.ID, er)
		if err != nil {
			peer.NextIndex--
			if peer.NextIndex < 0 {
				peer.NextIndex = 0
			}
			continue
		}

		if er.LeaderCommit == -1 {
			// skip commit checks for heartbeats
			continue
		}
		majority := int32((len(n.Cluster)+1)/2 + 1)
		cr := n.Uncommitted[er.LeaderCommit]
		cr.ReplicationCount++
		if cr.ReplicationCount >= majority && cr.State != Committed {
			cr.State = Committed
			log.Printf("[%s] !!! apply %+v", n.ID, cr)
			err := n.StateMachine.Apply(cr)
			if err != nil {
				// TODO: what do we do here?
			}
			cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: true}
		}
		peer.NextIndex++
	}
}

更新Follower的前提是自己是Leader,否则就没必要进行处理。 这块代码长度较长,但是不难理解,主要目标就是如果本地日志内容和peer日志内容一样新,就只发送心跳包,否则发送日志。然后统计日志被大多数执行了的话,就执行日志。然后更新peer的日志位置。 还有一个判断,如果远端的日志太旧了,就减少Index,直到找到远端的位置,再发送对应的日志。 我们的snapshot还没有,而且commitIndex还没处理,后续的文章会继续!