我们上一篇文章分析了一个节点作为主动发起方,向别的节点请求投票的信息,本篇就分析下接收方的工作流程。

在前面的几篇文章里,我们定义了几个通信的接口,只是没有定义具体的实现,我们现在就先实现以下。

server

首先,server我们用的是GRPC,所以很多底层的功能都是已经封装好的,我们只需要外部调用即可。

1
2
3
4
5
6
type grpcServerImpl struct {
	address string
	server *grpc.Server
	lock *sync.Mutex
	node *Node
}

这个结构不复杂,加Node的目标是为了和node进行关联。我们稍后就明白为什么要这么做。 继续看构造函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func NewGRPCServer(address string) (GRPCServer, error) {

	if address == "" {
		return nil, errors.New("Missing address parameter")
	}
	//create our listener
	lis, err := net.Listen("tcp", address)

	if err != nil {
		return nil, err
	}
	log.Println("listen to address "+address)
	return NewGRPCServerFromListener(lis)

}

func NewGRPCServerFromListener(listener net.Listener) (GRPCServer, error) {
	grpcServer := &grpcServerImpl{
		address:  listener.Addr().String(),
		listener: listener,
		lock:     &sync.Mutex{},
	}

	var serverOpts []grpc.ServerOption

	serverOpts = append(serverOpts, grpc.MaxSendMsgSize(1024*1024))
	serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(1024*1024))

	grpcServer.server = grpc.NewServer(serverOpts...)
	pb.RegisterMgrServer(grpcServer.server,grpcServer)

	return grpcServer, nil
}

这段内容是我直接从fabric里面复制出来的,可能看着内容比较多,主要目标就是监听一个指定地址,然后配置grpc连接参数,开启server,与我们proto文件生成的服务进行绑定。 在我们启动这个server的时候,需要把Node传递进来

1
2
3
4
5
func (gServer *grpcServerImpl) Start(n *Node) error {
	gServer.node = n
	go gServer.server.Serve(gServer.listener)
	return nil
}

为什么会是协程呢?因为如果不这样的话,程序会阻塞在serve这,我们之前的循环内容无法进行操作。 结束功能也很简单

1
2
3
func (gServer *grpcServerImpl) Stop() {
	gServer.server.Stop()
}

问题不大! 接下来要做的就是想用客户端的RPC请求了!我们需要实现这请求处理接口!

1
2
3
4
func (gServer *grpcServerImpl) RequestVote(ctx context.Context, vr *pb.VoteRequest) (*pb.VoteRespond, error) {
	vresp, err := gServer.node.RequestVote(*vr)
	return &vresp, err
}

我们调用了node的RequestVote()函数进行处理,进行具体定义吧!

节点

在节点这端,我们知道大部分的消息处理都是在loop函数里面进行操作的,而消息已经获取了,我们就需要怎么把消息通过channel传递过去,这个就是我们RequestVote函数需要做的内容

1
2
3
4
func (n *Node) RequestVote(vr pb.VoteRequest) (pb.VoteRespond, error) {
	n.RequestVoteChan <- vr
	return <-n.RequestVoteResponseChan, nil
}

在for的select循环里面,就需要有n.RequestVoteChan的处理了

1
2
3
case vreq := <-n.RequestVoteChan:
		vresp, _ := n.doRequestVote(vreq)
		n.RequestVoteResponseChan <- vresp

OK!来具体的解析其他用户发过来的投票请求吧!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (n *Node) doRequestVote(vr pb.VoteRequest) (pb.VoteRespond, error) {
	if vr.Term < n.Term {
		return pb.VoteRespond{Term: n.Term, VoteGranted: false}, nil
	}
	if vr.Term > n.Term {
		n.endElection()
		n.setTerm(vr.Term)
	}
	if n.VoteFor != "" && n.VoteFor != vr.CandidateID {
		return pb.VoteRespond{Term: n.Term, VoteGranted: false}, nil
	}
	if n.Log.FresherThan(vr.LastLogIndex, vr.LastLogTerm) {
		return pb.VoteRespond{Term: n.Term, VoteGranted: false}, nil
	}
	n.VoteFor = vr.CandidateID
	return pb.VoteRespond{Term: n.Term, VoteGranted: true}, nil
}

这个处理逻辑在论文里面已经描述的很请处理,有这么几点情况需要返回false 1. vote的任期小于自身存储的任期(其他人的太旧了,或者自己这边已经有更新的leader了) 2. 如果自己已经给别人投票了,那么久不能在给这个人投票了 3. 如果自己记录的日志内容比vote人的日志内容新,也不能给他投同意了!

只有以上三点都不满足,才记录下来,我给你投票,同意你的选举! 当然啦,如果vote人的任期比我的新,肯定是我比较落伍,就得更新下自己的任期,然后停止选举。 如此一来!投票选举流程就结束了! 可能有的朋友意识到了一个问题! n.Log.FresherThan(vr.LastLogIndex, vr.LastLogTerm)是在哪实现的? 我们先留个问号!下一篇再解决!