Before starting this section, we first need to talk about two very important interfaces: Service and Reactor.
Interfaces
Let's look at Service first. Every service in tendermint implements the functions defined by this interface.
type Service interface {
    // Default start function
    Start() error
    OnStart() error

    // Stop function
    Stop() error
    OnStop()

    // Restart the service
    Reset() error
    OnReset() error

    // Is the service running?
    IsRunning() bool

    // Returns a channel that is closed when the service quits
    Quit() <-chan struct{}

    // Returns a string identifying the service
    String() string

    // Set the logger
    SetLogger(log.Logger)
}
The methods defined by Service are easy to understand; these few functions cover all the basic operations on a service.
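To make the pattern concrete, here is a minimal sketch of a service built on top of BaseService. This is not code from tendermint itself: the tickerService name is made up, and the import path is an assumption based on the tendermint version this post walks through. BaseService provides Start/Stop/IsRunning/Quit and calls the OnStart/OnStop hooks we supply.

package main

import (
    "fmt"
    "time"

    cmn "github.com/tendermint/tendermint/libs/common" // assumed import path for this version
)

// tickerService is a hypothetical service that prints a line every second.
type tickerService struct {
    cmn.BaseService
    ticker *time.Ticker
}

func newTickerService() *tickerService {
    s := &tickerService{}
    // BaseService wires Start/Stop/Quit/IsRunning to our OnStart/OnStop hooks.
    s.BaseService = *cmn.NewBaseService(nil, "tickerService", s)
    return s
}

// OnStart is invoked exactly once by BaseService.Start().
func (s *tickerService) OnStart() error {
    s.ticker = time.NewTicker(time.Second)
    go func() {
        for {
            select {
            case <-s.ticker.C:
                fmt.Println("tick")
            case <-s.Quit(): // closed when Stop() is called
                return
            }
        }
    }()
    return nil
}

// OnStop is invoked exactly once by BaseService.Stop().
func (s *tickerService) OnStop() {
    s.ticker.Stop()
}

func main() {
    s := newTickerService()
    if err := s.Start(); err != nil {
        panic(err)
    }
    time.Sleep(3 * time.Second)
    s.Stop()
}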
Next, let's look at the Reactor interface.
The name reactor brings to mind RxJava, or Spring's Reactor, i.e. asynchronous, event-driven programming.
type Reactor interface {
    cmn.Service // embeds the basic Service interface

    // Set the switch (router) this reactor belongs to
    SetSwitch(*Switch)

    // Get the channels this reactor uses
    GetChannels() []*conn.ChannelDescriptor

    // Add a peer
    AddPeer(peer Peer)

    // Remove a peer
    RemovePeer(peer Peer, reason interface{})

    // Handle received messages (the receive callback)
    Receive(chID byte, peer Peer, msgBytes []byte)
}
The reactor is the component that interacts with the whole P2P network.
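As a concrete example of how a reactor plugs in, here is a sketch of the smallest possible reactor: an echo reactor that sends every message straight back. p2p.BaseReactor already satisfies Service and SetSwitch, so only the four hooks need to be filled in. The EchoReactor type and the 0x99 channel ID are invented for illustration; the import paths assume the tendermint version discussed here.

package myreactor

import (
    "github.com/tendermint/tendermint/p2p"
    "github.com/tendermint/tendermint/p2p/conn"
)

// EchoChannel is a hypothetical channel ID chosen for this sketch;
// real reactors reserve their own byte values.
const EchoChannel = byte(0x99)

// EchoReactor embeds p2p.BaseReactor (which already implements Service and
// SetSwitch) and only fills in the peer/message hooks.
type EchoReactor struct {
    p2p.BaseReactor
}

func NewEchoReactor() *EchoReactor {
    r := &EchoReactor{}
    r.BaseReactor = *p2p.NewBaseReactor("EchoReactor", r)
    return r
}

// GetChannels tells the switch which channel IDs this reactor owns.
func (r *EchoReactor) GetChannels() []*conn.ChannelDescriptor {
    return []*conn.ChannelDescriptor{
        {ID: EchoChannel, Priority: 1, SendQueueCapacity: 10},
    }
}

// AddPeer is called after a peer has been started and added to the switch.
func (r *EchoReactor) AddPeer(peer p2p.Peer) {
    r.Logger.Info("peer added", "peer", peer.ID())
}

// RemovePeer is called when the peer is stopped (on error or otherwise).
func (r *EchoReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
    r.Logger.Info("peer removed", "peer", peer.ID(), "reason", reason)
}

// Receive is the callback invoked by the peer's MConnection when a complete
// message arrives on one of this reactor's channels.
func (r *EchoReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
    // echo the bytes straight back on the same channel
    peer.Send(chID, msgBytes)
}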
peer
Let's first look at the definition of peer.
type peer struct {
    cmn.BaseService

    peerConn
    mconn *tmconn.MConnection

    nodeInfo NodeInfo
    channels []byte

    Data *cmn.CMap

    metrics       *Metrics
    metricsTicker *time.Ticker
}
Let's focus on the peer's communication parts first. Judging from its fields, the ones that look communication-related are peerConn and mconn; drilling into them confirms the guess.
peerConn
Its definition is fairly simple: it is just the raw, low-level connection, recording a few details about it.
type peerConn struct {
    outbound   bool
    persistent bool
    conn       net.Conn // source connection

    originalAddr *NetAddress // nil for inbound connections

    // cached RemoteIP()
    ip net.IP
}
As you can see, peerConn keeps the underlying connection and the remote address. Nothing hard to understand here.
mconn
type MConnection struct {
    cmn.BaseService

    conn          net.Conn
    bufConnReader *bufio.Reader
    bufConnWriter *bufio.Writer
    sendMonitor   *flow.Monitor
    recvMonitor   *flow.Monitor
    send          chan struct{}
    pong          chan struct{}
    channels      []*Channel
    channelsIdx   map[byte]*Channel
    onReceive     receiveCbFunc
    onError       errorCbFunc
    errored       uint32
    config        MConnConfig

    // Closing quitSendRoutine will cause
    // doneSendRoutine to close.
    quitSendRoutine chan struct{}
    doneSendRoutine chan struct{}

    flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
    pingTimer  *cmn.RepeatTimer   // send pings periodically

    // close conn if pong is not received in pongTimeout
    pongTimer     *time.Timer
    pongTimeoutCh chan bool // true - timeout, false - peer sent pong

    chStatsTimer *cmn.RepeatTimer // update channel stats periodically

    created time.Time // time of creation

    _maxPacketMsgSize int
}
This one is quite a bit more complex. Going by its comments, MConnection implements multiplexed communication between this peer and another node: each logical stream gets its own channel, identified by a channel ID.
Let's look at the concrete implementation.
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
    if config.PongTimeout >= config.PingInterval {
        panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
    }

    mconn := &MConnection{
        conn:          conn,
        bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
        bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
        sendMonitor:   flow.New(0, 0),
        recvMonitor:   flow.New(0, 0),
        send:          make(chan struct{}, 1),
        pong:          make(chan struct{}, 1),
        onReceive:     onReceive,
        onError:       onError,
        config:        config,
        created:       time.Now(),
    }

    // Create channels
    var channelsIdx = map[byte]*Channel{}
    var channels = []*Channel{}

    for _, desc := range chDescs {
        channel := newChannel(mconn, *desc)
        channelsIdx[channel.desc.ID] = channel
        channels = append(channels, channel)
    }
    mconn.channels = channels
    mconn.channelsIdx = channelsIdx

    mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)

    // maxPacketMsgSize() is a bit heavy, so call just once
    mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

    return mconn
}
We can see the two callbacks, onReceive and onError; the conn is also wrapped with buffered I/O for convenient reads and writes. Several channels are created here, and finally the MConnection base service is set up.
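Before moving on to Channel, here is a rough sketch of how a caller could drive this constructor directly, without going through a peer. The in-memory net.Pipe connection and channel ID 0x01 are just placeholders for illustration, and the import alias and DefaultMConnConfig usage are assumptions about the tendermint version discussed here.

package main

import (
    "fmt"
    "net"

    tmconn "github.com/tendermint/tendermint/p2p/conn" // assumed import path
)

func main() {
    // net.Pipe gives an in-memory net.Conn pair, just so the sketch is self-contained.
    c1, _ := net.Pipe()

    chDescs := []*tmconn.ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}

    // onReceive fires once a complete message has been reassembled on a channel.
    onReceive := func(chID byte, msgBytes []byte) {
        fmt.Printf("got %d bytes on channel %X\n", len(msgBytes), chID)
    }
    onError := func(r interface{}) {
        fmt.Println("connection error:", r)
    }

    mconn := tmconn.NewMConnectionWithConfig(c1, chDescs, onReceive, onError, tmconn.DefaultMConnConfig())
    if err := mconn.Start(); err != nil {
        panic(err)
    }
    defer mconn.Stop()

    // Send enqueues msgBytes on channel 0x01 and wakes sendRoutine via the send signal channel.
    mconn.Send(0x01, []byte("hello"))
}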
How is a channel implemented?
type Channel struct {
    conn          *MConnection
    desc          ChannelDescriptor
    sendQueue     chan []byte
    sendQueueSize int32 // atomic.
    recving       []byte
    sending       []byte
    recentlySent  int64 // exponential moving average

    maxPacketMsgPayloadSize int

    Logger log.Logger
}
Each Channel holds a reference to its MConnection and has a send queue; from the fields it looks like queued data gets moved into the sending buffer to be written out, while recving is the receive buffer. Looking at Channel's methods, goroutines read and write the sending and recving buffers to do the actual sending and receiving.
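The enqueue half of the send path is not quoted in this post, so here is a simplified, standalone sketch (plain Go, not tendermint's actual code) of what the fields above imply: Send puts the bytes on the channel's sendQueue and then signals the mconn.send channel so that sendRoutine wakes up.

package main

import "fmt"

type sketchChannel struct {
    sendQueue chan []byte
}

type sketchMConn struct {
    channelsIdx map[byte]*sketchChannel
    send        chan struct{} // capacity 1: "there is something to send"
}

func (c *sketchMConn) Send(chID byte, msgBytes []byte) bool {
    ch, ok := c.channelsIdx[chID]
    if !ok {
        return false
    }
    ch.sendQueue <- msgBytes // enqueue for sendRoutine to drain into ch.sending
    select {
    case c.send <- struct{}{}: // wake sendRoutine if it is idle
    default: // already signalled; sendRoutine will drain the queue anyway
    }
    return true
}

func main() {
    c := &sketchMConn{
        channelsIdx: map[byte]*sketchChannel{0x01: {sendQueue: make(chan []byte, 1)}},
        send:        make(chan struct{}, 1),
    }
    fmt.Println(c.Send(0x01, []byte("hello"))) // true
}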
We know OnStart is the start method, so let's see what OnStart actually starts.
func (c *MConnection) OnStart() error {
    if err := c.BaseService.OnStart(); err != nil {
        return err
    }
    c.quitSendRoutine = make(chan struct{})
    c.doneSendRoutine = make(chan struct{})
    c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
    c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
    c.pongTimeoutCh = make(chan bool, 1)
    c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
    go c.sendRoutine()
    go c.recvRoutine()
    return nil
}
The two key statements start a send goroutine and a receive goroutine.
Let's first see how data gets sent out.
func (c *MConnection) sendRoutine() {
    defer c._recover()

FOR_LOOP:
    for {
        var _n int64
        var err error
    SELECTION:
        select {
        case <-c.flushTimer.Ch:
            // NOTE: flushTimer.Set() must be called every time
            // something is written to .bufConnWriter.
            c.flush()
        case <-c.chStatsTimer.Chan():
            for _, channel := range c.channels {
                channel.updateStats()
            }
        case <-c.pingTimer.Chan():
            c.Logger.Debug("Send Ping")
            _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
            if err != nil {
                break SELECTION
            }
            c.sendMonitor.Update(int(_n))
            c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
            c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
                select {
                case c.pongTimeoutCh <- true:
                default:
                }
            })
            c.flush()
        case timeout := <-c.pongTimeoutCh:
            if timeout {
                c.Logger.Debug("Pong timeout")
                err = errors.New("pong timeout")
            } else {
                c.stopPongTimer()
            }
        case <-c.pong:
            c.Logger.Debug("Send Pong")
            _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
            if err != nil {
                break SELECTION
            }
            c.sendMonitor.Update(int(_n))
            c.flush()
        case <-c.quitSendRoutine:
            close(c.doneSendRoutine)
            break FOR_LOOP
        case <-c.send:
            // Send some PacketMsgs
            eof := c.sendSomePacketMsgs()
            if !eof {
                // Keep sendRoutine awake.
                select {
                case c.send <- struct{}{}:
                default:
                }
            }
        }

        if !c.IsRunning() {
            break FOR_LOOP
        }
        if err != nil {
            c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
            c.stopForError(err)
            break FOR_LOOP
        }
    }

    // Cleanup
    c.stopPongTimer()
}
The ping and pong cases above are just liveness checks with the remote peer, so we will not dig into them. Let's look at how our own data gets sent out, i.e. the case where c.send receives a signal.
func (c *MConnection) sendPacketMsg() bool {
    // Pick a channel: the one with the lowest recent send ratio
    ...

    // Nothing to send?
    if leastChannel == nil {
        return true
    }
    // c.Logger.Info("Found a msgPacket to send")

    // Make & send a PacketMsg from this channel
    _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
    if err != nil {
        c.Logger.Error("Failed to write PacketMsg", "err", err)
        c.stopForError(err)
        return true
    }
    c.sendMonitor.Update(int(_n))
    c.flushTimer.Set()
    return false
}
Following writePacketMsgTo down the call chain, we arrive at this function:
func (ch *Channel) nextPacketMsg() PacketMsg {
    packet := PacketMsg{}
    packet.ChannelID = byte(ch.desc.ID)
    maxSize := ch.maxPacketMsgPayloadSize
    packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
    if len(ch.sending) <= maxSize {
        packet.EOF = byte(0x01)
        ch.sending = nil
        atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
    } else {
        packet.EOF = byte(0x00)
        ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
    }
    return packet
}
Quite simple: if the message has not been fully sent yet, keep cutting more bytes out of the sending buffer and sending them; once the last chunk goes out, the packet's EOF flag is set to 0x01 and the send queue size is decremented by 1. So the overall flow is: when there is data, read it out of the buffer, wrap it into a PacketMsg and send it; when there is a lot of it, sendSomePacketMsgs keeps being driven until everything is drained.
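To make the EOF mechanics concrete, here is a small standalone sketch (plain Go, not tendermint code) that mimics what repeated nextPacketMsg calls do to ch.sending: each fragment carries at most maxSize bytes, and only the last one has EOF = 0x01.

package main

import "fmt"

// packetMsg mirrors the shape of tendermint's PacketMsg for this sketch only.
type packetMsg struct {
    ChannelID byte
    EOF       byte // 0x01 on the last fragment of a message
    Bytes     []byte
}

// splitMessage cuts msg into fragments of at most maxSize bytes and marks the
// final fragment with EOF = 0x01, like repeated nextPacketMsg calls would.
func splitMessage(chID byte, msg []byte, maxSize int) []packetMsg {
    var packets []packetMsg
    for len(msg) > maxSize {
        packets = append(packets, packetMsg{ChannelID: chID, EOF: 0x00, Bytes: msg[:maxSize]})
        msg = msg[maxSize:]
    }
    packets = append(packets, packetMsg{ChannelID: chID, EOF: 0x01, Bytes: msg})
    return packets
}

func main() {
    msg := []byte("0123456789") // 10 bytes with maxSize 4 -> fragments "0123", "4567", "89"
    for i, p := range splitMessage(0x01, msg, 4) {
        fmt.Printf("packet %d: eof=%d bytes=%q\n", i, p.EOF, p.Bytes)
    }
}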
Next, let's see how data is received.
func (c *MConnection) recvRoutine() {
    defer c._recover()

FOR_LOOP:
    for {
        // Block until .recvMonitor says we can read.
        c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)

        // Read packet type
        var packet Packet
        var _n int64
        var err error
        _n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
        c.recvMonitor.Update(int(_n))
        if err != nil {
            if c.IsRunning() {
                c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
                c.stopForError(err)
            }
            break FOR_LOOP
        }

        // Read more depending on packet type.
        switch pkt := packet.(type) {
        case PacketPing:
            // TODO: prevent abuse, as they cause flush()'s.
            // https://github.com/tendermint/tendermint/issues/1190
            c.Logger.Debug("Receive Ping")
            select {
            case c.pong <- struct{}{}:
            default:
                // never block
            }
        case PacketPong:
            c.Logger.Debug("Receive Pong")
            select {
            case c.pongTimeoutCh <- false:
            default:
                // never block
            }
        case PacketMsg:
            channel, ok := c.channelsIdx[pkt.ChannelID]
            if !ok || channel == nil {
                err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
                c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
                c.stopForError(err)
                break FOR_LOOP
            }

            msgBytes, err := channel.recvPacketMsg(pkt)
            if err != nil {
                if c.IsRunning() {
                    c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
                    c.stopForError(err)
                }
                break FOR_LOOP
            }
            if msgBytes != nil {
                c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
                // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
                c.onReceive(pkt.ChannelID, msgBytes)
            }
        default:
            err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
            c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
            c.stopForError(err)
            break FOR_LOOP
        }
    }

    // Cleanup
    close(c.pong)
    for range c.pong {
        // Drain
    }
}
We will skip ping and pong again since they only check the connection, and focus on the PacketMsg case. On the sending side we saw that a message which is not yet complete has its EOF flag set to something other than 0x01; here, channel.recvPacketMsg(pkt) is what reassembles those fragments into one complete message. And then we run into the onReceive function. Remember the callback mentioned at the very beginning? This is where it fires.
So MConnection's job is sending and receiving: a Channel is the lane the messages travel on; sending goes down to the underlying connection, and receiving triggers the callback. But where is that callback defined? Let's keep going.
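The receiving side mirrors the fragmentation sketch above: recvPacketMsg appends each fragment to the channel's recving buffer and only returns a complete message once a packet with EOF = 0x01 arrives. Here is a standalone sketch of that behavior (again not tendermint's actual code, just the idea):

package main

import "fmt"

// packetMsg mirrors tendermint's PacketMsg shape for this sketch only.
type packetMsg struct {
    ChannelID byte
    EOF       byte
    Bytes     []byte
}

// receiver mimics the Channel.recving buffer: fragments are appended until a
// packet with EOF = 0x01 arrives, at which point the complete message is returned.
type receiver struct {
    recving []byte
}

func (r *receiver) recvPacketMsg(p packetMsg) []byte {
    r.recving = append(r.recving, p.Bytes...)
    if p.EOF == 0x01 {
        msg := r.recving
        r.recving = nil
        return msg // a complete message; MConnection would now call onReceive
    }
    return nil // message not complete yet
}

func main() {
    r := &receiver{}
    fragments := []packetMsg{
        {ChannelID: 0x01, EOF: 0x00, Bytes: []byte("hel")},
        {ChannelID: 0x01, EOF: 0x01, Bytes: []byte("lo")},
    }
    for _, p := range fragments {
        if msg := r.recvPacketMsg(p); msg != nil {
            fmt.Printf("complete message: %q\n", msg)
        }
    }
}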
peer
Now let's see how a peer is created.
func newPeer(
    pc peerConn,
    mConfig tmconn.MConnConfig,
    nodeInfo NodeInfo,
    reactorsByCh map[byte]Reactor,
    chDescs []*tmconn.ChannelDescriptor,
    onPeerError func(Peer, interface{}),
    options ...PeerOption,
) *peer {
    p := &peer{
        peerConn:      pc,
        nodeInfo:      nodeInfo,
        channels:      nodeInfo.(DefaultNodeInfo).Channels, // TODO
        Data:          cmn.NewCMap(),
        metricsTicker: time.NewTicker(metricsTickerDuration),
        metrics:       NopMetrics(),
    }

    p.mconn = createMConnection(
        pc.conn,
        p,
        reactorsByCh,
        chDescs,
        onPeerError,
        mConfig,
    )
    p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
    for _, option := range options {
        option(p)
    }

    return p
}
Wait, where are the callbacks? Let's look at createMConnection.
func createMConnection(
    conn net.Conn,
    p *peer,
    reactorsByCh map[byte]Reactor,
    chDescs []*tmconn.ChannelDescriptor,
    onPeerError func(Peer, interface{}),
    config tmconn.MConnConfig,
) *tmconn.MConnection {

    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            // Note that its ok to panic here as it's caught in the conn._recover,
            // which does onPeerError.
            panic(fmt.Sprintf("Unknown channel %X", chID))
        }
        p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return tmconn.NewMConnectionWithConfig(
        conn,
        chDescs,
        onReceive,
        onError,
        config,
    )
}
See? Here the reactor's Receive function does the further processing: each channel has a corresponding reactor to handle it.
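Where does the reactorsByCh map come from? Roughly speaking, the Switch builds it from each reactor's GetChannels when reactors are registered. Here is a sketch of that mapping; the function name is hypothetical, the Switch does the equivalent internally.

package p2psketch

import "github.com/tendermint/tendermint/p2p"

// mapReactorsByChannel sketches how a chID -> Reactor index like reactorsByCh
// is built: each reactor declares its channel IDs via GetChannels, and the
// resulting map lets onReceive dispatch a message to the right reactor.
func mapReactorsByChannel(reactors []p2p.Reactor) map[byte]p2p.Reactor {
    byCh := map[byte]p2p.Reactor{}
    for _, r := range reactors {
        for _, chDesc := range r.GetChannels() {
            byCh[chDesc.ID] = r
        }
    }
    return byCh
}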
Also note that newPeer creates a base service for the peer. Let's see how the peer works.
func (p *peer) OnStart() error {
    if err := p.BaseService.OnStart(); err != nil {
        return err
    }

    if err := p.mconn.Start(); err != nil {
        return err
    }

    go p.metricsReporter()
    return nil
}
What gets started here is the base service, then the communication service (mconn), and finally a metrics reporter. It might look like we are done at this point.
But no: the MConnection is owned and maintained by the Peer, so sending and receiving have to go through the Peer. Let's see how the peer sends.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
    if !p.IsRunning() {
        return false
    } else if !p.hasChannel(chID) {
        return false
    }
    res := p.mconn.Send(chID, msgBytes)
    if res {
        p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
    }
    return res
}

func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
    if !p.IsRunning() {
        return false
    } else if !p.hasChannel(chID) {
        return false
    }
    res := p.mconn.TrySend(chID, msgBytes)
    if res {
        p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
    }
    return res
}
Again it just delegates to the underlying mconn. The receive side is handled inside the reactors, a layer we have not gotten to yet.
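As a closing sketch, here is how reactor code typically pushes data to a peer, reusing the hypothetical channel ID from the earlier reactor sketch. Both calls end up in mconn: Send waits (within a bounded timeout) for room on the channel's send queue, while TrySend gives up immediately if the queue is full.

package p2psketch

import "github.com/tendermint/tendermint/p2p"

// echoChannel is a hypothetical channel ID, as in the earlier reactor sketch.
const echoChannel = byte(0x99)

// greetPeer shows the two send paths available to reactor code; both delegate
// to the peer's mconn, as the Send/TrySend methods above do.
func greetPeer(peer p2p.Peer) {
    if !peer.Send(echoChannel, []byte("hello")) {
        // peer not running, channel not advertised by the peer, or queue full
    }
    _ = peer.TrySend(echoChannel, []byte("hello again"))
}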