我们看完了peer,看点稍微简单点的mempool实现吧。
这个和我们写的abci程序有直接的关联,因此,我们需要仔细阅读下。
废话不多说,我们先看下Mempool的定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
type Mempool struct {
config *cfg.MempoolConfig
proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
preCheck PreCheckFunc
postCheck PostCheckFunc
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
cache txCache
// A log of mempool txs
wal *auto.AutoFile
logger log.Logger
metrics *Metrics
}
|
就是一堆和和交易相关的内容,还有一些和上层通信所需要的接口。
我们看下怎么定义的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func NewMempool(
config *cfg.MempoolConfig,
proxyAppConn proxy.AppConnMempool,
height int64,
options ...MempoolOption,
) *Mempool {
mempool := &Mempool{
config: config,
proxyAppConn: proxyAppConn,
txs: clist.New(),
height: height,
rechecking: 0,
recheckCursor: nil,
recheckEnd: nil,
logger: log.NewNopLogger(),
metrics: NopMetrics(),
}
if config.CacheSize > 0 {
mempool.cache = newMapTxCache(config.CacheSize)
} else {
mempool.cache = nopTxCache{}
}
proxyAppConn.SetResponseCallback(mempool.resCb)
for _, option := range options {
option(mempool)
}
return mempool
}
|
里面有一个需要注意的,就是为proxyAppConn设置了一个回调。
因为我们知道,内存池把一个交易提交给我们的abci程序之后,根据返回的结果来决定下一步如何处理,所以这个地方定义了mempool.resCb
。
我们看一下时怎么从内存池中取出来一个交易的。为了方便起见,不妨假设是按照交易笔数取出来的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (mem *Mempool) ReapMaxTxs(max int) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if max < 0 {
max = mem.txs.Len()
}
for atomic.LoadInt32(&mem.rechecking) > 0 {
// TODO: Something better?
time.Sleep(time.Millisecond * 10)
}
txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max))
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
memTx := e.Value.(*mempoolTx)
txs = append(txs, memTx.tx)
}
return txs
}
|
主要流程不复杂,就是从tx列表中按照要求的数目来读取交易,按照用户要求的数目读取出来,然后返回回去。
那怎么更新的呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
func (mem *Mempool) Update(
height int64,
txs types.Txs,
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
// Set height
mem.height = height
mem.notifiedTxsAvailable = false
if preCheck != nil {
mem.preCheck = preCheck
}
if postCheck != nil {
mem.postCheck = postCheck
}
// Add committed transactions to cache (if missing).
for _, tx := range txs {
_ = mem.cache.Push(tx)
}
// Remove committed transactions.
txsLeft := mem.removeTxs(txs)
// Either recheck non-committed txs to see if they became invalid
// or just notify there're some txs left.
if len(txsLeft) > 0 {
if mem.config.Recheck {
mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
mem.recheckTxs(txsLeft)
// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
} else {
mem.notifyTxsAvailable()
}
}
// Update metrics
mem.metrics.Size.Set(float64(mem.Size()))
return nil
}
|
重点就是txsLeft := mem.removeTxs(txs)
会把已经提交的tx给移除出去,然后其他的就都无所谓了。
接下来再看看如何检查交易的,换句话说,如何把底层的交易传递给abci程序的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
mem.proxyMtx.Lock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()
if mem.Size() >= mem.config.Size {
return ErrMempoolIsFull
}
if mem.preCheck != nil {
if err := mem.preCheck(tx); err != nil {
return ErrPreCheck{err}
}
}
// CACHE
if !mem.cache.Push(tx) {
return ErrTxInCache
}
// END CACHE
// WAL
if mem.wal != nil {
// TODO: Notify administrators when WAL fails
_, err := mem.wal.Write([]byte(tx))
if err != nil {
mem.logger.Error("Error writing to WAL", "err", err)
}
_, err = mem.wal.Write([]byte("\n"))
if err != nil {
mem.logger.Error("Error writing to WAL", "err", err)
}
}
// END WAL
// NOTE: proxyAppConn may error if tx buffer is full
if err = mem.proxyAppConn.Error(); err != nil {
return err
}
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
if cb != nil {
reqRes.SetCallback(cb)
}
return nil
}
|
可以看到,最后的方法就是reqRes := mem.proxyAppConn.CheckTxAsync(tx)
来调用上层的内容,获取处理结果。
那么checkTxAsync
具体怎么工作的呢?点进去我们发现proxyAppConn是一个接口?再同一个文件中查找,我们会发现只有一个实现了这个接口,就是appConnMempool
,具体的定义如下
1
2
3
|
type appConnMempool struct {
appConn abcicli.Client
}
|
然后往下看程序继承实现
1
2
3
|
func (app *appConnMempool) CheckTxAsync(tx []byte) *abcicli.ReqRes {
return app.appConn.CheckTxAsync(tx)
}
|
再进去一看,又是接口实现。根据我们对abci的了解,有三种实现,socket,rpc以及直接golang交互,我们为了简单,看直接和golang交互的。
localClient的实现如下
1
2
3
4
5
6
7
|
type localClient struct {
cmn.BaseService
mtx *sync.Mutex
types.Application
Callback
}
|
里面的types.Application就是我们自己的abci程序,看下checkTxAsync,可以看到
1
2
3
4
5
6
7
8
9
10
|
func (app *localClient) CheckTxAsync(tx []byte) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.CheckTx(tx)
return app.callback(
types.ToRequestCheckTx(tx),
types.ToResponseCheckTx(res),
)
}
|
最后面return了一下回调,因为我们之前看到代码是
1
|
proxyAppConn.SetResponseCallback(mempool.resCb)
|
这个return就是使得回调产生了作用,那我们再看看resCb。
1
2
3
4
5
6
7
8
9
|
func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
if mem.recheckCursor == nil {
mem.resCbNormal(req, res)
} else {
mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)
}
mem.metrics.Size.Set(float64(mem.Size()))
}
|
正常情况下,我们应该去追踪Normal的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
tx: tx,
}
mem.txs.PushBack(memTx)
mem.logger.Info("Added good transaction",
"tx", TxID(tx),
"res", r,
"height", memTx.height,
"total", mem.Size(),
)
mem.metrics.TxSizeBytes.Observe(float64(len(tx)))
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
mem.logger.Info("Rejected bad transaction", "tx", TxID(tx), "res", r, "err", postCheckErr)
mem.metrics.FailedTxs.Add(1)
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}
default:
// ignore other messages
}
}
|
看这段代码,我们就可以看到如果abci程序返回的结果正确,就可以添加到内存池中了。
可是问题来了,我们刚刚都是在考虑checkTx是如何执行的,那么是谁调用的呢?Reactor!
1
2
3
4
5
|
type MempoolReactor struct {
p2p.BaseReactor
config *cfg.MempoolConfig
Mempool *Mempool
}
|
类型定义很简单。
正常情况下得有OnStart函数,我们点进去一看,很简单,就不看了。
再看一下Receive函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
memR.Switch.StopPeerForError(src, err)
return
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
switch msg := msg.(type) {
case *TxMessage:
err := memR.Mempool.CheckTx(msg.Tx, nil)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
}
// broadcasting happens from go routines per peer
default:
memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
|
也不难理解,就是把接收到的消息添加到内存池当中,然后进行checkTx。
那么如何给别人发呢?就是在AddPeer中。
1
2
3
|
func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
go memR.broadcastTxRoutine(peer)
}
|
看下启动的协程是如何工作的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
if !memR.config.Broadcast {
return
}
var next *clist.CElement
for {
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil {
select {
case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
if next = memR.Mempool.TxsFront(); next == nil {
continue
}
case <-peer.Quit():
return
case <-memR.Quit():
return
}
}
memTx := next.Value.(*mempoolTx)
// make sure the peer is up to date
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
// Peer does not have a state yet. We set it in the consensus reactor, but
// when we add peer in Switch, the order we call reactors#AddPeer is
// different every time due to us using a map. Sometimes other reactors
// will be initialized before the consensus reactor. We should wait a few
// milliseconds and retry.
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
if peerState.GetHeight() < memTx.Height()-1 { // Allow for a lag of 1 block
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
// send memTx
msg := &TxMessage{Tx: memTx.tx}
success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
select {
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
return
case <-memR.Quit():
return
}
}
}
|
还行,还可以看懂,就是看下peer的状态,然后封装mempool的交易,发送给peer。
到这块,内存池的内容就结束了,不难理解,这个内存池只是一个存储区,对交易的内容不做任何处理,来了新的交易就取出来发给上层abci程序,如果通过就放到内存池,然后等待共识层接受,给出去。所以是一笔一笔的交易的转移。