【注意】最后更新于 November 9, 2017,文中内容可能已过时,请谨慎使用。
最近还在继续看Fabric源码,发现自己还是差的太多了。估计可能是看的方式不太对。不过也慢慢的有了一点收获吧。
这次带来的分享是PutState以及GetState。实际上这两个函数区别不大,就是几个关键字不同。
首先,我们知道,PutState是可以在chaincode里面书写,是属于ChaincodeStub
的一个方法。
代码很简单,主要就是这几句:
1
2
3
4
5
6
|
func (stub *ChaincodeStub) PutState(key string, value []byte) error {
if key == "" {
return fmt.Errorf("key must not be an empty string")
}
return stub.handler.handlePutState(key, value, stub.TxID)
}
|
没有什么问题,可以看到是调用的handler的handlePutState
方法,继续看下去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// handlePutState communicates with the validator to put state information into the ledger.
func (handler *Handler) handlePutState(key string, value []byte, txid string) error {
// Check if this is a transaction
chaincodeLogger.Debugf("[%s]Inside putstate", shorttxid(txid))
//we constructed a valid object. No need to check for error
payloadBytes, _ := proto.Marshal(&pb.PutStateInfo{Key: key, Value: value})
// Create the channel on which to communicate the response from validating peer
var respChan chan pb.ChaincodeMessage
var err error
if respChan, err = handler.createChannel(txid); err != nil {
return err
}
defer handler.deleteChannel(txid)
// Send PUT_STATE message to validator chaincode support
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_PUT_STATE, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_PUT_STATE)
var responseMsg pb.ChaincodeMessage
if responseMsg, err = handler.sendReceive(msg, respChan); err != nil {
return errors.New(fmt.Sprintf("[%s]error sending PUT_STATE %s", msg.Txid, err))
}
if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
// Success response
chaincodeLogger.Debugf("[%s]Received %s. Successfully updated state", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)
return nil
}
|
内容有点长,我还没有截完,核心的函数就是三个,createChannel
,deleteChannel
以及sendReceive
。要理解这三个函数需要看下handler的结构。
1
2
3
4
5
6
7
8
9
10
11
|
type Handler struct {
sync.RWMutex
//shim to peer grpc serializer. User only in serialSend
serialLock sync.Mutex
To string
ChatStream PeerChaincodeStream
FSM *fsm.FSM
cc Chaincode
responseChannel map[string]chan pb.ChaincodeMessage
nextState chan *nextStateInfo
}
|
我们在这先关注responseChnnel,是一个mao类型,txid->chan
,是一个对应关系。
createChannel
以及deleteChannel
都是针对这个map进行处理的,就不在详细查看了。
好了,我们继续看sendReceive
函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {
errc := make(chan error, 1)
handler.serialSendAsync(msg, errc)
//the serialsend above will send an err or nil
//the select filters that first error(or nil)
//and continues to wait for the response
//it is possible that the response triggers first
//in which case the errc obviously worked and is
//ignored
for {
select {
case err := <-errc:
if err == nil {
continue
}
//would have been logged, return false
return pb.ChaincodeMessage{}, err
case outmsg, val := <-c:
if !val {
return pb.ChaincodeMessage{}, fmt.Errorf("unexpected failure on receive")
}
return outmsg, nil
}
}
}
|
好的,可以看到了传递了两个参数,一个是msg
,一个是respChan
,注意,这个地方有重点了。
可以看到代码里面调用了serialSendAsync
函数,传递了一个新的chan
以及msg
,但是并没有传递respChan
.然后下面等待接收消息。一个是errc的,一个是respChan的。根据我们的应用,正确的消息应该是执行的
1
|
case outmsg, val := <-c:
|
这一句。这儿就很奇怪了,没有传递是怎么接受的数据呢?其次,chan不是应该只返回一个值吗?为什么是两个值?
我们先留待疑问,继续往下看serialSendAsync
函数。
1
2
3
4
5
6
7
8
|
func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {
go func() {
err := handler.serialSend(msg)
if errc != nil {
errc <- err
}
}()
}
|
开启了一个协程,实现异步接受消息,对应了函数名。估计serialSend
没有什么复杂的了。
1
2
3
4
5
6
7
8
|
func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error {
handler.serialLock.Lock()
defer handler.serialLock.Unlock()
err := handler.ChatStream.Send(msg)
return err
}
|
果然,调用了ChatStream的Send方法。
到这,我的源码就进入瓶颈了。
由于我们的所有的东西都是运行在docker中的,所以进行沟通的时候会有rpc等等一系列过程,而我刚开始看,所以嘛。。。
接下来就是我的一些猜想了。
我们接着上面的疑问,我们都知道,msg
中是有txid的,而txid是对应的是respChan
.除此之外,我们还看到有这个函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (handler *Handler) sendChannel(msg *pb.ChaincodeMessage) error {
handler.Lock()
defer handler.Unlock()
if handler.responseChannel == nil {
return fmt.Errorf("[%s]Cannot send message response channel", shorttxid(msg.Txid))
}
if handler.responseChannel[msg.Txid] == nil {
return fmt.Errorf("[%s]sendChannel does not exist", shorttxid(msg.Txid))
}
chaincodeLogger.Debugf("[%s]before send", shorttxid(msg.Txid))
handler.responseChannel[msg.Txid] <- *msg
chaincodeLogger.Debugf("[%s]after send", shorttxid(msg.Txid))
return nil
}
|
在这个地方将消息传递给了respChan
,进行后续的沟通。
可以看到还有其他的函数,诸如afterResponse
,afterError
调用了这个函数。
我们可以猜测,在docker中存在着类似一个守护进程,会执行这些过程,处理数据,写入到通道中。(主要是目前文档太大了,太难找了,还没看完.
恩,就这样吧。