In the previous post we looked at the basic operations of a peer; now let's move one level up and see what the layer above it does.

Reading the comments on Switch, it is not hard to see that it is essentially a dispatcher: it hands different messages off to different reactors.
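In other words, the core idea boils down to something like the following schematic sketch (not the actual source, just the shape of the lookup):

// Schematic only (not the actual source): route a message that arrived on
// channel chID to the reactor that registered that channel.
func dispatch(reactorsByCh map[byte]Reactor, chID byte, src Peer, msgBytes []byte) {
	if reactor, ok := reactorsByCh[chID]; ok {
		reactor.Receive(chID, src, msgBytes)
	}
}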

type Switch struct {
	cmn.BaseService

	config       *config.P2PConfig
	reactors     map[string]Reactor
	chDescs      []*conn.ChannelDescriptor
	reactorsByCh map[byte]Reactor
	peers        *PeerSet
	dialing      *cmn.CMap
	reconnecting *cmn.CMap
	nodeInfo     NodeInfo // our node info
	nodeKey      *NodeKey // our node privkey
	addrBook     AddrBook

	transport Transport

	filterTimeout time.Duration
	peerFilters   []PeerFilterFunc

	rng *cmn.Rand // seed for randomizing dial times and orders

	metrics *Metrics
}

Inside it there is a collection of channel descriptors and a map from channels to reactors — there it is! We saw earlier that onReceive finds the reactor for a message by its channel ID, and this is where that relationship lives. The transport field will also come into play; we'll get to it a bit later. NewSwitch itself does little more than fill in these fields, so we won't paste it here.
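For reference, the constructor roughly just populates those fields; here is a simplified sketch (abridged — the real NewSwitch also accepts functional options and applies a few defaults such as filterTimeout):

// Rough sketch of the constructor (abridged; options and defaults omitted).
func NewSwitch(cfg *config.P2PConfig, transport Transport) *Switch {
	sw := &Switch{
		config:       cfg,
		reactors:     make(map[string]Reactor),
		chDescs:      make([]*conn.ChannelDescriptor, 0),
		reactorsByCh: make(map[byte]Reactor),
		peers:        NewPeerSet(),
		dialing:      cmn.NewCMap(),
		reconnecting: cmn.NewCMap(),
		metrics:      NopMetrics(),
		transport:    transport,
	}
	// Randomness for dial times/order, as noted on the rng field.
	sw.rng = cmn.NewRand()
	sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
	return sw
}

Now let's see how the switch gets started.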

func (sw *Switch) OnStart() error {
	// Start reactors
	for _, reactor := range sw.reactors {
		err := reactor.Start()
		if err != nil {
			return cmn.ErrorWrap(err, "failed to start %v", reactor)
		}
	}

	// Start accepting Peers.
	go sw.acceptRoutine()

	return nil
}

First, all the reactors are started, and then a goroutine that accepts peers is launched. Of course, a reactor can only be started if it has been added first, so let's see how reactors get added.

func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
	reactorChannels := reactor.GetChannels()
	for _, chDesc := range reactorChannels {
		chID := chDesc.ID
		if sw.reactorsByCh[chID] != nil {
			cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
		}
		sw.chDescs = append(sw.chDescs, chDesc)
		sw.reactorsByCh[chID] = reactor
	}
	sw.reactors[name] = reactor
	reactor.SetSwitch(sw)
	return reactor
}

The main job here is just filling data into the maps, but one line deserves attention:

reactor.SetSwitch(sw)

This gives the reactor a reference back to the switch, which makes it convenient for the reactor and the switch to call into each other.
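Seen from the caller's side, wiring things up therefore looks roughly like the sketch below (hypothetical: the reactor variables and names are made up for illustration):

// Hypothetical wiring sketch: register reactors first, then start the switch.
// myMempoolReactor and myConsensusReactor stand in for real Reactor values.
sw := NewSwitch(cfg, transport)
sw.AddReactor("MEMPOOL", myMempoolReactor)
sw.AddReactor("CONSENSUS", myConsensusReactor)

// Start() comes from BaseService and ends up in OnStart(), which starts
// every registered reactor and then launches the accept routine.
if err := sw.Start(); err != nil {
	panic(err)
}

Next comes the goroutine that accepts peers.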

func (sw *Switch) acceptRoutine() {
	for {
		p, err := sw.transport.Accept(peerConfig{
			chDescs:      sw.chDescs,
			onPeerError:  sw.StopPeerForError,
			reactorsByCh: sw.reactorsByCh,
			metrics:      sw.metrics,
		})
		if err != nil {
			switch err.(type) {
			case ErrRejected:
				rErr := err.(ErrRejected)

				if rErr.IsSelf() {
					// Remove the given address from the address book and add to our addresses
					// to avoid dialing in the future.
					addr := rErr.Addr()
					sw.addrBook.RemoveAddress(&addr)
					sw.addrBook.AddOurAddress(&addr)
				}

				sw.Logger.Info(
					"Inbound Peer rejected",
					"err", err,
					"numPeers", sw.peers.Size(),
				)

				continue
			case *ErrTransportClosed:
				sw.Logger.Error(
					"Stopped accept routine, as transport is closed",
					"numPeers", sw.peers.Size(),
				)
			default:
				sw.Logger.Error(
					"Accept on transport errored",
					"err", err,
					"numPeers", sw.peers.Size(),
				)
				panic(fmt.Errorf("accept routine exited: %v", err))
			}

			break
		}

		// Ignore connection if we already have enough peers.
		_, in, _ := sw.NumPeers()
		if in >= sw.config.MaxNumInboundPeers {
			sw.Logger.Info(
				"Ignoring inbound connection: already have enough inbound peers",
				"address", p.NodeInfo().NetAddress().String(),
				"have", in,
				"max", sw.config.MaxNumInboundPeers,
			)

			_ = p.Stop()

			continue
		}

		if err := sw.addPeer(p); err != nil {
			_ = p.Stop()
			sw.Logger.Info(
				"Ignoring inbound connection: error while adding peer",
				"err", err,
				"id", p.ID(),
			)
		}
	}
}

As you can see, the routine first accepts a peer connection (exactly how the accept works we will analyze later). The first chunk is a series of error-handling cases for when the accept fails. If the accept succeeds but we already have enough inbound peers, the connection is simply dropped; otherwise the peer is added. Let's look next at how a new peer gets added.

func (sw *Switch) addPeer(p Peer) error {
	if err := sw.filterPeer(p); err != nil {
		return err
	}

	p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress()))

	// All good. Start peer
	if sw.IsRunning() {
		if err := sw.startInitPeer(p); err != nil {
			return err
		}
	}

	if err := sw.peers.Add(p); err != nil {
		return err
	}

	sw.Logger.Info("Added peer", "peer", p)
	sw.metrics.Peers.Add(float64(1))

	return nil
}

Filtering a peer just checks whether the peer meets our requirements; we won't walk through it in detail. If everything is fine, the peer is initialized and started, and then added to the peer set.
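For completeness, filterPeer roughly works like the sketch below, built around the peerFilters and filterTimeout fields we saw in the struct (details and error types differ in the real source, which wraps failures in ErrRejected):

// Simplified sketch of filterPeer: reject duplicates, then run every
// registered PeerFilterFunc, giving up after filterTimeout.
func (sw *Switch) filterPeer(p Peer) error {
	// Reject a peer we are already connected to.
	if sw.peers.Has(p.ID()) {
		return fmt.Errorf("duplicate peer %v", p.ID())
	}

	errc := make(chan error, len(sw.peerFilters))
	for _, f := range sw.peerFilters {
		go func(f PeerFilterFunc) { errc <- f(sw.peers, p) }(f)
	}

	for i := 0; i < cap(errc); i++ {
		select {
		case err := <-errc:
			if err != nil {
				return err
			}
		case <-time.After(sw.filterTimeout):
			return fmt.Errorf("peer filter timed out")
		}
	}

	return nil
}

The "initialize and run" part is startInitPeer: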

func (sw *Switch) startInitPeer(p Peer) error {
	err := p.Start() // spawn send/recv routines
	if err != nil {
		// Should never happen
		sw.Logger.Error(
			"Error starting peer",
			"err", err,
			"peer", p,
		)
		return err
	}

	for _, reactor := range sw.reactors {
		reactor.AddPeer(p)
	}

	return nil
}

Starting the peer itself needs no comment; the interesting bit is that this is also where every reactor is handed the new peer via AddPeer, so the reactors can work with it later. So what we have so far is the server side: accept someone else's connection, run the checks, create a peer, and serve them. But a node shouldn't only listen for incoming connections — it should also make connections of its own.

func (sw *Switch) addOutboundPeerWithConfig(
	addr *NetAddress,
	cfg *config.P2PConfig,
	persistent bool,
) error {
	sw.Logger.Info("Dialing peer", "address", addr)

	// XXX(xla): Remove the leakage of test concerns in implementation.
	if cfg.TestDialFail {
		go sw.reconnectToPeer(addr)
		return fmt.Errorf("dial err (peerConfig.DialFail == true)")
	}

	p, err := sw.transport.Dial(*addr, peerConfig{
		chDescs:      sw.chDescs,
		onPeerError:  sw.StopPeerForError,
		persistent:   persistent,
		reactorsByCh: sw.reactorsByCh,
		metrics:      sw.metrics,
	})
	if err != nil {
		switch e := err.(type) {
		case ErrRejected:
			if e.IsSelf() {
				// Remove the given address from the address book and add to our addresses
				// to avoid dialing in the future.
				sw.addrBook.RemoveAddress(addr)
				sw.addrBook.AddOurAddress(addr)

				return err
			}
		}

		// retry persistent peers after
		// any dial error besides IsSelf()
		if persistent {
			go sw.reconnectToPeer(addr)
		}

		return err
	}

	if err := sw.addPeer(p); err != nil {
		_ = p.Stop()
		return err
	}

	return nil
}

This is much the same flow: earlier the transport's Accept received connections from others, and here Dial actively connects out to them. Now it's time to look at how the transport itself works. Transport is an interface:

type Transport interface {
	// Accept returns a newly connected Peer.
	Accept(peerConfig) (Peer, error)

	// Dial connects to the Peer for the address.
	Dial(NetAddress, peerConfig) (Peer, error)
}

From our earlier look at peers we know they multiplex several channels over a single connection, and indeed the transport code defines the matching type:

type MultiplexTransport struct {
	listener net.Listener

	acceptc chan accept
	closec  chan struct{}

	// Lookup table for duplicate ip and id checks.
	conns       ConnSet
	connFilters []ConnFilterFunc

	dialTimeout      time.Duration
	filterTimeout    time.Duration
	handshakeTimeout time.Duration
	nodeInfo         NodeInfo
	nodeKey          NodeKey
	resolver         IPResolver

	mConfig conn.MConnConfig
}

The constructor, NewMultiplexTransport, has nothing hard to understand.
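For reference, a simplified sketch of what it does — it just records the node's identity, some timeouts, and the lookup structures (the default constants and exact field values here are approximations):

// Rough sketch of the constructor (defaults abridged; details may differ).
func NewMultiplexTransport(
	nodeInfo NodeInfo,
	nodeKey NodeKey,
	mConfig conn.MConnConfig,
) *MultiplexTransport {
	return &MultiplexTransport{
		acceptc:          make(chan accept),
		closec:           make(chan struct{}),
		dialTimeout:      defaultDialTimeout,
		filterTimeout:    defaultFilterTimeout,
		handshakeTimeout: defaultHandshakeTimeout,
		mConfig:          mConfig,
		nodeInfo:         nodeInfo,
		nodeKey:          nodeKey,
		conns:            NewConnSet(),
		resolver:         net.DefaultResolver,
	}
}

So let's look at how Accept and Dial are implemented.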

func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
	select {
	// This case should never have any side-effectful/blocking operations to
	// ensure that quality peers are ready to be used.
	case a := <-mt.acceptc:
		if a.err != nil {
			return nil, a.err
		}

		cfg.outbound = false

		return mt.wrapPeer(a.conn, a.nodeInfo, cfg, nil), nil
	case <-mt.closec:
		return nil, &ErrTransportClosed{}
	}
}

As you can see, a newly connected peer is created from the accepted connection and returned, and the switch then adds it to the reactors.
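What Accept does not show is who feeds acceptc: the transport runs a listen-side loop that accepts raw TCP connections, filters and upgrades them, and pushes the result onto acceptc. A heavily simplified sketch of that loop follows (the name acceptLoop is illustrative, and the close-path handling is trimmed):

// Heavily simplified sketch of the listen-side loop that feeds acceptc.
func (mt *MultiplexTransport) acceptLoop() {
	for {
		c, err := mt.listener.Accept()
		if err != nil {
			mt.acceptc <- accept{err: err}
			return
		}

		// Filter and upgrade asynchronously so one slow handshake does not
		// block the listener.
		go func(c net.Conn) {
			var (
				secretConn *conn.SecretConnection
				nodeInfo   NodeInfo
			)
			err := mt.filterConn(c)
			if err == nil {
				secretConn, nodeInfo, err = mt.upgrade(c)
			}

			select {
			case mt.acceptc <- accept{conn: secretConn, nodeInfo: nodeInfo, err: err}:
			case <-mt.closec:
				_ = c.Close()
			}
		}(c)
	}
}

That covers the inbound side. How do we actively connect to other nodes?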

func (mt *MultiplexTransport) Dial(
	addr NetAddress,
	cfg peerConfig,
) (Peer, error) {
	c, err := addr.DialTimeout(mt.dialTimeout)
	if err != nil {
		return nil, err
	}

	// TODO(xla): Evaluate if we should apply filters if we explicitly dial.
	if err := mt.filterConn(c); err != nil {
		return nil, err
	}

	secretConn, nodeInfo, err := mt.upgrade(c)
	if err != nil {
		return nil, err
	}

	cfg.outbound = true

	p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr)

	return p, nil
}

The operation is almost the same, except that before the peer is created the connection is filtered first — checking whether we already have this connection and whether anything is wrong with it — and then upgraded; only after that is the new peer created.
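filterConn follows the same pattern as the switch's peer filter; here is a simplified sketch, assuming the ConnSet and ConnFilterFunc shapes implied by the struct above (the real code also resolves the connection's IPs via mt.resolver and wraps failures in ErrRejected; both are elided here):

// Simplified sketch of filterConn: reject a connection we already track, run
// every registered ConnFilterFunc under filterTimeout, then record the conn.
func (mt *MultiplexTransport) filterConn(c net.Conn) error {
	if mt.conns.Has(c) {
		return fmt.Errorf("duplicate connection from %v", c.RemoteAddr())
	}

	var ips []net.IP // normally resolved from the connection's remote address

	errc := make(chan error, len(mt.connFilters))
	for _, f := range mt.connFilters {
		go func(f ConnFilterFunc) { errc <- f(mt.conns, c, ips) }(f)
	}

	for i := 0; i < cap(errc); i++ {
		select {
		case err := <-errc:
			if err != nil {
				return err
			}
		case <-time.After(mt.filterTimeout):
			return fmt.Errorf("connection filter timed out")
		}
	}

	mt.conns.Set(c, ips)
	return nil
}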