最近还在继续看Fabric源码,发现自己还是差的太多了。估计可能是看的方式不太对。不过也慢慢的有了一点收获吧。

这次带来的分享是PutState以及GetState。实际上这两个函数区别不大,就是几个关键字不同。 首先,我们知道,PutState是可以在chaincode里面书写,是属于ChaincodeStub的一个方法。 代码很简单,主要就是这几句:

1
2
3
4
5
6
func (stub *ChaincodeStub) PutState(key string, value []byte) error {
	if key == "" {
		return fmt.Errorf("key must not be an empty string")
	}
	return stub.handler.handlePutState(key, value, stub.TxID)
}

没有什么问题,可以看到是调用的handler的handlePutState方法,继续看下去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// handlePutState communicates with the validator to put state information into the ledger.
func (handler *Handler) handlePutState(key string, value []byte, txid string) error {
	// Check if this is a transaction
	chaincodeLogger.Debugf("[%s]Inside putstate", shorttxid(txid))

	//we constructed a valid object. No need to check for error
	payloadBytes, _ := proto.Marshal(&pb.PutStateInfo{Key: key, Value: value})

	// Create the channel on which to communicate the response from validating peer
	var respChan chan pb.ChaincodeMessage
	var err error
	if respChan, err = handler.createChannel(txid); err != nil {
		return err
	}

	defer handler.deleteChannel(txid)

	// Send PUT_STATE message to validator chaincode support
	msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_PUT_STATE, Payload: payloadBytes, Txid: txid}
	chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_PUT_STATE)

	var responseMsg pb.ChaincodeMessage

	if responseMsg, err = handler.sendReceive(msg, respChan); err != nil {
		return errors.New(fmt.Sprintf("[%s]error sending PUT_STATE %s", msg.Txid, err))
	}

	if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
		// Success response
		chaincodeLogger.Debugf("[%s]Received %s. Successfully updated state", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)
		return nil
	}

内容有点长,我还没有截完,核心的函数就是三个,createChannel,deleteChannel以及sendReceive。要理解这三个函数需要看下handler的结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type Handler struct {
	sync.RWMutex
	//shim to peer grpc serializer. User only in serialSend
	serialLock sync.Mutex
	To         string
	ChatStream PeerChaincodeStream
	FSM        *fsm.FSM
	cc         Chaincode
	responseChannel map[string]chan pb.ChaincodeMessage
	nextState       chan *nextStateInfo
}

我们在这先关注responseChnnel,是一个mao类型,txid->chan,是一个对应关系。 createChannel以及deleteChannel都是针对这个map进行处理的,就不在详细查看了。 好了,我们继续看sendReceive函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {
	errc := make(chan error, 1)
	handler.serialSendAsync(msg, errc)

	//the serialsend above will send an err or nil
	//the select filters that first error(or nil)
	//and continues to wait for the response
	//it is possible that the response triggers first
	//in which case the errc obviously worked and is
	//ignored
	for {
		select {
		case err := <-errc:
			if err == nil {
				continue
			}
			//would have been logged, return false
			return pb.ChaincodeMessage{}, err
		case outmsg, val := <-c:
			if !val {
				return pb.ChaincodeMessage{}, fmt.Errorf("unexpected failure on receive")
			}
			return outmsg, nil
		}
	}
}

好的,可以看到了传递了两个参数,一个是msg,一个是respChan,注意,这个地方有重点了。 可以看到代码里面调用了serialSendAsync函数,传递了一个新的chan以及msg,但是并没有传递respChan.然后下面等待接收消息。一个是errc的,一个是respChan的。根据我们的应用,正确的消息应该是执行的

1
case outmsg, val := <-c:

这一句。这儿就很奇怪了,没有传递是怎么接受的数据呢?其次,chan不是应该只返回一个值吗?为什么是两个值? 我们先留待疑问,继续往下看serialSendAsync函数。

1
2
3
4
5
6
7
8
func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {
	go func() {
		err := handler.serialSend(msg)
		if errc != nil {
			errc <- err
		}
	}()
}

开启了一个协程,实现异步接受消息,对应了函数名。估计serialSend没有什么复杂的了。

1
2
3
4
5
6
7
8
func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error {
	handler.serialLock.Lock()
	defer handler.serialLock.Unlock()

	err := handler.ChatStream.Send(msg)

	return err
}

果然,调用了ChatStream的Send方法。 到这,我的源码就进入瓶颈了。 由于我们的所有的东西都是运行在docker中的,所以进行沟通的时候会有rpc等等一系列过程,而我刚开始看,所以嘛。。。 接下来就是我的一些猜想了。 我们接着上面的疑问,我们都知道,msg中是有txid的,而txid是对应的是respChan.除此之外,我们还看到有这个函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (handler *Handler) sendChannel(msg *pb.ChaincodeMessage) error {
	handler.Lock()
	defer handler.Unlock()
	if handler.responseChannel == nil {
		return fmt.Errorf("[%s]Cannot send message response channel", shorttxid(msg.Txid))
	}
	if handler.responseChannel[msg.Txid] == nil {
		return fmt.Errorf("[%s]sendChannel does not exist", shorttxid(msg.Txid))
	}

	chaincodeLogger.Debugf("[%s]before send", shorttxid(msg.Txid))
	handler.responseChannel[msg.Txid] <- *msg
	chaincodeLogger.Debugf("[%s]after send", shorttxid(msg.Txid))

	return nil
}

在这个地方将消息传递给了respChan,进行后续的沟通。 可以看到还有其他的函数,诸如afterResponseafterError调用了这个函数。 我们可以猜测,在docker中存在着类似一个守护进程,会执行这些过程,处理数据,写入到通道中。(主要是目前文档太大了,太难找了,还没看完. 恩,就这样吧。