【注意】最后更新于 April 17, 2018,文中内容可能已过时,请谨慎使用。
我们上一篇说了领导的选举,本篇讨论日志和心跳包的处理。
由于有了领导选举的铺垫,我们这篇文章就要容易理解一些。当然,功能还没完善,以后会有后续文章来更新功能。
(os:怕再拖文章就写不完了)
日志
在日志和心跳包处理环节,使用的最多的就是日志管理了,需要对比,需要写入写出,所以,我们需要把日志处理功能给完善下。
在之前的文章中,我们定义了日志需要实现的接口
1
2
3
4
5
6
7
8
9
10
|
type Logger interface {
Check(prevLogIndex int64, prevLogTerm int64, index int64, term int64) error
Append(e *Entry) error
FresherThan(index int64, term int64) bool
Get(index int64) *Entry
GetEntryForRequest(index int64) (*Entry, int64, int64)
Index() int64
LastIndex() int64
Term() int64
}
|
我们就开始实现具体的功能吧!
日志结构
首先定义日志结构体
1
2
3
4
5
6
7
8
|
type Log struct {
sync.RWMutex
index int64
term int64
Entries []*Entry
}
|
所有的操作都有可能会同步操作,所以我们需要加锁进行维护。日志里面肯定有具体的控制历史,就是Entry内容
1
2
3
4
5
6
|
type Entry struct {
CmdID int64
Index int64
Term int64
Data []byte
}
|
结构内容很容易理解。
日志操作
日志的操作有些事比较容易理解的,比如
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (l *Log) Index() int64 {
return l.index
}
func (l *Log) LastIndex() int64 {
return l.index - 1
}
func (l *Log) Term() int64 {
return l.term
}
func (l *Log) Get(index int64) *Entry {
if index < 0 {
return nil
}
return l.Entries[index]
}
|
这几个功能就是返回特定的内容,不需要解释太多。接下来我们要讲几个比较复杂的功能。
1
2
3
4
5
6
7
8
9
|
func (l *Log) GetEntryForRequest(index int64) (*Entry, int64, int64) {
if index < 0 {
return nil, -1, -1
}
if index < 1 {
return l.Entries[index], -1, -1
}
return l.Entries[index], l.Entries[index-1].Index, l.Entries[index-1].Term
}
|
这个是客户端请求一个Entry的时候,我们不单单要发送这条entry,还需要prevLogIndex和prevLogTerm,所以需要返回多个变量。
1
2
3
4
5
6
7
8
9
10
11
|
func (l *Log) FresherThan(index int64, term int64) bool {
if l.term > term {
return true
}
if l.term < term {
return false
}
return l.index > index
}
|
这个实现了判断新旧的功能,首先判断Term的新旧,再判断日志的新旧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (l *Log) Check(prevLogIndex int64, prevLogTerm int64, index int64, term int64) error {
if len(l.Entries) > 0 && index > 0 {
if index > int64(len(l.Entries)) {
return errors.New("behind")
}
lastGoodEntry := l.Entries[index-1]
if lastGoodEntry.Term != prevLogTerm && lastGoodEntry.Index != prevLogIndex {
return errors.New("inconsistent")
}
} else if index != 0 {
return errors.New("missing")
}
return nil
}
|
这个功能稍微复杂点,主要是用来校验本地日志和远端日志是否匹配的问题。
如果远端日志序号比本地大,那么一定是本地落后了,或者假设序号一致,但是这条日志的任期和prevLogIndex和远端不一致,那就一定不连续。当然了,本地日志也有可能直接没有,那就直接缺少内容,需要进行复制操作了。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (l *Log) Append(e *Entry) error {
if e.Term != l.term {
l.term = e.Term
}
if e.Index < l.index {
l.Entries = l.Entries[:e.Index]
}
l.Entries = append(l.Entries, e)
l.index = e.Index + 1
return nil
}
|
有可能本地存储的日志,并不一定是全部节点公认的日志,需要将错的删除。否则,直接添加上就好了。
服务端操作
我们在上篇文章已经定义了服务端的一些功能,在这需要实现AppendEntries的功能
1
2
3
4
|
func (gServer *grpcServerImpl) AppendEntries(ctx context.Context, er *pb.EntryRequest) (*pb.EntryResponse, error) {
eresp, err := gServer.node.AppendEntries(*er)
return &eresp, err
}
|
节点功能
节点也像选举过程,一方面是接受别人的请求,一方面是向别人发送请求。
接收
节点这一端首先需要实现的功能是AppendEntries的功能
1
2
3
4
|
func (n *Node) AppendEntries(er pb.EntryRequest) (pb.EntryResponse, error) {
n.AppendEntriesChan <- er
return <-n.AppendEntriesResponseChan, nil
}
|
与选举过程非常相似,也是传递到channel,在循环里面处理。
1
2
3
|
case areq := <-n.AppendEntriesChan:
aresp, _ := n.doAppendEntries(areq)
n.AppendEntriesResponseChan <- aresp
|
具体的处理功能如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func (n *Node) doAppendEntries(er pb.EntryRequest) (pb.EntryResponse, error) {
if er.Term < n.Term {
return pb.EntryResponse{n.Term, false}, nil
}
if n.Status != FOLLOWER {
n.endElection()
n.stepDown()
}
if er.Term > n.Term {
n.setTerm(er.Term)
}
err := n.Log.Check(er.PrevLogIndex, er.PrevLogTerm, er.PrevLogIndex+1, er.Term)
if err != nil {
return pb.EntryResponse{Term: n.Term, Success: false}, nil
}
if bytes.Compare(er.Data, []byte("")) == 0 {
return pb.EntryResponse{Term: n.Term, Success: true}, nil
}
e := &Entry{
CmdID: er.LeaderCommit,
Index: er.PrevLogIndex + 1,
Term: er.Term,
Data: er.Data,
}
err = n.Log.Append(e)
return pb.EntryResponse{Term: n.Term, Success: err == nil}, nil
}
|
日志处理流程论文里面也详细规定了。
1. 任期小于本地任期的话,就可以直接返回错误了
2. 如果远端日志内容跟本地完全不匹配,也可以直接返回错误了。
3. 如果是心跳包的话,以上检查通过就通过即可。
4. 如果不是心跳包,添加到日志体,如果成功添加,返回通过。
还有几点需要注意的,节点收到这个消息的时候,有可能是在选举阶段,既然收到消息就证明有节点已经成功当选,本节点就不必再进行选举了,停止选举,退回Follower状态即可。
发送
什么时候会发送呢?心跳包时间到了的时候,或者,收到客户端发来执行命令请求的时候。
1
2
3
4
5
6
7
8
|
followerTimer := time.NewTicker(200 * time.Millisecond)
...
case <-followerTimer.C:
n.updateFollowers()
continue
case creq := <-n.CommandChan:
n.doCommand(creq)
n.updateFollowers()
|
这两个过程都会触发向别人发送的功能。
先看客户端请求执行一定指令的时候触发的功能吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func (n *Node) doCommand(cr CommandRequest) {
if n.Status != LEADER {
cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: false}
}
e := &Entry{
CmdID: cr.ID,
Index: n.Log.Index(),
Term: n.Term,
Data: cr.Body,
}
err := n.Log.Append(e)
if err != nil {
cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: false}
}
cr.State = Logged
cr.ReplicationCount++
n.Uncommitted[cr.ID] = &cr
}
|
收到别热的请求,首先得保证自己是Leader,然后构建日志体,写入到日志List中。为了保证日志能够最终统计是否可以被全部执行,需要记录已经执行了多少次。
然后Leader执行完后,就更新他的Follower
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
func (n *Node) updateFollowers() {
var er *pb.EntryRequest
if n.Status != LEADER {
return
}
for _, peer := range n.Cluster {
if n.Log.LastIndex() < peer.NextIndex {
// heartbeat
_, prevLogIndex, prevLogTerm := n.Log.GetEntryForRequest(n.Log.LastIndex())
er = &pb.EntryRequest{
LeaderCommit: -1,
Term: n.Term,
LeaderID: n.ID,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Data: []byte("")}
} else {
entry, prevLogIndex, prevLogTerm := n.Log.GetEntryForRequest(peer.NextIndex)
er = &pb.EntryRequest{
LeaderCommit: entry.CmdID,
Term: n.Term,
LeaderID: n.ID,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Data: entry.Data}
}
_, err := n.Client.AppendEntries(peer.ID, er)
if err != nil {
peer.NextIndex--
if peer.NextIndex < 0 {
peer.NextIndex = 0
}
continue
}
if er.LeaderCommit == -1 {
// skip commit checks for heartbeats
continue
}
majority := int32((len(n.Cluster)+1)/2 + 1)
cr := n.Uncommitted[er.LeaderCommit]
cr.ReplicationCount++
if cr.ReplicationCount >= majority && cr.State != Committed {
cr.State = Committed
log.Printf("[%s] !!! apply %+v", n.ID, cr)
err := n.StateMachine.Apply(cr)
if err != nil {
// TODO: what do we do here?
}
cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: true}
}
peer.NextIndex++
}
}
|
更新Follower的前提是自己是Leader,否则就没必要进行处理。
这块代码长度较长,但是不难理解,主要目标就是如果本地日志内容和peer日志内容一样新,就只发送心跳包,否则发送日志。然后统计日志被大多数执行了的话,就执行日志。然后更新peer的日志位置。
还有一个判断,如果远端的日志太旧了,就减少Index,直到找到远端的位置,再发送对应的日志。
我们的snapshot还没有,而且commitIndex还没处理,后续的文章会继续!