标签:engine ext color The ready flags exec tip slot
// dbft算法返回下一个出块节点,如此看返回的就是创世块,也就是说出块节点不变化 func GetValidators(txs []*types.Transaction) ([]keypair.PublicKey, error) { // TODO implement vote return genesis.GenesisBookkeepers, nil } // 入口 func setupAPP() *cli.App { app := cli.NewApp() app.Usage = "Ontology CLI" app.Action = startOntology app.Version = config.Version app.Copyright = "Copyright in 2018 The Ontology Authors" app.Commands = []cli.Command{ // 定义各种操作命令的入口 cmd.AccountCommand, cmd.InfoCommand, cmd.AssetCommand, cmd.ContractCommand, cmd.ExportCommand, } // 定义command { Action: invokeContract, Name: "invoke", Usage: "Invoke smart contract", ArgsUsage: " ", Flags: []cli.Flag{ utils.RPCPortFlag, utils.TransactionGasPriceFlag, utils.TransactionGasLimitFlag, utils.ContractAddrFlag, utils.ContractParamsFlag, utils.ContractVersionFlag, utils.ContractPrepareInvokeFlag, utils.ContractReturnTypeFlag, utils.WalletFileFlag, utils.AccountAddressFlag, }, }, // 可以看到处理函数是:invokeContract func invokeContract(ctx *cli.Context) error { SetRpcPort(ctx) // 设置rpc的端口,DEFAULT_RPC_PORT = uint(20336) // 1、检查是否传入合约地址 // 2、解析合约地址 // 3、解析合约invoke参数 // 4、检查是否设置了预执行ContractPrepareInvokeFlag标记,如果设置了,则预执行 if ctx.IsSet(utils.GetFlagName(utils.ContractPrepareInvokeFlag)) { preResult, err := utils.PrepareInvokeNeoVMContract(contractAddr, params) // 预执行结果 if err != nil { return fmt.Errorf("PrepareInvokeNeoVMSmartContact error:%s", err) } if preResult.State == 0 { return fmt.Errorf("Contract invoke failed\n") } fmt.Printf("Contract invoke successfully\n") fmt.Printf("Gaslimit:%d\n", preResult.Gas) rawReturnTypes := ctx.String(utils.GetFlagName(utils.ContractReturnTypeFlag)) if rawReturnTypes == "" { fmt.Printf(" Return:%s (raw value)\n", preResult.Result) return nil } values, err := utils.ParseReturnValue(preResult.Result, rawReturnTypes) if err != nil { return fmt.Errorf("parseReturnValue values:%+v types:%s error:%s", values, rawReturnTypes, err) } switch len(values) { case 0: fmt.Printf(" Return: nil\n") case 1: fmt.Printf(" Return:%+v\n", values[0]) default: fmt.Printf(" Return:%+v\n", values) } return nil } // 至此,预检查已经结束。。。。。。 // 接下来的步骤是本地发起的没有预检查的网络上转发来的交易 signer, err := cmdcom.GetAccount(ctx) // 获取交易签名 if err != nil { return fmt.Errorf("Get signer account error:%s", err) } gasPrice := ctx.Uint64(utils.GetFlagName(utils.TransactionGasPriceFlag)) // 取出参数 gasLimit := ctx.Uint64(utils.GetFlagName(utils.TransactionGasLimitFlag)) networkId, err := utils.GetNetworkId() // 获取网络ID if err != nil { return err } if networkId == config.NETWORK_ID_SOLO_NET { gasPrice = 0 } txHash, err := utils.InvokeNeoVMContract(gasPrice, gasLimit, signer, contractAddr, params) // 调用VM执行交易 if err != nil { return fmt.Errorf("Invoke NeoVM contract error:%s", err) } fmt.Printf(" TxHash:%s\n", txHash) fmt.Printf("\nTip:\n") fmt.Printf(" Using ‘./ontology info status %s‘ to query transaction status\n", txHash) return nil } // 交易上下文 // Context is a type that is passed through to // each Handler action in a cli application. Context // can be used to retrieve context-specific Args and // parsed command-line options. type Context struct { App *App Command Command shellComplete bool flagSet *flag.FlagSet setFlags map[string]bool parentContext *Context } // 预执行交易 func PrepareInvokeNeoVMContract( contractAddress common.Address, params []interface{}, ) (*cstates.PreExecResult, error) { tx, err := httpcom.NewNeovmInvokeTransaction(0, 0, contractAddress, params) // 构造填充交易结构体 if err != nil { return nil, err } var buffer bytes.Buffer // 定义buffer,取出交易 err = tx.Serialize(&buffer) if err != nil { return nil, fmt.Errorf("Serialize error:%s", err) } txData := hex.EncodeToString(buffer.Bytes()) data, err := sendRpcRequest("sendrawtransaction", []interface{}{txData, 1}) // 发送交易请求 if err != nil { return nil, err } preResult := &cstates.PreExecResult{} // 解析交易结果 err = json.Unmarshal(data, &preResult) if err != nil { return nil, fmt.Errorf("json.Unmarshal PreExecResult:%s error:%s", data, err) } return preResult, nil } // 组装交易 func NewNeovmInvokeTransaction(gasPrice, gasLimit uint64, contractAddress common.Address, params []interface{}) (*types.Transaction, error) { invokeCode, err := BuildNeoVMInvokeCode(contractAddress, params) // 构造参数,构造neovm所需要的参数,包括了合约地址被打包进了invokeCode中 if err != nil { return nil, err } return NewSmartContractTransaction(gasPrice, gasLimit, invokeCode) } // 构造交易 func NewSmartContractTransaction(gasPrice, gasLimit uint64, invokeCode []byte) (*types.Transaction, error) { invokePayload := &payload.InvokeCode{ Code: invokeCode, } tx := &types.Transaction{ GasPrice: gasPrice, GasLimit: gasLimit, TxType: types.Invoke, Nonce: uint32(time.Now().Unix()), Payload: invokePayload, // 存放交易参数 Sigs: make([]*types.Sig, 0, 0), // 签名信息,见下面介绍: } return tx, nil } // 签名信息结构体 type Sig struct { SigData [][]byte // 签名数据 PubKeys []keypair.PublicKey // 公钥对 M uint16 // 目前不清楚,猜测是交易最终需要验证的个数,目前为0 } // 发送rpc请求:注意发送地址是本地请求 func sendRpcRequest(method string, params []interface{}) ([]byte, error) { ...... // 本地请求 addr := fmt.Sprintf("http://localhost:%d", config.DefConfig.Rpc.HttpJsonPort) // 读取返回结果 resp, err := http.Post(addr, "application/json", strings.NewReader(string(data))) if err != nil { return nil, fmt.Errorf("http post request:%s error:%s", data, err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("read rpc response body error:%s", err) } rpcRsp := &JsonRpcResponse{} err = json.Unmarshal(body, rpcRsp) if err != nil { return nil, fmt.Errorf("json.Unmarshal JsonRpcResponse:%s error:%s", body, err) } if rpcRsp.Error != 0 { return nil, fmt.Errorf("error code:%d desc:%s", rpcRsp.Error, rpcRsp.Desc) } return rpcRsp.Result, nil } // 接下来看交易预执行的流程 // 接收函数通过如下注册:在startOntology时候调用initRpc启动StartRPCServer func StartRPCServer() error { log.Debug() http.HandleFunc("/", rpc.Handle) ...... rpc.HandleFunc("sendrawtransaction", rpc.SendRawTransaction) ...... err := http.ListenAndServe(":"+strconv.Itoa(int(cfg.DefConfig.Rpc.HttpJsonPort)), nil) if err != nil { return fmt.Errorf("ListenAndServe error:%s", err) } return nil } // 预执行交易 //send raw transaction // A JSON example for sendrawtransaction method as following: // {"jsonrpc": "2.0", "method": "sendrawtransaction", "params": ["raw transactioin in hex"], "id": 0} 接收到的RPC参数 func SendRawTransaction(params []interface{}) map[string]interface{} { if len(params) < 1 { return responsePack(berr.INVALID_PARAMS, nil) } var hash common.Uint256 switch params[0].(type) { case string: str := params[0].(string) hex, err := common.HexToBytes(str) if err != nil { return responsePack(berr.INVALID_PARAMS, "") } var txn types.Transaction if err := txn.Deserialize(bytes.NewReader(hex)); err != nil { return responsePack(berr.INVALID_TRANSACTION, "") } hash = txn.Hash() log.Debugf("SendRawTransaction recv %s", hash.ToHexString()) if txn.TxType == types.Invoke || txn.TxType == types.Deploy { if len(params) > 1 { preExec, ok := params[1].(float64) // 不明白干啥的,需要看一下上面打包过来的参数都是什么东西 if ok && preExec == 1 { result, err := bactor.PreExecuteContract(&txn) // 预执行:使用的是base/actor,下面具体分析,返回执行结果 if err != nil { log.Infof("PreExec: ", err) return responsePack(berr.SMARTCODE_ERROR, "") } return responseSuccess(result) } } } log.Debugf("SendRawTransaction send to txpool %s", hash.ToHexString()) if errCode, desc := bcomn.SendTxToPool(&txn); errCode != ontErrors.ErrNoError { // 将交易发送到交易池,见后面分析 log.Warnf("SendRawTransaction verified %s error: %s", hash.ToHexString(), desc) return responsePack(berr.INVALID_TRANSACTION, desc) } log.Debugf("SendRawTransaction verified %s", hash.ToHexString()) // 即验证通过了 default: return responsePack(berr.INVALID_PARAMS, "") } return responseSuccess(hash.ToHexString()) } //PreExecuteContract from ledger func PreExecuteContract(tx *types.Transaction) (*cstate.PreExecResult, error) { return ledger.DefLedger.PreExecuteContract(tx) } func (self *Ledger) PreExecuteContract(tx *types.Transaction) (*cstate.PreExecResult, error) { return self.ldgStore.PreExecuteContract(tx) } //PreExecuteContract return the result of smart contract execution without commit to store func (this *LedgerStoreImp) PreExecuteContract(tx *types.Transaction) (*sstate.PreExecResult, error) { header, err := this.GetHeaderByHeight(this.GetCurrentBlockHeight()) // 通过本地的ledger获取区块头信息,获取的是当前记录最新的区块头Current block height if err != nil { // 关于this的结构,可以看下面分析 return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, err } config := &smartcontract.Config{ Time: header.Timestamp, Height: header.Height, Tx: tx, } cache := storage.NewCloneCache(this.stateStore.NewStateBatch()) preGas, err := this.getPreGas(config, cache) if err != nil { return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, err } if tx.TxType == types.Invoke { invoke := tx.Payload.(*payload.InvokeCode) sc := smartcontract.SmartContract{ Config: config, Store: this, CloneCache: cache, Gas: math.MaxUint64 - calcGasByCodeLen(len(invoke.Code), preGas[neovm.UINT_INVOKE_CODE_LEN_NAME]), } //start the smart contract executive function engine, _ := sc.NewExecuteEngine(invoke.Code) // ?没看到执行的代码啊 result, err := engine.Invoke() if err != nil { return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, err // 如果失败,返回定义的最小交易gas耗费值 } gasCost := math.MaxUint64 - sc.Gas mixGas := neovm.MIN_TRANSACTION_GAS if gasCost < mixGas { gasCost = mixGas } return &sstate.PreExecResult{State: event.CONTRACT_STATE_SUCCESS, Gas: gasCost, Result: scommon.ConvertNeoVmTypeHexString(result)}, nil // 返回成功的结果和交易的gas消耗 } else if tx.TxType == types.Deploy { deploy := tx.Payload.(*payload.DeployCode) return &sstate.PreExecResult{State: event.CONTRACT_STATE_SUCCESS, Gas: preGas[neovm.CONTRACT_CREATE_NAME] + calcGasByCodeLen(len(deploy.Code), preGas[neovm.UINT_DEPLOY_CODE_LEN_NAME]), Result: nil}, nil } else { return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, errors.NewErr("transaction type error") } } // 本地账本结构体 //LedgerStoreImp is main store struct fo ledger type LedgerStoreImp struct { blockStore *BlockStore //BlockStore for saving block & transaction data 块存储,本地账本存储,记录本地的块 stateStore *StateStore //StateStore for saving state data, like balance, smart contract execution result, and so on. // 状态存储 eventStore *EventStore //EventStore for saving log those gen after smart contract executed. // 执行日志后的事件存储 storedIndexCount uint32 //record the count of have saved block index // 已经存储的block索引 currBlockHeight uint32 //Current block height // 当前记录的最新的块高度 currBlockHash common.Uint256 //Current block hash 哈希 headerCache map[common.Uint256]*types.Header //BlockHash => Header 块头 headerIndex map[uint32]common.Uint256 //Header index, Mapping header height => block hash 索引 savingBlock bool //is saving block now 是否存储 vbftPeerInfoheader map[string]uint32 //pubInfo save pubkey,peerindex vbftPeerInfoblock map[string]uint32 //pubInfo save pubkey,peerindex lock sync.RWMutex } // 上面分析了交易经过了预执行,接着分析交易进入交易池 func SendTxToPool(txn *types.Transaction) (ontErrors.ErrCode, string) { if errCode, desc := bactor.AppendTxToPool(txn); errCode != ontErrors.ErrNoError { log.Warn("TxnPool verify error:", errCode.Error()) return errCode, desc } return ontErrors.ErrNoError, "" } //append transaction to pool to txpool actor func AppendTxToPool(txn *types.Transaction) (ontErrors.ErrCode, string) { if DisableSyncVerifyTx { txReq := &tcomn.TxReq{txn, tcomn.HttpSender, nil} txnPid.Tell(txReq) return ontErrors.ErrNoError, "" } ch := make(chan *tcomn.TxResult, 1) txReq := &tcomn.TxReq{txn, tcomn.HttpSender, ch} txnPid.Tell(txReq) // 向txnPid发送了一个req if msg, ok := <-ch; ok { ? 没看明白 return msg.Err, msg.Desc } return ontErrors.ErrUnknown, "" } // 处理req请求 // Receive implements the actor interface func (ta *TxActor) Receive(context actor.Context) { switch msg := context.Message().(type) { case *actor.Started: log.Info("txpool-tx actor started and be ready to receive tx msg") case *actor.Stopping: log.Warn("txpool-tx actor stopping") case *actor.Restarting: log.Warn("txpool-tx actor restarting") case *tc.TxReq: sender := msg.Sender log.Debugf("txpool-tx actor receives tx from %v ", sender.Sender()) ta.handleTransaction(sender, context.Self(), msg.Tx, msg.TxResultCh) ...... // handleTransaction handles a transaction from network and http func (ta *TxActor) handleTransaction(sender tc.SenderType, self *actor.PID, txn *tx.Transaction, txResultCh chan *tc.TxResult) { ta.server.increaseStats(tc.RcvStats) if len(txn.ToArray()) > tc.MAX_TX_SIZE { // 交易不能超过1M log.Debugf("handleTransaction: reject a transaction due to size over 1M") if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, "size is over 1M") } return } if ta.server.getTransaction(txn.Hash()) != nil { // 是否是重复的交易,注意这里是冲本地获取的已经存处理过的交易 log.Debugf("handleTransaction: transaction %x already in the txn pool", txn.Hash()) ta.server.increaseStats(tc.DuplicateStats) // 记录DuplicateStats if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, txn.Hash(), errors.ErrDuplicateInput, fmt.Sprintf("transaction %x is already in the tx pool", txn.Hash())) } } else if ta.server.getTransactionCount() >= tc.MAX_CAPACITY { // 交易池已满 log.Debugf("handleTransaction: transaction pool is full for tx %x", txn.Hash()) ta.server.increaseStats(tc.FailureStats) // 记录FailureStats if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, txn.Hash(), errors.ErrTxPoolFull, "transaction pool is full") } } else { if _, overflow := common.SafeMul(txn.GasLimit, txn.GasPrice); overflow { // 溢出保护 log.Debugf("handleTransaction: gasLimit %v, gasPrice %v overflow", txn.GasLimit, txn.GasPrice) if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, fmt.Sprintf("gasLimit %d * gasPrice %d overflow", txn.GasLimit, txn.GasPrice)) } return } // 从配置文件读取gas的limit和price gasLimitConfig := config.DefConfig.Common.GasLimit gasPriceConfig := ta.server.getGasPrice() if txn.GasLimit < gasLimitConfig || txn.GasPrice < gasPriceConfig { // 入参不满足配置文件设置 log.Debugf("handleTransaction: invalid gasLimit %v, gasPrice %v", txn.GasLimit, txn.GasPrice) if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, fmt.Sprintf("Please input gasLimit >= %d and gasPrice >= %d", gasLimitConfig, gasPriceConfig)) } return } if txn.TxType == tx.Deploy && txn.GasLimit < neovm.CONTRACT_CREATE_GAS { // 如果是部署合约,要求gas必须大于某个值 log.Debugf("handleTransaction: deploy tx invalid gasLimit %v, gasPrice %v", txn.GasLimit, txn.GasPrice) if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, fmt.Sprintf("Deploy tx gaslimit should >= %d", neovm.CONTRACT_CREATE_GAS)) } return } if !ta.server.disablePreExec { // 配置了禁用预先检查 if ok, desc := preExecCheck(txn); !ok { log.Debugf("handleTransaction: preExecCheck tx %x failed", txn.Hash()) if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, desc) } return } log.Debugf("handleTransaction: preExecCheck tx %x passed", txn.Hash()) } <-ta.server.slots // 写入管道 ta.server.assignTxToWorker(txn, sender, txResultCh) // 将tx分配给worker } } // assignTxToWorker assigns a new transaction to a worker by LB func (s *TXPoolServer) assignTxToWorker(tx *tx.Transaction, sender tc.SenderType, txResultCh chan *tc.TxResult) bool { if tx == nil { return false } if ok := s.setPendingTx(tx, sender, txResultCh); !ok { // 重复交易,即work的pending队列中已经存在该交易 s.increaseStats(tc.DuplicateStats) if sender == tc.HttpSender && txResultCh != nil { replyTxResult(txResultCh, tx.Hash(), errors.ErrDuplicateInput, "duplicated transaction input detected") } return false } // Add the rcvTxn to the worker lb := make(tc.LBSlice, len(s.workers)) for i := 0; i < len(s.workers); i++ { entry := tc.LB{Size: len(s.workers[i].rcvTXCh) + len(s.workers[i].pendingTxList), WorkerID: uint8(i), } lb[i] = entry } sort.Sort(lb) // 排序 s.workers[lb[0].WorkerID].rcvTXCh <- tx // 将tx放入s.workers[lb[0].WorkerID].rcvTXCh,发送一个channel return true } // 处理channel // Start is the main event loop. func (worker *txPoolWorker) start() { worker.timer = time.NewTimer(time.Second * tc.EXPIRE_INTERVAL) for { select { case <-worker.stopCh: worker.server.wg.Done() return case rcvTx, ok := <-worker.rcvTXCh: // 处理rcvTXCh if ok { // Verify rcvTxn worker.verifyTx(rcvTx) } case stfTx, ok := <-worker.stfTxCh: if ok { worker.verifyStateful(stfTx) } case <-worker.timer.C: worker.handleTimeoutEvent() worker.timer.Stop() worker.timer.Reset(time.Second * tc.EXPIRE_INTERVAL) case rsp, ok := <-worker.rspCh: if ok { /* Handle the response from validator, if all of cases * are verified, put it to txnPool */ worker.handleRsp(rsp) } } } } // 验证rcvTXCh // verifyTx prepares a check request and sends it to the validators. func (worker *txPoolWorker) verifyTx(tx *tx.Transaction) { if tx := worker.server.getTransaction(tx.Hash()); tx != nil { // 交易已经存在 log.Infof("verifyTx: transaction %x already in the txn pool", tx.Hash()) worker.server.removePendingTx(tx.Hash(), errors.ErrDuplicateInput) return } if _, ok := worker.pendingTxList[tx.Hash()]; ok { // log.Infof("verifyTx: transaction %x already in the verifying process", tx.Hash()) return } // Construct the request and send it to each validator server to verify req := &types.CheckTx{ WorkerId: worker.workId, Tx: *tx, } worker.sendReq2Validator(req) // Construct the pending transaction pt := &pendingTx{ tx: tx, req: req, flag: 0, retries: 0, } // Add it to the pending transaction list worker.mu.Lock() worker.pendingTxList[tx.Hash()] = pt worker.mu.Unlock() // Record the time per a txn pt.valTime = time.Now() } // 过了预执行阶段,注意这段的逻辑和预执行很像 //Invoke neo vm smart contract. if isPreExec is true, the invoke will not really execute func InvokeNeoVMContract( gasPrice, gasLimit uint64, signer *account.Account, smartcodeAddress common.Address, params []interface{}) (string, error) { tx, err := httpcom.NewNeovmInvokeTransaction(gasPrice, gasLimit, smartcodeAddress, params) // 组装交易结构体 if err != nil { return "", err } return InvokeSmartContract(signer, tx) // invoke } //InvokeSmartContract is low level method to invoke contact. func InvokeSmartContract(signer *account.Account, tx *types.Transaction) (string, error) { // 为交易签名 err := SignTransaction(signer, tx) if err != nil { return "", fmt.Errorf("SignTransaction error:%s", err) } // 发送SendRawTransaction,这个和上面预交易一致 txHash, err := SendRawTransaction(tx) if err != nil { return "", fmt.Errorf("SendTransaction error:%s", err) } return txHash, nil }
标签:engine ext color The ready flags exec tip slot
原文地址:https://www.cnblogs.com/yunlion/p/9379165.html