@adamhand
2019-03-28T13:54:22.000000Z
字数 11035
阅读 1763
Tendermint是一个开源的完整的区块链实现,可以用于公链或联盟链,其官方定位是面向开发者的区块链共识引擎。尽管tendermint包含了区块链的完整实现,但它却是以SDK的形式将这些核心功能提供出来,供开发者方便地定制自己的专有区块链。
Tendermint 包含了两个主要的技术组件
Tendermint Core通过一个满足 ABCI 标准的 socket 协议与应用进行交流。此外,如果需要使用Tendermint开发区块链系统,还要实现client部分。所以,一个完整的基于tendermint的区块链系统包括三个部分:client部分,ABCI部分和tendermint core部分。如下图所示:
tendermint采用的共识机制属于一种权益证明( Proof Of Stake)算法,一组验证人(Validator)代替了矿工(Miner)的角色,依据抵押的权益比例轮流出块。
tendermint同时是拜占庭容错的(Byzantine Fault Tolerance),因此对于3f+1个验证节点组成的区块链,即使有f个节点出现拜占庭错误,也可以保证全局正确共识的达成。
下图是tendermint的状态机。
“验证人”(validator)轮流对交易区块进行提议,并对这些区块进行投票。区块会被提交到链上,每一个块占据一个“高度”(height)。
如果提交失败,协议就会开始下一轮的提交,并且一个新的验证人会继续提交那个高度的区块。
ABCI 包含了 3 个主要的消息类型,它们由tendermint core 发送至应用,应用会对消息产生相应的回复。
一个应用可能有多个 ABCI socket 连接。Tendermint Core 给应用创建了三个 ABCI 连接:
KVStore是tendermint提供的一个例子,用来说明abci-cli client(模拟tendermint core)和abci应用服务端之间通过socket进行通信的过程。
程序入口在tendermint\abci\cmd\abci-cli\main.go中的Execute()方法。这个方法主要做了三件事:
注册全局 Flags,主要包括:
程序如下所示:
func addGlobalFlags() {RootCmd.PersistentFlags().StringVarP(&flagAddress, "address", "", "tcp://0.0.0.0:26658", "address of application socket")RootCmd.PersistentFlags().StringVarP(&flagAbci, "abci", "", "socket", "either socket or grpc")RootCmd.PersistentFlags().BoolVarP(&flagVerbose, "verbose", "v", false, "print the command and results as if it were a console session")RootCmd.PersistentFlags().StringVarP(&flagLogLevel, "log_level", "", "debug", "set the logger level")}
添加子命令到RootCmd命令,包括kvstore 和 dummy 以及 echoCmd、infoCmd、deliverTxCmd 和 commitCmd 等客户端命令。这些命令会在后面被触发执行。
func addCommands() {RootCmd.AddCommand(batchCmd)RootCmd.AddCommand(consoleCmd)RootCmd.AddCommand(echoCmd)RootCmd.AddCommand(infoCmd)RootCmd.AddCommand(setOptionCmd)RootCmd.AddCommand(deliverTxCmd)RootCmd.AddCommand(checkTxCmd)RootCmd.AddCommand(commitCmd)RootCmd.AddCommand(versionCmd)RootCmd.AddCommand(testCmd)addQueryFlags()RootCmd.AddCommand(queryCmd)// examplesaddCounterFlags()RootCmd.AddCommand(counterCmd)addKVStoreFlags()RootCmd.AddCommand(kvstoreCmd)}
比如KVStore命令如下:
var kvstoreCmd = &cobra.Command{Use: "kvstore",Short: "ABCI demo example",Long: "ABCI demo example",Args: cobra.ExactArgs(0),RunE: func(cmd *cobra.Command, args []string) error {return cmdKVStore(cmd, args)},}
当运行abci-cli kdstore的时候,就会执行这个命令,触发cmdKVStore(cmd, args)函数。这个函数具体会在后面建立abci服务端的时候分析。
这个函数的作用是执行上面添加的命令,但是在执行之前首先会执行RootCmd中的PersistentPreRunE函数。从名字可以看出,这个函数是PreRun的,也就是要先执行。这个函数中比较重要的操作如下:
if client == nil {var err errorclient, err = abcicli.NewClient(flagAddress, flagAbci, false)if err != nil {return err}client.SetLogger(logger.With("module", "abci-client"))if err := client.Start(); err != nil {return err}}
这段代码的意思是通过NewClient()函数创建abci-client。NewClient()函数有两个重要参数:
NewClient()函数逻辑如下:
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {switch transport {case "socket":client = NewSocketClient(addr, mustConnect)case "grpc":client = NewGRPCClient(addr, mustConnect)default:err = fmt.Errorf("Unknown abci transport %s", transport)}return}
因为选择的是socket,所以会进入NewSocketClient(addr, mustConnect)函数,该函数的逻辑如下:
func NewSocketClient(addr string, mustConnect bool) *socketClient {cli := &socketClient{reqQueue: make(chan *ReqRes, reqQueueSize),flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),mustConnect: mustConnect,addr: addr,reqSent: list.New(),resCb: nil,}cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)return cli}
函数首先创建一个socketClient的结构cli,然后将其作为参数传递给cmn.NewBaseService。先看一下socketClient。
type socketClient struct {cmn.BaseServicereqQueue chan *ReqResflushTimer *cmn.ThrottleTimermustConnect boolmtx sync.Mutexaddr stringconn net.Connerr errorreqSent *list.ListresCb func(*types.Request, *types.Response) // listens to all callbacks}
这个结构实现了abci/client/Client的所有方法,所以它实现了abci/client/Client接口。同时它还有一个匿名字段cmn.BaseService,BaseService实现了Service接口,所以socketClient也间接实现了 libs/common/service/Service 接口。能够使用其中的Start()方法。
func (bs *BaseService) Start() error {if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {if atomic.LoadUint32(&bs.stopped) == 1 {bs.Logger.Error(fmt.Sprintf("Not starting %v -- already stopped", bs.name), "impl", bs.impl)// revert flagatomic.StoreUint32(&bs.started, 0)return ErrAlreadyStopped}bs.Logger.Info(fmt.Sprintf("Starting %v", bs.name), "impl", bs.impl)err := bs.impl.OnStart()if err != nil {// revert flagatomic.StoreUint32(&bs.started, 0)return err}return nil}bs.Logger.Debug(fmt.Sprintf("Not starting %v -- already started", bs.name), "impl", bs.impl)return ErrAlreadyStarted}
而Start()方法会调用BaseService中的OnStart()方法,这个方法中主要做的就是连接abci服务端并启动两个协程:go cli.sendRequestsRoutine(conn)、go cli.recvResponseRoutine(conn):
func (cli *socketClient) OnStart() error {if err := cli.BaseService.OnStart(); err != nil {return err}var err errorvar conn net.ConnRETRY_LOOP:for {conn, err = cmn.Connect(cli.addr)if err != nil {if cli.mustConnect {return err}cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err)time.Sleep(time.Second * dialRetryIntervalSeconds)continue RETRY_LOOP}cli.conn = conngo cli.sendRequestsRoutine(conn)go cli.recvResponseRoutine(conn)return nil}}
sendRequestsRoutine(conn)用来向cli服务器发送请求,recvResponseRoutine()用来处理响应结果。sendRequestsRoutine()逻辑如下,在这个函数中reqres会等待cli.reqQueue信道传来消息,之后会先写入缓冲区中,然后在发送给cliserver。
func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {w := bufio.NewWriter(conn)for {select {case <-cli.flushTimer.Ch:select {case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):default:// Probably will fill the buffer, or retry later.}case <-cli.Quit():returncase reqres := <-cli.reqQueue:cli.willSendReq(reqres)err := types.WriteMessage(reqres.Request, w)if err != nil {cli.StopForError(fmt.Errorf("Error writing msg: %v", err))return}// cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {err = w.Flush()if err != nil {cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))return}}}}}
recvResponseRoutine处理应答的逻辑如下。先从连接中读取应答,出错的话就关闭连接,否则就调用didRecvResponse()处理应答。
func (cli *socketClient) recvResponseRoutine(conn net.Conn) {r := bufio.NewReader(conn) // Buffer readsfor {var res = &types.Response{}err := types.ReadMessage(r, res)if err != nil {cli.StopForError(err)return}switch r := res.Value.(type) {case *types.Response_Exception:// XXX After setting cli.err, release waiters (e.g. reqres.Done())cli.StopForError(errors.New(r.Exception.Error))returndefault:// cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)err := cli.didRecvResponse(res)if err != nil {cli.StopForError(err)return}}}}
func (cli *socketClient) didRecvResponse(res *types.Response) error {cli.mtx.Lock()defer cli.mtx.Unlock()// Get the first ReqResnext := cli.reqSent.Front()if next == nil {return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))}reqres := next.Value.(*ReqRes)if !resMatchesReq(reqres.Request, res) {return fmt.Errorf("Unexpected result type %v when response to %v expected",reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))}reqres.Response = res // Set responsereqres.Done() // Release waiterscli.reqSent.Remove(next) // Pop first item from linked list// Notify reqRes listener if setif cb := reqres.GetCallback(); cb != nil {cb(res)}// Notify client listener if setif cli.resCb != nil {cli.resCb(reqres.Request, res)}return nil}
这样abci-client就启动了。
前面说了,当运行abci-cli kdstore的时候,会触发cmdKVStore(cmd, args)函数,这个函数的逻辑如下:
func cmdKVStore(cmd *cobra.Command, args []string) error {logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))// Create the application - in memory or persisted to diskvar app types.Applicationif flagPersist == "" {app = kvstore.NewKVStoreApplication()} else {app = kvstore.NewPersistentKVStoreApplication(flagPersist)app.(*kvstore.PersistentKVStoreApplication).SetLogger(logger.With("module", "kvstore"))}// Start the listenersrv, err := server.NewServer(flagAddress, flagAbci, app)if err != nil {return err}srv.SetLogger(logger.With("module", "abci-server"))if err := srv.Start(); err != nil {return err}// Wait forevercmn.TrapSignal(func() {// Cleanupsrv.Stop()})return nil}
这个函数主要做了两件事:
NewKVStoreApplication()逻辑如下,主要是从内存存储 MemDB 结构中获取对应状态,app会以参数的形式传递给NewServer函数用来创建Server。
func NewKVStoreApplication() *KVStoreApplication {state := loadState(dbm.NewMemDB())return &KVStoreApplication{state: state}}
NewServer()主要调用了NewSocketServer(),函数的逻辑如下:
func NewSocketServer(protoAddr string, app types.Application) cmn.Service {proto, addr := cmn.ProtocolAndAddress(protoAddr)s := &SocketServer{proto: proto,addr: addr,listener: nil,app: app,conns: make(map[int]net.Conn),}s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)return s}
关键点在最后一行:s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)。BaseService是SocketServer中的一个匿名字段,BaseServcie接口又实现了Service接口,可以调用接口的Start()方法,而Start()方法又会调用OnStart()方法,如下:
func (s *SocketServer) OnStart() error {if err := s.BaseService.OnStart(); err != nil {return err}ln, err := net.Listen(s.proto, s.addr)if err != nil {return err}s.listener = lngo s.acceptConnectionsRoutine()return nil}
关键地方在acceptConnectionsRoutine()这个协程,这个协程的作用是在连接中读取请求并向连接中写入应答,逻辑如下:
func (s *SocketServer) acceptConnectionsRoutine() {for {// Accept a connections.Logger.Info("Waiting for new connection...")conn, err := s.listener.Accept()if err != nil {if !s.IsRunning() {return // Ignore error from listener closing.}s.Logger.Error("Failed to accept connection: " + err.Error())continue}s.Logger.Info("Accepted a new connection")connID := s.addConn(conn)closeConn := make(chan error, 2) // Push to signal connection closedresponses := make(chan *types.Response, 1000) // A channel to buffer responses// Read requests from conn and deal with themgo s.handleRequests(closeConn, conn, responses)// Pull responses from 'responses' and write them to conn.go s.handleResponses(closeConn, conn, responses)// Wait until signal to close connectiongo s.waitForClose(closeConn, connID)}}
KVStore服务端启动以及和abci-cli交互的过程。