经过上一篇文章关于一些基础“接口”的定义,我们本篇开始操作核心,节点。

一个节点,有三个状态,Follower,Candidate和Leader。他们肯定具有一些共性,比如他们都具有自己的ID,自己的交互通信工具,执行状态机,超时时间,任期,投票人等。 这些信息我们肯定是无法以下就能够确定全的,我们不妨先知道有这些属性,用到了就做详细解释,没有用到就先忽略。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Node struct {
	sync.RWMutex

	ID              string
	Server          GRPCServer
	Client          GRPCClient
	Status          int
	Log             Logger
	StateMachine    Applyer
	ElectionTimeout time.Duration
	Uncommitted     map[int64]*CommandRequest

	Term         int64
	ElectionTerm int64
	VoteFor      string
	Ballots      int
	Cluster      []*Peer

	endElectionChan      chan int
	finishedElectionChan chan int

	VoteResponseChan          chan pb.VoteRespond
	RequestVoteChan           chan pb.VoteRequest
	RequestVoteResponseChan   chan pb.VoteRespond
	AppendEntriesChan         chan pb.EntryRequest
	AppendEntriesResponseChan chan pb.EntryResponse
	CommandChan               chan CommandRequest
	ExitChan                  chan int
}

为了方便调用,我们可以创建一个new函数,直接把需要传递的参数传递进来,然后需要初始化的进行初始化即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func NewNode(id string, client GRPCClient, server GRPCServer, logger Logger, applyer Applyer) *Node {
	node := &Node{
		ID:              id,
		Server:          server,
		Client:          client,
		Log:             logger,
		Status:          FOLLOWER,
		StateMachine:    applyer,
		ElectionTimeout: 500 * time.Millisecond,
		Uncommitted:     make(map[int64]*CommandRequest),

		VoteResponseChan:          make(chan pb.VoteRespond),
		RequestVoteChan:           make(chan pb.VoteRequest),
		RequestVoteResponseChan:   make(chan pb.VoteRespond),
		AppendEntriesChan:         make(chan pb.EntryRequest),
		AppendEntriesResponseChan: make(chan pb.EntryResponse),
		CommandChan:               make(chan CommandRequest),
		ExitChan:                  make(chan int),
	}
	return node
}

我们的功能按置顶向下的方式来分类。这样容易理解,也容易完善流程。

顶层功能

  1. 启动功能

    1
    2
    3
    4
    5
    6
    
    func (n *Node) Serve() error {
    	err := n.Server.Start(n)
    	log.Println("serve start ")
    	go n.loop()
    	return err
    }

我们需要第一步把GRPCServer启动起来,然后启动一个loop函数,这个loop函数是什么作用呢? 这个node节点需要处理超时选举,处理请求信息,这个内容需要一个循环函数一致进行监听处理。 2. 结束功能

1
2
3
func (n *Node) Stop() {
	n.Server.Stop()
}

我们直接结束服务器即可。 3. 添加服务器

1
2
3
4
5
6
func (n *Node) AddToCluster(member string) {
	p := &Peer{
		ID: member,
	}
	n.Cluster = append(n.Cluster, p)
}

很明显,进来一个服务器,我们要添加到定义的Node里面的cluster变量里面。

底层功能

底层提供的功能非常多,我们需要一点一点解释。 为了方便理解,我们不妨按照启动流程进行解释。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
                                                                        +-------+
                                   |       |
                                   |       |
                                   |       |
                                   |       |
                               |       v                 Follower +------------------------->Candidate+--------------------------->Leader
 ^ ^                                    +                                    +
 | |                                    |                                    |
 | |                                    |                                    |
 | |                                    |                                    |
 | |                                |                                    |
 | +------------------------------------+                            |
 |                                                                           |
 +---------------------------------------------------------------------------+

这里多说一句,这个工具是asciiflow很方便 第一步就是选举超时,我们需要Follower将自己提升为Cnadidate 在循环里面应该是这样

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (n *Node) loop() {
	electionTimer := time.NewTimer(n.randomElectionTimeout())
	for {
		select {
		case <-electionTimer.C:
			n.electionTimeout()
		case <-n.ExitChan:
			n.stepDown()
			goto exit
		}
		randElectionTimeout := n.randomElectionTimeout()
		if !electionTimer.Reset(randElectionTimeout) {
			electionTimer = time.NewTimer(randElectionTimeout)
		}
	}
exit:
	log.Printf("[%s] exiting ioLoop()", n.ID)
}

这个n.randomElectionTimeout()含义看单词意思就可以明白,就是生成随机的超时时间。在论文里面不同的节点具有不同的超时时间,这样才可以选举出leader。 那么就定义下这个函数吧

1
2
3
func (n *Node) randomElectionTimeout() time.Duration {
	return n.ElectionTimeout + time.Duration(rand.Int63n(int64(n.ElectionTimeout)))
}

就是在基础超时时间的基础上添加一个随机数,生成不同的时间。 那么n.electionTimeout()是怎么工作的呢?

1
2
3
4
5
6
7
8
9
func (n *Node) electionTimeout() {
	if n.Status == LEADER {
		return
	}
	n.Status = CANDIDATE
	n.endElection()
	n.nextTerm()
	n.runForLeader()
}

毋庸置疑,如果节点已经是Leader了,就不需要再进行选举了。否则的话,把自己提升为Candidate,结束当前的选举,进入下一个任期,并且竞争Leader。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (n *Node) endElection() {
	if n.Status != CANDIDATE || n.endElectionChan == nil {
		return
	}
	close(n.endElectionChan)
	<-n.finishedElectionChan

	n.endElectionChan = nil
	n.finishedElectionChan = nil
}

结束选举也不难理解,如果当前状态不是候选人,那就没必要进行了,否则关闭channel

1
2
3
4
5
func (n *Node) nextTerm() {
	n.Status = FOLLOWER
	n.Term++
	n.Ballots = 0
}

为什么只会设置状态为Follower呢?因为,进入下一个任期的出发条件不一定是选举过程,也有可能是已经选举出来了。当然了,在此处我们要把自己的投票初始化为0. 最后是要竞争选Leader了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (n *Node) runForLeader() {
	n.Status = CANDIDATE
	n.Ballots++
	n.ElectionTerm = n.Term
	n.endElectionChan = make(chan int)
	n.finishedElectionChan = make(chan int)
	vr := &pb.VoteRequest{
		Term:         n.Term,
		CandidateID:  n.ID,
		LastLogIndex: n.Log.Index(),
		LastLogTerm:  n.Log.Term()}
	go n.gatherVotes(vr)
}

提升自己的状态,并且构造请求,发送给其他节点,收集投票结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (n *Node) gatherVotes(request *pb.VoteRequest) {
	for _, peer := range n.Cluster {
		go func(p string) {
			vresp, err := n.Client.RequestVote(p, request)
			log.Println(vresp)
			if err != nil {
				log.Printf("[%s] error in RequestVoteRPC() to %s - %s", n.ID, p, err)
				return
			}
			n.VoteResponseChan <- *vresp
		}(peer.ID)
	}

	<-n.endElectionChan
	close(n.finishedElectionChan)
}

怎么发送的呢?就是按集群顺序全部发送,等到结果,把结果放到n.VoteResponseChan里面 这里就体现出来n.endElectionChan的意义了。如果不使用的话,这个go的协程就停止了。 然后,把收到的结果放到n.VoteResponseChan里面,我们进行投票处理就好了! 在我们前面的循环里面要有个对VoteResponseChan的处理

1
2
case vresp := <-n.VoteResponseChan:
		n.doRespondVote(vresp)

具体的处理流程应当如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (n *Node) doRespondVote(vr pb.VoteRespond) {
	if n.Status != CANDIDATE {
		return
	}
	if vr.Term != n.ElectionTerm {
		if vr.Term > n.ElectionTerm {
			// we discovered a higher term
			n.endElection()
			n.setTerm(vr.Term)
		}
		return
	}

	if vr.VoteGranted {
		n.voteGranted()
	}

}

如果当前的状态不是Candidate的话,肯定不需要处理的(实际上,不是发出去让别人投票请求,谁会主动给他发相应呢?) 除此之外,如果回复的任期不一致,那一定不用进行选举了(有别人发了或者有人同意了其他人的Leader请求)然后,需要将任期更新到最新的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (n *Node) setTerm(term int64) {
	// check freshness
	if term <= n.Term {
		return
	}

	n.Term = term
	n.Status = FOLLOWER
	n.VoteFor = ""
	n.Ballots = 0
}

很容易理解,就是将任期更新,一切初始化。 我们继续上面的投票内容。经过层层筛选,回复的消息是我同意你当选,也就是vr.VoteGranted==True的时候,我就认为有人投票同意了,开始进行处理。

1
2
3
4
5
6
7
8
func (n *Node) voteGranted() {
	n.Ballots++
	majority := (len(n.Cluster)+1)/2 + 1
	if n.Ballots >= majority {
		n.endElection()
		n.promoteToLeader()
	}
}

只要票数大于大多数,就认为通过了,就把自己提权提到Leader就好了!

1
2
3
4
5
6
func (n *Node) promoteToLeader() {
	n.Status = LEADER
	for _, peer := range n.Cluster {
		peer.NextIndex = n.Log.Index()
	}
}

我们关注的核心是目前将状态提升至Leader,并且维护peer的内容也要更新Index。 可能会有人问,难道不需要广播吗?给所有的人说声,我当选了! 这个就是在心跳包里面实现的了! 我们下篇继续。