文章目录
- 0. peer命令简析
- 1. 创建channel
- internal/peer/channel/create.go
- orderer/common/broadcast/broadcast.go
- orderer/consensus/etcdraft/chain.go
- 2. 节点加入channel
- 3. 安装chaincode
- internal/peer/lifecycle/chaincode/chaincode.go
- internal/peer/lifecycle/chaincode/install.go
- core/chaincode/lifecycle/lifecycle.go
- 4. 认可chaincode定义
- 5. 提交chaincode定义
- 6. 调用chaincode
根据Fabric中典型的业务流程,简单地分析相关的代码,包括创建通道(channel)、加入通道(channel)、安装智能合约(chaincode)、认可智能合约(chaincode)定义、提交智能合约(chaincode)定义、调用智能合约(chaincode)。为避免篇幅过大,本文只摘取部分有助于讲解的代码。
0. peer命令简析
fabric/cmd/peer/main.go
引入了下面两个库:
- spf13/cobra库:Golang最主流的命令行框架
- spf13/viper库:用于读取环境变量或yaml里的配置选项
mainCmd.AddCommand(version.Cmd())
mainCmd.AddCommand(node.Cmd())
mainCmd.AddCommand(chaincode.Cmd(nil, cryptoProvider))
mainCmd.AddCommand(channel.Cmd(nil))
mainCmd.AddCommand(lifecycle.Cmd(cryptoProvider))
可以看到支持version、node、chaincode、channel、lifecycle这些功能。
1. 创建channel
internal/peer/channel/create.go
func executeCreate(cf *ChannelCmdFactory) error {
err := sendCreateChainTransaction(cf)
if err != nil {
return err
}
block, err := getGenesisBlock(cf)
if err != nil {
return err
}
b, err := proto.Marshal(block)
if err != nil {
return err
}
file := channelID + ".block"
if outputBlock != common.UndefinedParamValue {
file = outputBlock
}
err = ioutil.WriteFile(file, b, 0644)
if err != nil {
return err
}
return nil
}
把configtxgen(configtx.yaml)生成的channel.tx文件进行签名后发送到orderer的broadcast接口;然后从orderer获取创世区块,写入block文件。
orderer/common/broadcast/broadcast.go
orderer处理消息的函数为ProcessMessage:
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
tracker := &MetricsTracker{
ChannelID: "unknown",
TxType: "unknown",
Metrics: bh.Metrics,
}
defer func() {
// This looks a little unnecessary, but if done directly as
// a defer, resp gets the (always nil) current state of resp
// and not the return value
tracker.Record(resp)
}()
tracker.BeginValidate()
chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
if chdr != nil {
tracker.ChannelID = chdr.ChannelId
tracker.TxType = cb.HeaderType(chdr.Type).String()
}
if err != nil {
logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}
}
if !isConfig {
// ......
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
err = processor.Order(msg, configSeq)
// ......
}
} else { // isConfig
// ......
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
err = processor.Configure(config, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
}
// ......
}
ProcessMessage处理的消息类型分为普通交易和config交易。所有的config交易都会独立成块,因为需要立刻生效,这样后面的普通交易才能基于更新配置后的Orderer进行操作。可以看到config交易的分支调用了Configure函数。
orderer/consensus/etcdraft/chain.go
Configure函数:
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
c.Metrics.ConfigProposalsReceived.Add(1)
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
提交config交易,用于共识。如果当前orderer节点就是leader,直接请求本地的goroutine;否则通过传输机制请求真正的leader节点。
共识后会写入config区块(WriteConfigBlock 函数),然后创建新的channel,在Orderer的ledger中存储该channel。
channel的重新配置也是向Orderer提交一个ConfigUpdate Tx,需要使用configtxlator工具。
从上面的分析我们也可以看出来,channel的创建不需要Peer节点参与,仅仅是CLI和Orderer的交互。Peer只有在加入到channel后才会参与交互,channel创建的时候会包含channel的Policy,满足特定条件的Peer可以加入这个channel。
2. 节点加入channel
internal/peer/channel/join.go
创建channel后,会从Orderer拿到一个mychannel.block,发送给Peer。
首先执行getJoinCCSpec函数:
func getJoinCCSpec() (*pb.ChaincodeSpec, error) {
if genesisBlockPath == common.UndefinedParamValue {
return nil, errors.New("Must supply genesis block file")
}
gb, err := ioutil.ReadFile(genesisBlockPath)
if err != nil {
return nil, GBFileNotFoundErr(err.Error())
}
// Build the spec
input := &pb.ChaincodeInput{Args: [][]byte{[]byte(cscc.JoinChain), gb}}
spec := &pb.ChaincodeSpec{
Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]),
ChaincodeId: &pb.ChaincodeID{Name: "cscc"},
Input: input,
}
return spec, nil
}
获得“configuration system chaincode”(core/scc/cscc/configure.go)中的JoinChain函数spec。
executeJoin函数:
func executeJoin(cf *ChannelCmdFactory) (err error) {
spec, err := getJoinCCSpec()
if err != nil {
return err
}
// Build the ChaincodeInvocationSpec message
invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}
creator, err := cf.Signer.Serialize()
if err != nil {
return fmt.Errorf("Error serializing identity for %s: %s", cf.Signer.GetIdentifier(), err)
}
var prop *pb.Proposal
prop, _, err = protoutil.CreateProposalFromCIS(pcommon.HeaderType_CONFIG, "", invocation, creator)
if err != nil {
return fmt.Errorf("Error creating proposal for join %s", err)
}
var signedProp *pb.SignedProposal
signedProp, err = protoutil.GetSignedProposal(prop, cf.Signer)
if err != nil {
return fmt.Errorf("Error creating signed proposal %s", err)
}
var proposalResp *pb.ProposalResponse
proposalResp, err = cf.EndorserClient.ProcessProposal(context.Background(), signedProp)
if err != nil {
return ProposalFailedErr(err.Error())
}
if proposalResp == nil {
return ProposalFailedErr("nil proposal response")
}
if proposalResp.Response.Status != 0 && proposalResp.Response.Status != 200 {
return ProposalFailedErr(fmt.Sprintf("bad proposal response %d: %s", proposalResp.Response.Status, proposalResp.Response.Message))
}
logger.Info("Successfully submitted proposal to join channel")
return nil
}
对Spec进行签名包装,调用EndorserClient.ProcessProposal接口,把签名过的proposal发送给Endorser。
core/endorser/endorser.go
ProcessProposal函数:Client与Peer交互的入口。
-
SimulateProposal函数调用:
执行proposal,调用CSCC系统链码。 -
EndorseWithPlugin函数调用:
给执行结果背书。
core/scc/cscc/configure.go
前面提到的需要执行的JoinChain函数:
func (e *PeerConfiger) joinChain(
chainID string,
block *common.Block,
deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider,
lr plugindispatcher.LifecycleResources,
nr plugindispatcher.CollectionAndLifecycleResources,
) pb.Response {
if err := e.peer.CreateChannel(chainID, block, deployedCCInfoProvider, lr, nr); err != nil {
return shim.Error(err.Error())
}
return shim.Success(nil)
}
参数chainID即为channel ID,从myChannel.block里获取。函数内部逻辑为执行CreateChannel函数创建channel。
CreateChannel函数:
func (p *Peer) CreateChannel(
cid string,
cb *common.Block,
deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider,
legacyLifecycleValidation plugindispatcher.LifecycleResources,
newLifecycleValidation plugindispatcher.CollectionAndLifecycleResources,
) error {
l, err := p.LedgerMgr.CreateLedger(cid, cb)
if err != nil {
return errors.WithMessage(err, "cannot create ledger from genesis block")
}
if err := p.createChannel(cid, l, cb, p.pluginMapper, deployedCCInfoProvider, legacyLifecycleValidation, newLifecycleValidation); err != nil {
return err
}
p.initChannel(cid)
return nil
}
- CreateLedger:Peer本地文件初始化、数据库初始化
- createChannel:开启gossip服务,leader Peer选举并连接orderer
- initChannel: 对于每个新创建的channel,需要install每一个sys chaincode。(DeploySysCCs函数)
3. 安装chaincode
internal/peer/lifecycle/chaincode/chaincode.go
新的智能合约生命周期模型支持如下命令:
func Cmd(cryptoProvider bccsp.BCCSP) *cobra.Command {
addFlags(chaincodeCmd)
chaincodeCmd.AddCommand(PackageCmd(nil))
chaincodeCmd.AddCommand(InstallCmd(nil, cryptoProvider))
chaincodeCmd.AddCommand(QueryInstalledCmd(nil, cryptoProvider))
chaincodeCmd.AddCommand(GetInstalledPackageCmd(nil, cryptoProvider))
chaincodeCmd.AddCommand(ApproveForMyOrgCmd(nil, cryptoProvider))
chaincodeCmd.AddCommand(CheckCommitReadinessCmd(nil, cryptoProvider))
chaincodeCmd.AddCommand(CommitCmd(nil, cryptoProvider))
chaincodeCmd.AddCommand(QueryCommittedCmd(nil, cryptoProvider))
return chaincodeCmd
}
包括打包、安装、查询已安装、获取安装包、认可定义、检查是否就绪、提交、查询已提交这些操作。
internal/peer/lifecycle/chaincode/install.go
func (i *Installer) Install() error {
err := i.Input.Validate()
if err != nil {
return err
}
pkgBytes, err := i.Reader.ReadFile(i.Input.PackageFile)
if err != nil {
return errors.WithMessagef(err, "failed to read chaincode package at '%s'", i.Input.PackageFile)
}
serializedSigner, err := i.Signer.Serialize()
if err != nil {
return errors.Wrap(err, "failed to serialize signer")
}
proposal, err := i.createInstallProposal(pkgBytes, serializedSigner)
if err != nil {
return err
}
signedProposal, err := signProposal(proposal, i.Signer)
if err != nil {
return errors.WithMessage(err, "failed to create signed proposal for chaincode install")
}
return i.submitInstallProposal(signedProposal)
}
- createInstallProposal函数:准备安装chaincode的提案,包含需要执行的“InstallChaincode”函数,发送到lifecycle系统chaincode。
- signProposal函数:给提案进行签名
- submitInstallProposal函数:调用EndorserClient.ProcessProposal函数提交提案,然后接收响应。
core/chaincode/lifecycle/lifecycle.go
核心代码为该文件中的InstallChaincode函数:
把chaincode放到文件系统里,整个过程相当于代码上传,并不会真正启用chaincode。
存储目录为$fileSystemPath/lifecycle/chaincodes目录,$fileSystemPath在core.yaml中配置,为/var/hyperledger/production。
安装的过程中,CLI只跟当前install的Peer交互,不跟Orderer和其他Peer交互。需要运行的所有Peer都要进行安装操作。
4. 认可chaincode定义
internal/peer/lifecycle/chaincode/approveformyorg.go
peer节点为组织进行认可的核心代码为Approve函数,内部逻辑包括:
- createProposal函数调用:需要调用的chaincode函数为ApproveChaincodeDefinitionForMyOrg,提案中还包括函数参数、channelID、提案创建者信息等;
- signProposal函数调用:提案创建者给提案签名;
- endorser.ProcessProposal函数调用:把签名过的proposal发送给Endorser;
- BroadcastClient.Send函数调用:收到背书响应后,通过orderer将背书过的提案广播给其他Peer。
core/chaincode/lifecycle/lifecycle.go
核心代码为ApproveChaincodeDefinitionForOrg函数:
- 首先判断Sequence,请求的Sequence必须等于当前Sequence,或者是下一个Sequence;
- 把chaincode定义条目增加到组织的状态中;
- package ID如果设置为空,表示组织不再认可此chaincode的调用。
5. 提交chaincode定义
internal/peer/lifecycle/chaincode/commit.go
提交chaincode定义的核心函数为Commit函数:
- createProposal函数调用:需要调用的chaincode函数为CommitChaincodeDefinition,提案中还包括函数参数、channelID、提案创建者信息等;
- signProposal函数调用:提案创建者给提案签名;
- endorser.ProcessProposal函数调用:把签名过的proposal发送给Endorser;
- BroadcastClient.Send函数调用:收到背书响应后,通过orderer将背书过的提案广播给其他Peer。
core/chaincode/lifecycle/lifecycle.go
核心代码为CommitChaincodeDefinition函数:
func (ef *ExternalFunctions) CommitChaincodeDefinition(chname, ccname string, cd *ChaincodeDefinition, publicState ReadWritableState, orgStates []OpaqueState) (map[string]bool, error) {
approvals, err := ef.CheckCommitReadiness(chname, ccname, cd, publicState, orgStates)
if err != nil {
return nil, err
}
if err = ef.Resources.Serializer.Serialize(NamespacesName, ccname, cd, publicState); err != nil {
return nil, errors.WithMessage(err, "could not serialize chaincode definition")
}
return approvals, nil
}
- CheckCommitReadiness函数调用:检查是否满足提交条件,调用QueryOrgApprovals函数查询本组织是否已认可chaincode定义,不满足则直接报错返回;
- Resources.Serializer.Serialize函数调用:将相关信息存储到channel的ledger里。
6. 调用chaincode
chaincode的执行同样是通过ProcessProposal接口与Endorser交互,前面已有所分析。
core\chaincode\handler.go
执行chaincode的核心代码为Execute函数:
var ccresp *pb.ChaincodeMessage
select {
case ccresp = <-txctx.ResponseNotifier:
// response is sent to user or calling chaincode. ChaincodeMessage_ERROR
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
h.Metrics.ExecuteTimeouts.With("chaincode", h.chaincodeID).Add(1)
case <-h.streamDone():
err = errors.New("chaincode stream terminated")
}
- 设置了超时时间,chaincode必须在有限的时间内执行完成。
- chaincode和Peer直接用ChaincodeMessage进行交互,消息的类型有如下几种,包含写入状态、删除状态、获取状态等:
const (
ChaincodeMessage_UNDEFINED ChaincodeMessage_Type = 0
ChaincodeMessage_REGISTER ChaincodeMessage_Type = 1
ChaincodeMessage_REGISTERED ChaincodeMessage_Type = 2
ChaincodeMessage_INIT ChaincodeMessage_Type = 3
ChaincodeMessage_READY ChaincodeMessage_Type = 4
ChaincodeMessage_TRANSACTION ChaincodeMessage_Type = 5
ChaincodeMessage_COMPLETED ChaincodeMessage_Type = 6
ChaincodeMessage_ERROR ChaincodeMessage_Type = 7
ChaincodeMessage_GET_STATE ChaincodeMessage_Type = 8
ChaincodeMessage_PUT_STATE ChaincodeMessage_Type = 9
ChaincodeMessage_DEL_STATE ChaincodeMessage_Type = 10
ChaincodeMessage_INVOKE_CHAINCODE ChaincodeMessage_Type = 11
ChaincodeMessage_RESPONSE ChaincodeMessage_Type = 13
ChaincodeMessage_GET_STATE_BY_RANGE ChaincodeMessage_Type = 14
ChaincodeMessage_GET_QUERY_RESULT ChaincodeMessage_Type = 15
ChaincodeMessage_QUERY_STATE_NEXT ChaincodeMessage_Type = 16
ChaincodeMessage_QUERY_STATE_CLOSE ChaincodeMessage_Type = 17
ChaincodeMessage_KEEPALIVE ChaincodeMessage_Type = 18
ChaincodeMessage_GET_HISTORY_FOR_KEY ChaincodeMessage_Type = 19
ChaincodeMessage_GET_STATE_METADATA ChaincodeMessage_Type = 20
ChaincodeMessage_PUT_STATE_METADATA ChaincodeMessage_Type = 21
ChaincodeMessage_GET_PRIVATE_DATA_HASH ChaincodeMessage_Type = 22
)