我们之前看了Tendermint如何和别人连接,如何接受别人的连接,可是,Tendermint得怎么找到别人啊! 不能手动一个一个的输入连接地址啊! 这个就是pexreactor的功能了!

我们首先看下PEXReactor怎么定义的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type PEXReactor struct {
	p2p.BaseReactor

	book              AddrBook
	config            *PEXReactorConfig
	ensurePeersPeriod time.Duration // TODO: should go in the config

	// maps to prevent abuse
	requestsSent         *cmn.CMap // ID->struct{}: unanswered send requests
	lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us

	seedAddrs []*p2p.NetAddress

	attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)}
}

很简单,继承了基础的reactor,然后有一些地址表,我们具体的去看一看。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
	r := &PEXReactor{
		book:                 b,
		config:               config,
		ensurePeersPeriod:    defaultEnsurePeersPeriod,
		requestsSent:         cmn.NewCMap(),
		lastReceivedRequests: cmn.NewCMap(),
	}
	r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
	return r
}

还好,就是把传递的变量赋值进去,然后创建了一个reactor,就像我们之前看到的service一样。 是怎么启动起来的呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (r *PEXReactor) OnStart() error {
	err := r.book.Start()
	if err != nil && err != cmn.ErrAlreadyStarted {
		return err
	}

	numOnline, seedAddrs, err := r.checkSeeds()
	if err != nil {
		return err
	} else if numOnline == 0 && r.book.Empty() {
		return errors.New("Address book is empty, and could not connect to any seed nodes")
	}

	r.seedAddrs = seedAddrs

	// Check if this node should run
	// in seed/crawler mode
	if r.config.SeedMode {
		go r.crawlPeersRoutine()
	} else {
		go r.ensurePeersRoutine()
	}
	return nil
}

首先是启动地址簿,我们答题说几句,就是会从我们的配置文件中读取地址信息,然后保存下来。具体的我们以后再分析。 接下来是检查这些种子节点是否有错误。主要检查的什么呢? 检查的就是种子节点的内容格式是否正确,是否能够联通,然后返回可用的种子 最后根据配置,看自己是否是个种子节点,如果是的话,就去抓取其他的节点,如果不是,就是保证节点连接就好了。 我们来看看作为种子节点是怎么工作的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (r *PEXReactor) crawlPeersRoutine() {
	// Do an initial crawl
	r.crawlPeers()

	// Fire periodically
	ticker := time.NewTicker(defaultCrawlPeersPeriod)

	for {
		select {
		case <-ticker.C:
			r.attemptDisconnects()
			r.crawlPeers()
		case <-r.Quit():
			return
		}
	}
}

很明显,先抓取一次其他的节点,然后定时轮询取获取其他的节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (r *PEXReactor) crawlPeers() {
	peerInfos := r.getPeersToCrawl()

	now := time.Now()
	// Use addresses we know of to reach additional peers
	for _, pi := range peerInfos {
		// Do not attempt to connect with peers we recently dialed
		if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval {
			continue
		}
		// Otherwise, attempt to connect with the known address
		err := r.Switch.DialPeerWithAddress(pi.Addr, false)
		if err != nil {
			r.book.MarkAttempt(pi.Addr)
			continue
		}
		// Ask for more addresses
		peer := r.Switch.Peers().Get(pi.Addr.ID)
		if peer != nil {
			r.RequestAddrs(peer)
		}
	}
}

整体流程就是取获取新的节点去连接。怎么获取的新的节点呢?就是向已连接过的节点去请求新的节点,类似于gossip协议,一层一层的扩散出去找新的节点。 我们再看下普通节点时候如何工作的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (r *PEXReactor) ensurePeersRoutine() {
	var (
		seed   = cmn.NewRand()
		jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds())
	)

	// Randomize first round of communication to avoid thundering herd.
	// If no potential peers are present directly start connecting so we guarantee
	// swift setup with the help of configured seeds.
	if r.hasPotentialPeers() {
		time.Sleep(time.Duration(jitter))
	}

	// fire once immediately.
	// ensures we dial the seeds right away if the book is empty
	r.ensurePeers()

	// fire periodically
	ticker := time.NewTicker(r.ensurePeersPeriod)
	for {
		select {
		case <-ticker.C:
			r.ensurePeers()
		case <-r.Quit():
			ticker.Stop()
			return
		}
	}
}

和作为种子节点差不多,一定时间情况下就去维持连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func (r *PEXReactor) ensurePeers() {
	var (
		out, in, dial = r.Switch.NumPeers()
		numToDial     = r.Switch.MaxNumOutboundPeers() - (out + dial)
	)
	r.Logger.Info(
		"Ensure peers",
		"numOutPeers", out,
		"numInPeers", in,
		"numDialing", dial,
		"numToDial", numToDial,
	)

	if numToDial <= 0 {
		return
	}

	// bias to prefer more vetted peers when we have fewer connections.
	// not perfect, but somewhate ensures that we prioritize connecting to more-vetted
	// NOTE: range here is [10, 90]. Too high ?
	newBias := cmn.MinInt(out, 8)*10 + 10

	toDial := make(map[p2p.ID]*p2p.NetAddress)
	// Try maxAttempts times to pick numToDial addresses to dial
	maxAttempts := numToDial * 3

	for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
		try := r.book.PickAddress(newBias)
		if try == nil {
			continue
		}
		if _, selected := toDial[try.ID]; selected {
			continue
		}
		if r.Switch.IsDialingOrExistingAddress(try) {
			continue
		}
		// TODO: consider moving some checks from toDial into here
		// so we don't even consider dialing peers that we want to wait
		// before dialling again, or have dialed too many times already
		r.Logger.Info("Will dial address", "addr", try)
		toDial[try.ID] = try
	}

	// Dial picked addresses
	for _, addr := range toDial {
		go r.dialPeer(addr)
	}

	// If we need more addresses, pick a random peer and ask for more.
	if r.book.NeedMoreAddrs() {
		peers := r.Switch.Peers().List()
		peersCount := len(peers)
		if peersCount > 0 {
			peer := peers[cmn.RandInt()%peersCount] // nolint: gas
			r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
			r.RequestAddrs(peer)
		}
	}

	// If we are not connected to nor dialing anybody, fallback to dialing a seed.
	if out+in+dial+len(toDial) == 0 {
		r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds")
		r.dialSeeds()
	}
}

整体过程就是挑选出需要连接的peer节点,然后进行连接,如果本peer存的地址太少的话,就请求更多的节点来。 现在我们能够发现,节点可以连接到别人上面,别的节点应该能够连接到本节点,怎么处理呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
	msg, err := decodeMsg(msgBytes)
	if err != nil {
		r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
		r.Switch.StopPeerForError(src, err)
		return
	}
	r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)

	switch msg := msg.(type) {
	case *pexRequestMessage:

		// NOTE: this is a prime candidate for amplification attacks,
		// so it's important we
		// 1) restrict how frequently peers can request
		// 2) limit the output size

		// If we're a seed and this is an inbound peer,
		// respond once and disconnect.
		if r.config.SeedMode && !src.IsOutbound() {
			id := string(src.ID())
			v := r.lastReceivedRequests.Get(id)
			if v != nil {
				// FlushStop/StopPeer are already
				// running in a go-routine.
				return
			}
			r.lastReceivedRequests.Set(id, time.Now())

			// Send addrs and disconnect
			r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
			go func() {
				// In a go-routine so it doesn't block .Receive.
				src.FlushStop()
				r.Switch.StopPeerGracefully(src)
			}()

		} else {
			// Check we're not receiving requests too frequently.
			if err := r.receiveRequest(src); err != nil {
				r.Switch.StopPeerForError(src, err)
				return
			}
			r.SendAddrs(src, r.book.GetSelection())
		}

	case *pexAddrsMessage:
		// If we asked for addresses, add them to the book
		if err := r.ReceiveAddrs(msg.Addrs, src); err != nil {
			r.Switch.StopPeerForError(src, err)
			return
		}
	default:
		r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
	}
}

就是对节点收到的pex信息进行处理,包括像别人请求时别的peer相应的信息以及本peer响应别的节点的信息。

如果我们整体来看PEXReactor的话,就是实现了连接的管理和维护,可以实现向别的节点请求连接,更新本地的连接,然后也可以响应别的节点的请求,本地做出回应。