Previously we looked at the basic operations of a peer; now let's move up one layer. Reading the comments on Switch, it is not hard to see that it is essentially a dispatcher: it routes different messages to different reactors.
```go
type Switch struct {
	cmn.BaseService

	config       *config.P2PConfig
	reactors     map[string]Reactor
	chDescs      []*conn.ChannelDescriptor
	reactorsByCh map[byte]Reactor
	peers        *PeerSet
	dialing      *cmn.CMap
	reconnecting *cmn.CMap
	nodeInfo     NodeInfo // our node info
	nodeKey      *NodeKey // our node privkey
	addrBook     AddrBook
	transport    Transport

	filterTimeout time.Duration
	peerFilters   []PeerFilterFunc

	rng *cmn.Rand // seed for randomizing dial times and orders

	metrics *Metrics
}
```
Inside it there is a collection of channel descriptors and a map from channel ID to reactor, which settles it: earlier we saw that an incoming message is handed to the reactor looked up by its channel ID in the Receive path, and this map is where that relationship lives.
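To make that routing concrete, here is a minimal, hypothetical sketch (not the actual Tendermint code; the `Reactor` stand-in and `onReceive` callback are simplified for illustration) of how a receive callback built from `reactorsByCh` could hand a message to the right reactor:

```go
package main

import "fmt"

// Reactor is a trimmed-down stand-in for the p2p.Reactor interface;
// only the Receive method matters for this sketch.
type Reactor interface {
	Receive(chID byte, msgBytes []byte)
}

type pexReactor struct{}

func (pexReactor) Receive(chID byte, msgBytes []byte) {
	fmt.Printf("pex reactor got %d bytes on channel %X\n", len(msgBytes), chID)
}

func main() {
	// The switch keeps a map like reactorsByCh; messages are routed by channel ID.
	reactorsByCh := map[byte]Reactor{0x00: pexReactor{}}

	// onReceive is the kind of callback the switch hands to each peer connection.
	onReceive := func(chID byte, msgBytes []byte) {
		r, ok := reactorsByCh[chID]
		if !ok {
			fmt.Printf("no reactor for channel %X, dropping message\n", chID)
			return
		}
		r.Receive(chID, msgBytes)
	}

	onReceive(0x00, []byte("hello")) // routed to pexReactor
	onReceive(0x42, []byte("oops"))  // dropped: unknown channel
}
```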
The transport field will come in handy; we'll get to it shortly.
NewSwitch doesn't contain much, so we won't paste its code.
Let's look at how the switch is started.
```go
func (sw *Switch) OnStart() error {
	// Start reactors
	for _, reactor := range sw.reactors {
		err := reactor.Start()
		if err != nil {
			return cmn.ErrorWrap(err, "failed to start %v", reactor)
		}
	}

	// Start accepting Peers.
	go sw.acceptRoutine()

	return nil
}
```
First it starts all the reactors, then it kicks off a goroutine that accepts peers.
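The pattern is simple enough to fit in a toy version. This is a hypothetical sketch, not Tendermint code (the `starter` interface and `namedReactor` type are made up): start every registered reactor, bail out on the first failure, then hand off accepting to a background goroutine.

```go
package main

import (
	"fmt"
	"time"
)

// starter stands in for the Reactor side of cmn.BaseService: anything
// the switch owns that must be running before peers are accepted.
type starter interface{ Start() error }

type namedReactor struct{ name string }

func (r namedReactor) Start() error {
	fmt.Println("started reactor", r.name)
	return nil
}

func main() {
	reactors := map[string]starter{
		"PEX":        namedReactor{"PEX"},
		"BLOCKCHAIN": namedReactor{"BLOCKCHAIN"},
	}

	// Same order of operations as Switch.OnStart: reactors first...
	for name, r := range reactors {
		if err := r.Start(); err != nil {
			fmt.Printf("failed to start %s: %v\n", name, err)
			return // OnStart would return the wrapped error here
		}
	}

	// ...then the accept loop runs in the background for the life of the switch.
	go func() {
		fmt.Println("accept routine running")
	}()

	time.Sleep(20 * time.Millisecond) // give the toy accept routine a moment
}
```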
A reactor can only be started if it was added first, so let's see how reactors are added.
```go
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
	reactorChannels := reactor.GetChannels()
	for _, chDesc := range reactorChannels {
		chID := chDesc.ID
		if sw.reactorsByCh[chID] != nil {
			cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
		}
		sw.chDescs = append(sw.chDescs, chDesc)
		sw.reactorsByCh[chID] = reactor
	}
	sw.reactors[name] = reactor
	reactor.SetSwitch(sw)
	return reactor
}
```
The main job is filling in those maps. One thing worth noting: the reactor is handed a reference back to the switch via SetSwitch, which makes it convenient for the reactor and the switch to call each other.
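The key invariant AddReactor enforces is that no two reactors claim the same channel ID. A small, hypothetical sketch of that registration logic (the `ChannelDescriptor`/`Reactor` types here are trimmed stand-ins, and the real code panics where this one returns an error):

```go
package main

import "fmt"

// Trimmed stand-ins for the p2p types; only the ID field is kept.
type ChannelDescriptor struct{ ID byte }

type Reactor interface {
	GetChannels() []*ChannelDescriptor
}

type pexReactor struct{}

func (pexReactor) GetChannels() []*ChannelDescriptor {
	return []*ChannelDescriptor{{ID: 0x00}}
}

type evidenceReactor struct{}

// Deliberately reuses channel 0x00 to trigger the duplicate check below.
func (evidenceReactor) GetChannels() []*ChannelDescriptor {
	return []*ChannelDescriptor{{ID: 0x00}}
}

func main() {
	reactorsByCh := map[byte]Reactor{}

	addReactor := func(name string, r Reactor) error {
		for _, chDesc := range r.GetChannels() {
			if existing, ok := reactorsByCh[chDesc.ID]; ok {
				// The real AddReactor calls cmn.PanicSanity here.
				return fmt.Errorf("channel %X already claimed by %T", chDesc.ID, existing)
			}
			reactorsByCh[chDesc.ID] = r
		}
		return nil
	}

	fmt.Println(addReactor("PEX", pexReactor{}))           // <nil>
	fmt.Println(addReactor("EVIDENCE", evidenceReactor{})) // duplicate channel error
}
```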
Next up is the goroutine that accepts peers.
```go
func (sw *Switch) acceptRoutine() {
	for {
		p, err := sw.transport.Accept(peerConfig{
			chDescs:      sw.chDescs,
			onPeerError:  sw.StopPeerForError,
			reactorsByCh: sw.reactorsByCh,
			metrics:      sw.metrics,
		})
		if err != nil {
			switch err.(type) {
			case ErrRejected:
				rErr := err.(ErrRejected)

				if rErr.IsSelf() {
					// Remove the given address from the address book and add to our addresses
					// to avoid dialing in the future.
					addr := rErr.Addr()
					sw.addrBook.RemoveAddress(&addr)
					sw.addrBook.AddOurAddress(&addr)
				}

				sw.Logger.Info(
					"Inbound Peer rejected",
					"err", err,
					"numPeers", sw.peers.Size(),
				)

				continue
			case *ErrTransportClosed:
				sw.Logger.Error(
					"Stopped accept routine, as transport is closed",
					"numPeers", sw.peers.Size(),
				)
			default:
				sw.Logger.Error(
					"Accept on transport errored",
					"err", err,
					"numPeers", sw.peers.Size(),
				)
				panic(fmt.Errorf("accept routine exited: %v", err))
			}

			break
		}

		// Ignore connection if we already have enough peers.
		_, in, _ := sw.NumPeers()
		if in >= sw.config.MaxNumInboundPeers {
			sw.Logger.Info(
				"Ignoring inbound connection: already have enough inbound peers",
				"address", p.NodeInfo().NetAddress().String(),
				"have", in,
				"max", sw.config.MaxNumInboundPeers,
			)

			_ = p.Stop()

			continue
		}

		if err := sw.addPeer(p); err != nil {
			_ = p.Stop()
			sw.Logger.Info(
				"Ignoring inbound connection: error while adding peer",
				"err", err,
				"id", p.ID(),
			)
		}
	}
}
```
As you can see, the routine first accepts a peer connection (we'll analyze how accepting works later). If that fails there is a series of error-handling cases. If it succeeds but we already have enough inbound peers, the connection is simply dropped. Otherwise the peer is added.
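The error handling above is a Go type switch over custom error types. A self-contained sketch of that pattern, mirroring the shape of acceptRoutine (the `errRejected`/`errTransportClosed` types here are simplified stand-ins for the real ErrRejected and ErrTransportClosed):

```go
package main

import "fmt"

// Stand-ins for the transport's error types; the real ErrRejected carries
// the rejected address and the reason, here we only keep an isSelf flag.
type errRejected struct{ isSelf bool }

func (e errRejected) Error() string { return "peer rejected" }
func (e errRejected) IsSelf() bool  { return e.isSelf }

type errTransportClosed struct{}

func (errTransportClosed) Error() string { return "transport closed" }

// handleAcceptErr mirrors the type switch in acceptRoutine: rejections are
// logged and the loop continues, a closed transport ends the loop, anything
// else is treated as fatal. It returns true when the loop should stop.
func handleAcceptErr(err error) (stop bool) {
	switch e := err.(type) {
	case errRejected:
		if e.IsSelf() {
			fmt.Println("dialed ourselves; forget this address")
		}
		fmt.Println("inbound peer rejected:", err)
		return false // keep accepting
	case *errTransportClosed:
		fmt.Println("transport closed, stop accepting")
		return true
	default:
		fmt.Println("unexpected accept error:", err)
		return true
	}
}

func main() {
	fmt.Println(handleAcceptErr(errRejected{isSelf: true}))
	fmt.Println(handleAcceptErr(&errTransportClosed{}))
}
```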
Let's follow how a new peer gets added.
```go
func (sw *Switch) addPeer(p Peer) error {
	if err := sw.filterPeer(p); err != nil {
		return err
	}

	p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress()))

	// All good. Start peer
	if sw.IsRunning() {
		if err := sw.startInitPeer(p); err != nil {
			return err
		}
	}

	if err := sw.peers.Add(p); err != nil {
		return err
	}

	sw.Logger.Info("Added peer", "peer", p)
	sw.metrics.Peers.Add(float64(1))

	return nil
}
```
filterPeer checks whether the peer meets our requirements; we won't step into it in detail. If everything is fine, the peer is started and initialized, then added to the peer set.
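Recall from the Switch struct that it carries `peerFilters []PeerFilterFunc` and a `filterTimeout`. Without looking at the real filterPeer, a rough, hypothetical sketch of how a list of filters with a per-filter timeout could work (the `peerFilterFunc` signature here is a guess for illustration, not the actual PeerFilterFunc):

```go
package main

import (
	"fmt"
	"time"
)

// peerFilterFunc is a guess at the shape of a peer filter: given a candidate
// peer (here just an ID string), return an error to reject it.
type peerFilterFunc func(peerID string) error

// filterPeer runs every filter and gives each one filterTimeout to answer,
// roughly the role sw.peerFilters and sw.filterTimeout play in the Switch.
func filterPeer(peerID string, filters []peerFilterFunc, filterTimeout time.Duration) error {
	for _, f := range filters {
		errc := make(chan error, 1)
		go func(f peerFilterFunc) { errc <- f(peerID) }(f)

		select {
		case err := <-errc:
			if err != nil {
				return fmt.Errorf("peer %s rejected: %w", peerID, err)
			}
		case <-time.After(filterTimeout):
			return fmt.Errorf("filter timed out for peer %s", peerID)
		}
	}
	return nil
}

func main() {
	banned := func(peerID string) error {
		if peerID == "badpeer" {
			return fmt.Errorf("on the ban list")
		}
		return nil
	}
	fmt.Println(filterPeer("goodpeer", []peerFilterFunc{banned}, time.Second)) // <nil>
	fmt.Println(filterPeer("badpeer", []peerFilterFunc{banned}, time.Second))  // rejected
}
```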
```go
func (sw *Switch) startInitPeer(p Peer) error {
	err := p.Start() // spawn send/recv routines
	if err != nil {
		// Should never happen
		sw.Logger.Error(
			"Error starting peer",
			"err", err,
			"peer", p,
		)
		return err
	}

	for _, reactor := range sw.reactors {
		reactor.AddPeer(p)
	}

	return nil
}
```
Starting the peer itself needs no further comment. The interesting bit is that the peer is also handed to every reactor via AddPeer, so each reactor knows which peers it should serve.
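What a reactor does in AddPeer is up to that reactor. As a feel for the hook, here is a hypothetical sketch (the `gossipReactor` and its per-peer routine are invented for illustration, not taken from Tendermint) of a reactor that remembers the peer and starts a goroutine to talk to it:

```go
package main

import (
	"fmt"
	"time"
)

// Peer is a stand-in for the p2p.Peer interface with only what we use here.
type Peer interface {
	ID() string
	Send(chID byte, msgBytes []byte) bool
}

type fakePeer struct{ id string }

func (p fakePeer) ID() string             { return p.id }
func (p fakePeer) Send(byte, []byte) bool { fmt.Println("sent to", p.id); return true }

// gossipReactor shows the kind of thing a reactor might do in AddPeer:
// remember the peer and start a per-peer routine that serves it.
type gossipReactor struct {
	peers map[string]Peer
}

func (r *gossipReactor) AddPeer(p Peer) {
	r.peers[p.ID()] = p
	go func() {
		// A real reactor would loop until RemovePeer; one send is enough here.
		p.Send(0x20, []byte("gossip"))
	}()
}

func main() {
	r := &gossipReactor{peers: map[string]Peer{}}
	r.AddPeer(fakePeer{id: "abcd1234"})
	time.Sleep(20 * time.Millisecond) // let the per-peer goroutine run
	fmt.Println("tracked peers:", len(r.peers))
}
```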
So as a server, the node accepts connections from others, and once the checks pass it creates a peer and serves it.
So far we have only been listening for inbound connections; a node should also dial out on its own.
```go
func (sw *Switch) addOutboundPeerWithConfig(
	addr *NetAddress,
	cfg *config.P2PConfig,
	persistent bool,
) error {
	sw.Logger.Info("Dialing peer", "address", addr)

	// XXX(xla): Remove the leakage of test concerns in implementation.
	if cfg.TestDialFail {
		go sw.reconnectToPeer(addr)
		return fmt.Errorf("dial err (peerConfig.DialFail == true)")
	}

	p, err := sw.transport.Dial(*addr, peerConfig{
		chDescs:      sw.chDescs,
		onPeerError:  sw.StopPeerForError,
		persistent:   persistent,
		reactorsByCh: sw.reactorsByCh,
		metrics:      sw.metrics,
	})
	if err != nil {
		switch e := err.(type) {
		case ErrRejected:
			if e.IsSelf() {
				// Remove the given address from the address book and add to our addresses
				// to avoid dialing in the future.
				sw.addrBook.RemoveAddress(addr)
				sw.addrBook.AddOurAddress(addr)

				return err
			}
		}

		// retry persistent peers after
		// any dial error besides IsSelf()
		if persistent {
			go sw.reconnectToPeer(addr)
		}

		return err
	}

	if err := sw.addPeer(p); err != nil {
		_ = p.Stop()
		return err
	}

	return nil
}
```
Pretty much the same story: before it was the transport's Accept, taking connections from others; here it is Dial, actively connecting out. Note that dial failures for persistent peers hand the address to reconnectToPeer.
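We won't trace reconnectToPeer here, but the idea is a retry loop with backoff. A rough, hypothetical sketch of that pattern follows; the constants, backoff schedule, and give-up conditions of the real function differ, and the `dial` callback and address are made up for the example:

```go
package main

import (
	"fmt"
	"time"
)

// reconnectToPeer sketches the retry-with-backoff idea behind
// sw.reconnectToPeer: keep redialing a persistent peer, waiting a little
// longer after every failure, and stop once a dial succeeds or we give up.
func reconnectToPeer(addr string, dial func(string) error) {
	const maxAttempts = 5
	backoff := 100 * time.Millisecond

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err := dial(addr); err == nil {
			fmt.Println("reconnected to", addr)
			return
		}
		fmt.Printf("dial %s failed (attempt %d), retrying in %v\n", addr, attempt, backoff)
		time.Sleep(backoff)
		backoff *= 2 // wait longer between attempts
	}
	fmt.Println("giving up on", addr)
}

func main() {
	attempts := 0
	flakyDial := func(addr string) error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("connection refused")
		}
		return nil
	}
	reconnectToPeer("203.0.113.7:26656", flakyDial)
}
```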
Now it's time to see how the transport works.
Transport is an interface:
```go
type Transport interface {
	// Accept returns a newly connected Peer.
	Accept(peerConfig) (Peer, error)

	// Dial connects to the Peer for the address.
	Dial(NetAddress, peerConfig) (Peer, error)
}
```
From our earlier look at peers we know that connections are multiplexed, and sure enough the transport code defines the corresponding type:
```go
type MultiplexTransport struct {
	listener net.Listener

	acceptc chan accept
	closec  chan struct{}

	// Lookup table for duplicate ip and id checks.
	conns       ConnSet
	connFilters []ConnFilterFunc

	dialTimeout      time.Duration
	filterTimeout    time.Duration
	handshakeTimeout time.Duration
	nodeInfo         NodeInfo
	nodeKey          NodeKey
	resolver         IPResolver

	mConfig conn.MConnConfig
}
```
The constructor holds no surprises, so let's go straight to the implementations of Accept and Dial.
```go
func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
	select {
	// This case should never have any side-effectful/blocking operations to
	// ensure that quality peers are ready to be used.
	case a := <-mt.acceptc:
		if a.err != nil {
			return nil, a.err
		}

		cfg.outbound = false

		return mt.wrapPeer(a.conn, a.nodeInfo, cfg, nil), nil
	case <-mt.closec:
		return nil, &ErrTransportClosed{}
	}
}
```
As you can see, a peer is created from an incoming connection and returned, to then be added to the switch and its reactors. Note that Accept itself only reads from the acceptc channel; the actual accepting and upgrading of connections happens in a background routine that feeds that channel.
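A rough sketch of that producer/consumer split, under the assumption that a background loop accepts raw connections, upgrades them, and pushes the result onto `acceptc` (the `accept` struct here is a simplified stand-in for the transport's internal type):

```go
package main

import (
	"fmt"
	"time"
)

// accept mirrors the shape of the values flowing over mt.acceptc: either an
// upgraded connection plus node info, or the error hit on the way there.
type accept struct {
	nodeID string
	err    error
}

func main() {
	acceptc := make(chan accept)
	closec := make(chan struct{})

	// Producer: stands in for the background loop that listener.Accept()s,
	// filters and upgrades the raw conn, then hands the result to acceptc.
	go func() {
		time.Sleep(10 * time.Millisecond) // pretend a peer just connected
		acceptc <- accept{nodeID: "abcd1234"}
	}()

	// Consumer: the same select Accept() does — take the next upgraded
	// connection, or report that the transport was closed.
	select {
	case a := <-acceptc:
		if a.err != nil {
			fmt.Println("accept failed:", a.err)
			return
		}
		fmt.Println("wrap and return peer", a.nodeID)
	case <-closec:
		fmt.Println("transport closed")
	}
}
```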
How do we actively connect to other nodes?
```go
func (mt *MultiplexTransport) Dial(
	addr NetAddress,
	cfg peerConfig,
) (Peer, error) {
	c, err := addr.DialTimeout(mt.dialTimeout)
	if err != nil {
		return nil, err
	}

	// TODO(xla): Evaluate if we should apply filters if we explicitly dial.
	if err := mt.filterConn(c); err != nil {
		return nil, err
	}

	secretConn, nodeInfo, err := mt.upgrade(c)
	if err != nil {
		return nil, err
	}

	cfg.outbound = true

	p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr)

	return p, nil
}
```
Almost the same steps, except that before creating the peer the connection is first filtered (checking whether we already have a connection to it and whether anything is wrong with it), then the connection is upgraded, and only after that is the new peer created.
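To summarize those two steps: filterConn guards against duplicate connections (the transport's ConnSet) and runs the connection filters, while upgrade performs the secret-connection handshake and exchanges NodeInfo. A rough, hypothetical sketch of that sequencing with made-up helper names (the real checks and handshake are far more involved):

```go
package main

import (
	"fmt"
	"net"
)

// connSet stands in for the transport's ConnSet: it remembers remote
// addresses so the same peer is not connected twice.
type connSet map[string]bool

// filterConn sketches the duplicate check that runs before upgrading:
// reject the conn if we already track its remote address.
func filterConn(conns connSet, remoteAddr string) error {
	if conns[remoteAddr] {
		return fmt.Errorf("duplicate connection to %s", remoteAddr)
	}
	conns[remoteAddr] = true
	return nil
}

// upgrade sketches the second step: in the real transport this performs the
// secret-connection handshake and exchanges NodeInfo; here it only validates
// that the address parses, standing in for "the handshake succeeded".
func upgrade(remoteAddr string) (nodeID string, err error) {
	if _, _, err := net.SplitHostPort(remoteAddr); err != nil {
		return "", err
	}
	return "abcd1234", nil // the peer's ID would come from its NodeInfo
}

func main() {
	conns := connSet{}
	addr := "203.0.113.7:26656"

	if err := filterConn(conns, addr); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	id, err := upgrade(addr)
	if err != nil {
		fmt.Println("handshake failed:", err)
		return
	}
	fmt.Println("dialed peer", id, "- now wrap it into a Peer")

	// Dialing the same address again is caught by the duplicate check.
	fmt.Println(filterConn(conns, addr))
}
```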