在开始本节之前,我们得先说两个非常重要的接口,service和reactor。

接口

首先我们来看service,tendermint中所有的服务都是实现了这个接口定义的函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Service interface {
	// 默认的启动函数
	Start() error
	OnStart() error

	// 停止运行函数
	Stop() error
	OnStop()

	// 重启服务
	Reset() error
	OnReset() error

	// 服务是否在运行
	IsRunning() bool

	// 返回一个通道,结束服务
	Quit() <-chan struct{}

	// 返回服务的标志
	String() string

	// 日志记录
	SetLogger(log.Logger)
}

可以很清楚的理解service定义的这几个接口,并不难,对service的基本的操作也就这几个函数。 我们再看一看reactor接口。 说到reactor,总会想到RxJava,或者spring中的reactor,也就是异步式编程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type Reactor interface {
	cmn.Service // 默认继承了service基本接口

	// 添加一个路由
	SetSwitch(*Switch)

	// 获取channel
	GetChannels() []*conn.ChannelDescriptor

	// 添加以恶搞peer
	AddPeer(peer Peer)

	// 移除一个peer
	RemovePeer(peer Peer, reason interface{})

	// 处理receive函数
	Receive(chID byte, peer Peer, msgBytes []byte)
}

这个reactor就是和整个P2P网络进行交互的组件模块。

peer

我们先看一下peer的定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
type peer struct {
	cmn.BaseService

	peerConn
	mconn *tmconn.MConnection

	nodeInfo NodeInfo
	channels []byte

	Data *cmn.CMap

	metrics       *Metrics
	metricsTicker *time.Ticker
}

我们先关注peer的通信模块。我们根据内部成员变量,猜测主要和通信相关的是peerConn以及mconn,点进去一看,确实没错。

peerConn

这个定义比较简单,就是简单的原声通信,记录了通信的一些内容

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type peerConn struct {
	outbound   bool
	persistent bool
	conn       net.Conn // source connection

	originalAddr *NetAddress // nil for inbound connections

	// cached RemoteIP()
	ip net.IP
}

可以看着,peerConn维护了一个连接,和别人的通信地址。没有太难理解的内容。

mconn

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type MConnection struct {
	cmn.BaseService

	conn          net.Conn
	bufConnReader *bufio.Reader
	bufConnWriter *bufio.Writer
	sendMonitor   *flow.Monitor
	recvMonitor   *flow.Monitor
	send          chan struct{}
	pong          chan struct{}
	channels      []*Channel
	channelsIdx   map[byte]*Channel
	onReceive     receiveCbFunc
	onError       errorCbFunc
	errored       uint32
	config        MConnConfig

	// Closing quitSendRoutine will cause
	// doneSendRoutine to close.
	quitSendRoutine chan struct{}
	doneSendRoutine chan struct{}

	flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer  *cmn.RepeatTimer   // send pings periodically

	// close conn if pong is not received in pongTimeout
	pongTimer     *time.Timer
	pongTimeoutCh chan bool // true - timeout, false - peer sent pong

	chStatsTimer *cmn.RepeatTimer // update channel stats periodically

	created time.Time // time of creation

	_maxPacketMsgSize int
}

这个好复杂,我们根据自带的注释,可以理解为,这个实现了peer和其他节点的多工通信。对于每一个通信有一个通道和通道ID。 我们来看一看具体的实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
	if config.PongTimeout >= config.PingInterval {
		panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
	}

	mconn := &MConnection{
		conn:          conn,
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
		sendMonitor:   flow.New(0, 0),
		recvMonitor:   flow.New(0, 0),
		send:          make(chan struct{}, 1),
		pong:          make(chan struct{}, 1),
		onReceive:     onReceive,
		onError:       onError,
		config:        config,
		created:       time.Now(),
	}

	// Create channels
	var channelsIdx = map[byte]*Channel{}
	var channels = []*Channel{}

	for _, desc := range chDescs {
		channel := newChannel(mconn, *desc)
		channelsIdx[channel.desc.ID] = channel
		channels = append(channels, channel)
	}
	mconn.channels = channels
	mconn.channelsIdx = channelsIdx

	mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)

	// maxPacketMsgSize() is a bit heavy, so call just once
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

	return mconn
}

我们可以看到几个回调函数,onReceive和onError,而且还对conn做了二次io封装,方便进行读写。并且在这创建了好几个通道,最后创建了一个MConnection的基础服务。 通道是怎么实现的呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte
	sendQueueSize int32 // atomic.
	recving       []byte
	sending       []byte
	recentlySent  int64 // exponential moving average

	maxPacketMsgPayloadSize int

	Logger log.Logger
}

每一个通道都维持着一个MConnection,然后有个发送队列,看内容应该是会把发送队列的内容放到sending缓冲区发出去,然后recving是接受缓冲区。根据channel的成员函数,我们可以看到,会有goroutine调用来读写sending和recving缓冲区进行数据收发。 我们知道OnStart是开始方法,我们看下OnStart是怎么启动的吧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (c *MConnection) OnStart() error {
	if err := c.BaseService.OnStart(); err != nil {
		return err
	}
	c.quitSendRoutine = make(chan struct{})
	c.doneSendRoutine = make(chan struct{})
	c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
	c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
	c.pongTimeoutCh = make(chan bool, 1)
	c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
	go c.sendRoutine()
	go c.recvRoutine()
	return nil
}

主要语句就两个,创建一个发协程,一个收协程。 先看一看数据如何发出去

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func (c *MConnection) sendRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		var _n int64
		var err error
	SELECTION:
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufConnWriter.
			c.flush()
		case <-c.chStatsTimer.Chan():
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.Chan():
			c.Logger.Debug("Send Ping")
			_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
			if err != nil {
				break SELECTION
			}
			c.sendMonitor.Update(int(_n))
			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
				select {
				case c.pongTimeoutCh <- true:
				default:
				}
			})
			c.flush()
		case timeout := <-c.pongTimeoutCh:
			if timeout {
				c.Logger.Debug("Pong timeout")
				err = errors.New("pong timeout")
			} else {
				c.stopPongTimer()
			}
		case <-c.pong:
			c.Logger.Debug("Send Pong")
			_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
			if err != nil {
				break SELECTION
			}
			c.sendMonitor.Update(int(_n))
			c.flush()
		case <-c.quitSendRoutine:
			close(c.doneSendRoutine)
			break FOR_LOOP
		case <-c.send:
			// Send some PacketMsgs
			eof := c.sendSomePacketMsgs()
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	c.stopPongTimer()
}

上面有ping,pong,就是和别人通信是否稳定的,不进去看了。我们看下是如何发送出去自己的数据的。也就是c.send收到了内容的时候

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (c *MConnection) sendPacketMsg() bool {
	// 挑选一个channel,这个channel是最近发送比率最低的
	...

	// Nothing to send?
	if leastChannel == nil {
		return true
	}
	// c.Logger.Info("Found a msgPacket to send")

	// Make & send a PacketMsg from this channel
	_n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
	if err != nil {
		c.Logger.Error("Failed to write PacketMsg", "err", err)
		c.stopForError(err)
		return true
	}
	c.sendMonitor.Update(int(_n))
	c.flushTimer.Set()
	return false
}

然后我们沿着writePacketMsgTo看下去的话,会看到函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (ch *Channel) nextPacketMsg() PacketMsg {
	packet := PacketMsg{}
	packet.ChannelID = byte(ch.desc.ID)
	maxSize := ch.maxPacketMsgPayloadSize
	packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
	if len(ch.sending) <= maxSize {
		packet.EOF = byte(0x01)
		ch.sending = nil
		atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
	} else {
		packet.EOF = byte(0x00)
		ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
	}
	return packet
}

很简单,就是如果这个包还没发送完,就从缓冲区里面继续读,继续发,如果发完了,队列就减少1.并且再发送的过程中设置结束变量。 所以呢,整体的流程就是,有包的话就从缓冲区里面读出来,封装packet然后发出去,如果包多,就一直sendSomePackets. 接下来再看看怎么收的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
func (c *MConnection) recvRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)


		// Read packet type
		var packet Packet
		var _n int64
		var err error
		_n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
		c.recvMonitor.Update(int(_n))
		if err != nil {
			if c.IsRunning() {
				c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		// Read more depending on packet type.
		switch pkt := packet.(type) {
		case PacketPing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			c.Logger.Debug("Receive Ping")
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}
		case PacketPong:
			c.Logger.Debug("Receive Pong")
			select {
			case c.pongTimeoutCh <- false:
			default:
				// never block
			}
		case PacketMsg:
			channel, ok := c.channelsIdx[pkt.ChannelID]
			if !ok || channel == nil {
				err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
				c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			msgBytes, err := channel.recvPacketMsg(pkt)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			if msgBytes != nil {
				c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
				// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
				c.onReceive(pkt.ChannelID, msgBytes)
			}
		default:
			err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.pong)
	for range c.pong {
		// Drain
	}
}

对于ping和pong我们就不看了,就是检测连接的。主要看一下PacketMsg类型的处理。再最前面发的时候,我们看到包如果没发完,标志量EOF不是01,这个地方的channel.recvPacketMsg(pkt)就是把一个完整的封装起来的。然后我们发现了onReceive函数,还记得最开始说的回调函数吗?就是这!

所以,MConnection的工作就是收发信息,channel是通信的渠道,发就调用底层的发出去,收就进行回调。那回调函数在哪呢?我们接下来继续看。

peer

现在我们再看一个peer是怎么创建的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func newPeer(
	pc peerConn,
	mConfig tmconn.MConnConfig,
	nodeInfo NodeInfo,
	reactorsByCh map[byte]Reactor,
	chDescs []*tmconn.ChannelDescriptor,
	onPeerError func(Peer, interface{}),
	options ...PeerOption,
) *peer {
	p := &peer{
		peerConn:      pc,
		nodeInfo:      nodeInfo,
		channels:      nodeInfo.(DefaultNodeInfo).Channels, // TODO
		Data:          cmn.NewCMap(),
		metricsTicker: time.NewTicker(metricsTickerDuration),
		metrics:       NopMetrics(),
	}

	p.mconn = createMConnection(
		pc.conn,
		p,
		reactorsByCh,
		chDescs,
		onPeerError,
		mConfig,
	)
	p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
	for _, option := range options {
		option(p)
	}

	return p
}

等等回调函数呢? 我们看下createMConnection

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func createMConnection(
	conn net.Conn,
	p *peer,
	reactorsByCh map[byte]Reactor,
	chDescs []*tmconn.ChannelDescriptor,
	onPeerError func(Peer, interface{}),
	config tmconn.MConnConfig,
) *tmconn.MConnection {

	onReceive := func(chID byte, msgBytes []byte) {
		reactor := reactorsByCh[chID]
		if reactor == nil {
			// Note that its ok to panic here as it's caught in the conn._recover,
			// which does onPeerError.
			panic(fmt.Sprintf("Unknown channel %X", chID))
		}
		p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
		reactor.Receive(chID, p, msgBytes)
	}

	onError := func(r interface{}) {
		onPeerError(p, r)
	}

	return tmconn.NewMConnectionWithConfig(
		conn,
		chDescs,
		onReceive,
		onError,
		config,
	)
}

看到了吧,在这里面,reactor的receive函数做进一步的处理。每一个channel就有一个对应的reactor来处理。 需要注意的就是这创建了一个peer的基础服务。我们看一下peer怎么工作的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (p *peer) OnStart() error {
	if err := p.BaseService.OnStart(); err != nil {
		return err
	}

	if err := p.mconn.Start(); err != nil {
		return err
	}

	go p.metricsReporter()
	return nil
}

这里面启动的就是基本的服务,然后是通信服务,最后还有一个统计的服务。 到这似乎就结束了。 不对!MConnection是通过Peer维护的,所以收发得靠Peer来做。看看peer怎么发的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (p *peer) Send(chID byte, msgBytes []byte) bool {
	if !p.IsRunning() {
		return false
	} else if !p.hasChannel(chID) {
		return false
	}
	res := p.mconn.Send(chID, msgBytes)
	if res {
		p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
	}
	return res
}

func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
	if !p.IsRunning() {
		return false
	} else if !p.hasChannel(chID) {
		return false
	}
	res := p.mconn.TrySend(chID, msgBytes)
	if res {
		p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
	}
	return res
}

还是调用了底层的mconn的处理。收的处理还在reactor中,我们还没看到那一层。