【注意】最后更新于 April 17, 2018,文中内容可能已过时,请谨慎使用。
经过上一篇文章关于一些基础“接口”的定义,我们本篇开始操作核心,节点。
一个节点,有三个状态,Follower,Candidate和Leader。他们肯定具有一些共性,比如他们都具有自己的ID,自己的交互通信工具,执行状态机,超时时间,任期,投票人等。
这些信息我们肯定是无法以下就能够确定全的,我们不妨先知道有这些属性,用到了就做详细解释,没有用到就先忽略。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
type Node struct {
sync.RWMutex
ID string
Server GRPCServer
Client GRPCClient
Status int
Log Logger
StateMachine Applyer
ElectionTimeout time.Duration
Uncommitted map[int64]*CommandRequest
Term int64
ElectionTerm int64
VoteFor string
Ballots int
Cluster []*Peer
endElectionChan chan int
finishedElectionChan chan int
VoteResponseChan chan pb.VoteRespond
RequestVoteChan chan pb.VoteRequest
RequestVoteResponseChan chan pb.VoteRespond
AppendEntriesChan chan pb.EntryRequest
AppendEntriesResponseChan chan pb.EntryResponse
CommandChan chan CommandRequest
ExitChan chan int
}
|
为了方便调用,我们可以创建一个new函数,直接把需要传递的参数传递进来,然后需要初始化的进行初始化即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func NewNode(id string, client GRPCClient, server GRPCServer, logger Logger, applyer Applyer) *Node {
node := &Node{
ID: id,
Server: server,
Client: client,
Log: logger,
Status: FOLLOWER,
StateMachine: applyer,
ElectionTimeout: 500 * time.Millisecond,
Uncommitted: make(map[int64]*CommandRequest),
VoteResponseChan: make(chan pb.VoteRespond),
RequestVoteChan: make(chan pb.VoteRequest),
RequestVoteResponseChan: make(chan pb.VoteRespond),
AppendEntriesChan: make(chan pb.EntryRequest),
AppendEntriesResponseChan: make(chan pb.EntryResponse),
CommandChan: make(chan CommandRequest),
ExitChan: make(chan int),
}
return node
}
|
我们的功能按置顶向下的方式来分类。这样容易理解,也容易完善流程。
顶层功能
启动功能
1
2
3
4
5
6
|
func (n *Node) Serve() error {
err := n.Server.Start(n)
log.Println("serve start ")
go n.loop()
return err
}
|
我们需要第一步把GRPCServer启动起来,然后启动一个loop函数,这个loop函数是什么作用呢?
这个node节点需要处理超时选举,处理请求信息,这个内容需要一个循环函数一致进行监听处理。
2. 结束功能
1
2
3
|
func (n *Node) Stop() {
n.Server.Stop()
}
|
我们直接结束服务器即可。
3. 添加服务器
1
2
3
4
5
6
|
func (n *Node) AddToCluster(member string) {
p := &Peer{
ID: member,
}
n.Cluster = append(n.Cluster, p)
}
|
很明显,进来一个服务器,我们要添加到定义的Node里面的cluster变量里面。
底层功能
底层提供的功能非常多,我们需要一点一点解释。
为了方便理解,我们不妨按照启动流程进行解释。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
分 离 或 超 时
+-------+
| |
| |
| |
| |
超 时 | v 成 功
Follower +------------------------->Candidate+--------------------------->Leader
^ ^ + +
| | | |
| | | |
| | | |
| | 失 败 | |
| +------------------------------------+ 更 高 任 期 |
| |
+---------------------------------------------------------------------------+
|
这里多说一句,这个工具是asciiflow很方便
第一步就是选举超时,我们需要Follower将自己提升为Cnadidate
在循环里面应该是这样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (n *Node) loop() {
electionTimer := time.NewTimer(n.randomElectionTimeout())
for {
select {
case <-electionTimer.C:
n.electionTimeout()
case <-n.ExitChan:
n.stepDown()
goto exit
}
randElectionTimeout := n.randomElectionTimeout()
if !electionTimer.Reset(randElectionTimeout) {
electionTimer = time.NewTimer(randElectionTimeout)
}
}
exit:
log.Printf("[%s] exiting ioLoop()", n.ID)
}
|
这个n.randomElectionTimeout()
含义看单词意思就可以明白,就是生成随机的超时时间。在论文里面不同的节点具有不同的超时时间,这样才可以选举出leader。
那么就定义下这个函数吧
1
2
3
|
func (n *Node) randomElectionTimeout() time.Duration {
return n.ElectionTimeout + time.Duration(rand.Int63n(int64(n.ElectionTimeout)))
}
|
就是在基础超时时间的基础上添加一个随机数,生成不同的时间。
那么n.electionTimeout()
是怎么工作的呢?
1
2
3
4
5
6
7
8
9
|
func (n *Node) electionTimeout() {
if n.Status == LEADER {
return
}
n.Status = CANDIDATE
n.endElection()
n.nextTerm()
n.runForLeader()
}
|
毋庸置疑,如果节点已经是Leader了,就不需要再进行选举了。否则的话,把自己提升为Candidate,结束当前的选举,进入下一个任期,并且竞争Leader。
1
2
3
4
5
6
7
8
9
10
|
func (n *Node) endElection() {
if n.Status != CANDIDATE || n.endElectionChan == nil {
return
}
close(n.endElectionChan)
<-n.finishedElectionChan
n.endElectionChan = nil
n.finishedElectionChan = nil
}
|
结束选举也不难理解,如果当前状态不是候选人,那就没必要进行了,否则关闭channel
1
2
3
4
5
|
func (n *Node) nextTerm() {
n.Status = FOLLOWER
n.Term++
n.Ballots = 0
}
|
为什么只会设置状态为Follower呢?因为,进入下一个任期的出发条件不一定是选举过程,也有可能是已经选举出来了。当然了,在此处我们要把自己的投票初始化为0.
最后是要竞争选Leader了。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (n *Node) runForLeader() {
n.Status = CANDIDATE
n.Ballots++
n.ElectionTerm = n.Term
n.endElectionChan = make(chan int)
n.finishedElectionChan = make(chan int)
vr := &pb.VoteRequest{
Term: n.Term,
CandidateID: n.ID,
LastLogIndex: n.Log.Index(),
LastLogTerm: n.Log.Term()}
go n.gatherVotes(vr)
}
|
提升自己的状态,并且构造请求,发送给其他节点,收集投票结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (n *Node) gatherVotes(request *pb.VoteRequest) {
for _, peer := range n.Cluster {
go func(p string) {
vresp, err := n.Client.RequestVote(p, request)
log.Println(vresp)
if err != nil {
log.Printf("[%s] error in RequestVoteRPC() to %s - %s", n.ID, p, err)
return
}
n.VoteResponseChan <- *vresp
}(peer.ID)
}
<-n.endElectionChan
close(n.finishedElectionChan)
}
|
怎么发送的呢?就是按集群顺序全部发送,等到结果,把结果放到n.VoteResponseChan
里面
这里就体现出来n.endElectionChan
的意义了。如果不使用的话,这个go的协程就停止了。
然后,把收到的结果放到n.VoteResponseChan
里面,我们进行投票处理就好了!
在我们前面的循环里面要有个对VoteResponseChan
的处理
1
2
|
case vresp := <-n.VoteResponseChan:
n.doRespondVote(vresp)
|
具体的处理流程应当如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (n *Node) doRespondVote(vr pb.VoteRespond) {
if n.Status != CANDIDATE {
return
}
if vr.Term != n.ElectionTerm {
if vr.Term > n.ElectionTerm {
// we discovered a higher term
n.endElection()
n.setTerm(vr.Term)
}
return
}
if vr.VoteGranted {
n.voteGranted()
}
}
|
如果当前的状态不是Candidate的话,肯定不需要处理的(实际上,不是发出去让别人投票请求,谁会主动给他发相应呢?)
除此之外,如果回复的任期不一致,那一定不用进行选举了(有别人发了或者有人同意了其他人的Leader请求)然后,需要将任期更新到最新的。
1
2
3
4
5
6
7
8
9
10
11
|
func (n *Node) setTerm(term int64) {
// check freshness
if term <= n.Term {
return
}
n.Term = term
n.Status = FOLLOWER
n.VoteFor = ""
n.Ballots = 0
}
|
很容易理解,就是将任期更新,一切初始化。
我们继续上面的投票内容。经过层层筛选,回复的消息是我同意你当选,也就是vr.VoteGranted==True
的时候,我就认为有人投票同意了,开始进行处理。
1
2
3
4
5
6
7
8
|
func (n *Node) voteGranted() {
n.Ballots++
majority := (len(n.Cluster)+1)/2 + 1
if n.Ballots >= majority {
n.endElection()
n.promoteToLeader()
}
}
|
只要票数大于大多数,就认为通过了,就把自己提权提到Leader就好了!
1
2
3
4
5
6
|
func (n *Node) promoteToLeader() {
n.Status = LEADER
for _, peer := range n.Cluster {
peer.NextIndex = n.Log.Index()
}
}
|
我们关注的核心是目前将状态提升至Leader,并且维护peer的内容也要更新Index。
可能会有人问,难道不需要广播吗?给所有的人说声,我当选了!
这个就是在心跳包里面实现的了!
我们下篇继续。