@adamhand
2019-03-28T21:54:22.000000Z
字数 11035
阅读 1455
Tendermint是一个开源的完整的区块链实现,可以用于公链或联盟链,其官方定位是面向开发者的区块链共识引擎。尽管tendermint包含了区块链的完整实现,但它却是以SDK的形式将这些核心功能提供出来,供开发者方便地定制自己的专有区块链。
Tendermint 包含了两个主要的技术组件
Tendermint Core通过一个满足 ABCI 标准的 socket 协议与应用进行交流。此外,如果需要使用Tendermint开发区块链系统,还要实现client部分。所以,一个完整的基于tendermint的区块链系统包括三个部分:client部分,ABCI部分和tendermint core部分。如下图所示:
tendermint采用的共识机制属于一种权益证明( Proof Of Stake)算法,一组验证人(Validator)代替了矿工(Miner)的角色,依据抵押的权益比例轮流出块。
tendermint同时是拜占庭容错的(Byzantine Fault Tolerance),因此对于3f+1个验证节点组成的区块链,即使有f个节点出现拜占庭错误,也可以保证全局正确共识的达成。
下图是tendermint的状态机。
“验证人”(validator)轮流对交易区块进行提议,并对这些区块进行投票。区块会被提交到链上,每一个块占据一个“高度”(height)。
如果提交失败,协议就会开始下一轮的提交,并且一个新的验证人会继续提交那个高度的区块。
ABCI 包含了 3 个主要的消息类型,它们由tendermint core 发送至应用,应用会对消息产生相应的回复。
一个应用可能有多个 ABCI socket 连接。Tendermint Core 给应用创建了三个 ABCI 连接:
KVStore是tendermint提供的一个例子,用来说明abci-cli client(模拟tendermint core)和abci应用服务端之间通过socket进行通信的过程。
程序入口在tendermint\abci\cmd\abci-cli\main.go中的Execute()方法。这个方法主要做了三件事:
注册全局 Flags,主要包括:
程序如下所示:
func addGlobalFlags() {
RootCmd.PersistentFlags().StringVarP(&flagAddress, "address", "", "tcp://0.0.0.0:26658", "address of application socket")
RootCmd.PersistentFlags().StringVarP(&flagAbci, "abci", "", "socket", "either socket or grpc")
RootCmd.PersistentFlags().BoolVarP(&flagVerbose, "verbose", "v", false, "print the command and results as if it were a console session")
RootCmd.PersistentFlags().StringVarP(&flagLogLevel, "log_level", "", "debug", "set the logger level")
}
添加子命令到RootCmd命令,包括kvstore 和 dummy 以及 echoCmd、infoCmd、deliverTxCmd 和 commitCmd 等客户端命令。这些命令会在后面被触发执行。
func addCommands() {
RootCmd.AddCommand(batchCmd)
RootCmd.AddCommand(consoleCmd)
RootCmd.AddCommand(echoCmd)
RootCmd.AddCommand(infoCmd)
RootCmd.AddCommand(setOptionCmd)
RootCmd.AddCommand(deliverTxCmd)
RootCmd.AddCommand(checkTxCmd)
RootCmd.AddCommand(commitCmd)
RootCmd.AddCommand(versionCmd)
RootCmd.AddCommand(testCmd)
addQueryFlags()
RootCmd.AddCommand(queryCmd)
// examples
addCounterFlags()
RootCmd.AddCommand(counterCmd)
addKVStoreFlags()
RootCmd.AddCommand(kvstoreCmd)
}
比如KVStore命令如下:
var kvstoreCmd = &cobra.Command{
Use: "kvstore",
Short: "ABCI demo example",
Long: "ABCI demo example",
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
return cmdKVStore(cmd, args)
},
}
当运行abci-cli kdstore
的时候,就会执行这个命令,触发cmdKVStore(cmd, args)
函数。这个函数具体会在后面建立abci服务端的时候分析。
这个函数的作用是执行上面添加的命令,但是在执行之前首先会执行RootCmd中的PersistentPreRunE函数。从名字可以看出,这个函数是PreRun
的,也就是要先执行。这个函数中比较重要的操作如下:
if client == nil {
var err error
client, err = abcicli.NewClient(flagAddress, flagAbci, false)
if err != nil {
return err
}
client.SetLogger(logger.With("module", "abci-client"))
if err := client.Start(); err != nil {
return err
}
}
这段代码的意思是通过NewClient()函数创建abci-client。NewClient()函数有两个重要参数:
NewClient()函数逻辑如下:
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
switch transport {
case "socket":
client = NewSocketClient(addr, mustConnect)
case "grpc":
client = NewGRPCClient(addr, mustConnect)
default:
err = fmt.Errorf("Unknown abci transport %s", transport)
}
return
}
因为选择的是socket,所以会进入NewSocketClient(addr, mustConnect)函数,该函数的逻辑如下:
func NewSocketClient(addr string, mustConnect bool) *socketClient {
cli := &socketClient{
reqQueue: make(chan *ReqRes, reqQueueSize),
flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
mustConnect: mustConnect,
addr: addr,
reqSent: list.New(),
resCb: nil,
}
cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
return cli
}
函数首先创建一个socketClient的结构cli,然后将其作为参数传递给cmn.NewBaseService。先看一下socketClient。
type socketClient struct {
cmn.BaseService
reqQueue chan *ReqRes
flushTimer *cmn.ThrottleTimer
mustConnect bool
mtx sync.Mutex
addr string
conn net.Conn
err error
reqSent *list.List
resCb func(*types.Request, *types.Response) // listens to all callbacks
}
这个结构实现了abci/client/Client的所有方法,所以它实现了abci/client/Client接口。同时它还有一个匿名字段cmn.BaseService,BaseService实现了Service接口,所以socketClient也间接实现了 libs/common/service/Service 接口。能够使用其中的Start()方法。
func (bs *BaseService) Start() error {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
bs.Logger.Error(fmt.Sprintf("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
// revert flag
atomic.StoreUint32(&bs.started, 0)
return ErrAlreadyStopped
}
bs.Logger.Info(fmt.Sprintf("Starting %v", bs.name), "impl", bs.impl)
err := bs.impl.OnStart()
if err != nil {
// revert flag
atomic.StoreUint32(&bs.started, 0)
return err
}
return nil
}
bs.Logger.Debug(fmt.Sprintf("Not starting %v -- already started", bs.name), "impl", bs.impl)
return ErrAlreadyStarted
}
而Start()方法会调用BaseService中的OnStart()方法,这个方法中主要做的就是连接abci服务端并启动两个协程:go cli.sendRequestsRoutine(conn)、go cli.recvResponseRoutine(conn):
func (cli *socketClient) OnStart() error {
if err := cli.BaseService.OnStart(); err != nil {
return err
}
var err error
var conn net.Conn
RETRY_LOOP:
for {
conn, err = cmn.Connect(cli.addr)
if err != nil {
if cli.mustConnect {
return err
}
cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err)
time.Sleep(time.Second * dialRetryIntervalSeconds)
continue RETRY_LOOP
}
cli.conn = conn
go cli.sendRequestsRoutine(conn)
go cli.recvResponseRoutine(conn)
return nil
}
}
sendRequestsRoutine(conn)用来向cli服务器发送请求,recvResponseRoutine()用来处理响应结果。sendRequestsRoutine()逻辑如下,在这个函数中reqres会等待cli.reqQueue信道传来消息,之后会先写入缓冲区中,然后在发送给cliserver。
func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
w := bufio.NewWriter(conn)
for {
select {
case <-cli.flushTimer.Ch:
select {
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
default:
// Probably will fill the buffer, or retry later.
}
case <-cli.Quit():
return
case reqres := <-cli.reqQueue:
cli.willSendReq(reqres)
err := types.WriteMessage(reqres.Request, w)
if err != nil {
cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
return
}
// cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
err = w.Flush()
if err != nil {
cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
return
}
}
}
}
}
recvResponseRoutine处理应答的逻辑如下。先从连接中读取应答,出错的话就关闭连接,否则就调用didRecvResponse()处理应答。
func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
r := bufio.NewReader(conn) // Buffer reads
for {
var res = &types.Response{}
err := types.ReadMessage(r, res)
if err != nil {
cli.StopForError(err)
return
}
switch r := res.Value.(type) {
case *types.Response_Exception:
// XXX After setting cli.err, release waiters (e.g. reqres.Done())
cli.StopForError(errors.New(r.Exception.Error))
return
default:
// cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
err := cli.didRecvResponse(res)
if err != nil {
cli.StopForError(err)
return
}
}
}
}
func (cli *socketClient) didRecvResponse(res *types.Response) error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// Get the first ReqRes
next := cli.reqSent.Front()
if next == nil {
return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
}
reqres := next.Value.(*ReqRes)
if !resMatchesReq(reqres.Request, res) {
return fmt.Errorf("Unexpected result type %v when response to %v expected",
reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
}
reqres.Response = res // Set response
reqres.Done() // Release waiters
cli.reqSent.Remove(next) // Pop first item from linked list
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
return nil
}
这样abci-client就启动了。
前面说了,当运行abci-cli kdstore的时候,会触发cmdKVStore(cmd, args)函数,这个函数的逻辑如下:
func cmdKVStore(cmd *cobra.Command, args []string) error {
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
// Create the application - in memory or persisted to disk
var app types.Application
if flagPersist == "" {
app = kvstore.NewKVStoreApplication()
} else {
app = kvstore.NewPersistentKVStoreApplication(flagPersist)
app.(*kvstore.PersistentKVStoreApplication).SetLogger(logger.With("module", "kvstore"))
}
// Start the listener
srv, err := server.NewServer(flagAddress, flagAbci, app)
if err != nil {
return err
}
srv.SetLogger(logger.With("module", "abci-server"))
if err := srv.Start(); err != nil {
return err
}
// Wait forever
cmn.TrapSignal(func() {
// Cleanup
srv.Stop()
})
return nil
}
这个函数主要做了两件事:
NewKVStoreApplication()逻辑如下,主要是从内存存储 MemDB 结构中获取对应状态,app会以参数的形式传递给NewServer函数用来创建Server。
func NewKVStoreApplication() *KVStoreApplication {
state := loadState(dbm.NewMemDB())
return &KVStoreApplication{state: state}
}
NewServer()主要调用了NewSocketServer(),函数的逻辑如下:
func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
proto, addr := cmn.ProtocolAndAddress(protoAddr)
s := &SocketServer{
proto: proto,
addr: addr,
listener: nil,
app: app,
conns: make(map[int]net.Conn),
}
s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
return s
}
关键点在最后一行:s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)。BaseService是SocketServer中的一个匿名字段,BaseServcie接口又实现了Service接口,可以调用接口的Start()方法,而Start()方法又会调用OnStart()方法,如下:
func (s *SocketServer) OnStart() error {
if err := s.BaseService.OnStart(); err != nil {
return err
}
ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}
s.listener = ln
go s.acceptConnectionsRoutine()
return nil
}
关键地方在acceptConnectionsRoutine()这个协程,这个协程的作用是在连接中读取请求并向连接中写入应答,逻辑如下:
func (s *SocketServer) acceptConnectionsRoutine() {
for {
// Accept a connection
s.Logger.Info("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
s.Logger.Error("Failed to accept connection: " + err.Error())
continue
}
s.Logger.Info("Accepted a new connection")
connID := s.addConn(conn)
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses
// Read requests from conn and deal with them
go s.handleRequests(closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go s.handleResponses(closeConn, conn, responses)
// Wait until signal to close connection
go s.waitForClose(closeConn, connID)
}
}
KVStore服务端启动以及和abci-cli交互的过程。