[关闭]
@myecho 2018-05-12T16:11:18.000000Z 字数 5623 阅读 1160

net/rpc 自定义codec

Golang


发送请求

发送rpc请求时,先是call

func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
    // Call里面调用了client.Go,然后返回一个chan,之后阻塞等待,这是基本的同步调用
    call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done   //堵塞调用等待最后的返回通知
    return call.Error
}
    func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
    // 构建call对象
    call := new(Call)                                                           
    call.ServiceMethod = serviceMethod                                          
    call.Args = args                                                            
    call.Reply = reply   
    // 如果非外部传入call,自己构建                                                       
    if done == nil {                                                            
        done = make(chan *Call, 10) // buffered.                                
    } else {                                                                    
        // If caller passes done != nil, it must arrange that                   
        // done has enough buffer for the number of simultaneous                
        // RPCs that will be using that channel.  If the channel                
        // is totally unbuffered, it's best not to run at all.                  
        if cap(done) == 0 {                                                     
            log.Panic("rpc: done channel is unbuffered")                        
        }                                                                       
    }                                                                           
    call.Done = done
    // 发送请求                                                            
    client.send(call)                                                           
    return call                                                                 
} 

然后到send函数

func (client *Client) send(call *Call) {
    //
    // 入口处就来了一把锁,这把锁显然是为了保证一个请求能够被原子的写入socket中. 毕竟一个
    // rpc连接是会被多个goroutine并发写入的,因此这里需要保证发送请求的原子性。
    // 
    client.sending.Lock()
    defer client.sending.Unlock()
    // 
    // 这里又来了一把锁,这把锁的目标是客户端的pending等待队列,也就是将每个rpc请求放入等待队列
    // 的时候,需要对这个pending队列做并发写保护。
    //
    // Register this call.
    client.mutex.Lock()
    if client.shutdown || client.closing {
        call.Error = ErrShutdown
        client.mutex.Unlock()
        call.done()
        return
    }
    // 
    // pending队列其实是使用map实现的,这里可以看到每个rpc请求都会生存一个唯一递增的seq, 这个
    // seq就是用来标记请求的,这个很像tcp包的seq。
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()
    // 
    // 下面就是将一个rpc请求的所有数据 (请求方法名、seq、请求参数等)进行序列化打包,然后发送
    // 出去. 这里主要采用的是Go标准库自带的gob算法进行请求的序列化。
    //
    // Encode and send the request.
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}

之后就是利用codec的WriteRequest方法将call时传入的参数进行编码再conn.flush即可完成请求的原子传送过程。

接受应答

前面提到在创建一个rpc客户端对象的时候,同时会启动一个input goroutine来等待服务器的响应。

func (client *Client) input() {
    var err error
    var response Response
    //
    // 这里的for循环就是永久负责这个连接的响应读取,只有在连接上发生错误后,才会退出。
    // 
    for err == nil {
        //
        // 首先是读响应头, 响应头一般有一个很重要的信息就是正文数据长度,有了这个长度信息,才知道
        // 读多少正文才是一个应答完毕。
        //
        response = Response{}
        //在这里我认为读取多少个字节是用户自定义的,读取头部字节的长度
        err = client.codec.ReadResponseHeader(&response)
        if err != nil {
            break
        }
        // 
        // 这里是一个很重要的步骤,从响应头里取到seq,这个seq就是客户端生成的seq,在上文的send
        // 过程中发送给了服务器,服务器应答的时候,必须将这个seq响应给客户端。只有这样客户端才
        // 知道这个应答是对应pending队列中的那个请求的。
        // 
        // 这里对pending队列枷锁保护,通过seq提取对应的请求call对象。
        //
        seq := response.Seq
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        //
        // 这个switch里主要就是处理异常以及正常情况读取响应正文了。异常情况,英文解释很详细了。
        // 
        switch {
        case call == nil:
            // We've got no pending call. That usually means that
            // WriteRequest partially failed, and call was already
            // removed; response is a server telling us about an
            // error reading request body. We should still attempt
            // to read error body, but there's no one to give it to.
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
        case response.Error != "":
            // We've got an error response. Give this to the request;
            // any subsequent requests will get the ReadResponseBody
            // error if there is one.
            call.Error = ServerError(response.Error)
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
            call.done()
        default:
            //
            // 开始读取响应的正文,正文放到call中的Reply中去。
            // 
            err = client.codec.ReadResponseBody(call.Reply)
            if err != nil {
                call.Error = errors.New("reading body " + err.Error())
            }
            call.done()
        }
    }
    //
    // 下面部分的代码都是在处理连接上出错,以及服务端关闭连接等情况的清理工作. 这部分很重要,
    //  否则可能导致一些调用rpc的goroutine永久阻塞等待,不能恢复工作。
    // 
    // Terminate pending calls.
    client.sending.Lock()
    client.mutex.Lock()
    client.shutdown = true
    closing := client.closing
    if err == io.EOF {
        if closing {
            err = ErrShutdown
        } else {
            err = io.ErrUnexpectedEOF
        }
    }
    for _, call := range client.pending {
        call.Error = err
        call.done()
    }
    client.mutex.Unlock()
    client.sending.Unlock()
    if err != io.EOF && !closing {
        log.Println("rpc: client protocol error:", err)
    }
}

// 把call对象传递给调用者,主要是获取内部的Error
func (call *Call) done() {                                                      
    select {                                                                    
    case call.Done <- call:                                                     
        // ok                                                                   
    default:                                                                    
        // We don't want to block here.  It is the caller's responsibility to make
        // sure the channel has enough buffer space. See comment in Go().       
        if debugLog {                                                           
            log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
        }                                                                       
    }                                                                           
}

编写codec

先看下默认的gob编码解码是如何使用的。

func NewClient(conn io.ReadWriteCloser) *Client {
    encBuf := bufio.NewWriter(conn)
    client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
    return NewClientWithCodec(client)
}

而io.ReadWriteCloser可以从net.Dial调用中获得,那这里设置长连接线程池之类的信息?

因此自定义的codec通过如下方式使用

//RPC Communication (client side)
conn, err = net.Dial("tcp", "localhost:5555")
rpcCodec := codec.GoRpc.ClientCodec(conn, h) // OR codec.MsgpackSpecRpc...
client := rpc.NewClientWithCodec(rpcCodec)

其中New...方法如下

func NewClientWithCodec(codec ClientCodec) *Client {
        client := &Client{
            codec:   codec,
            pending: make(map[uint64]*Call),
        }
        go client.input()
        return client
    }

而官方要求的codec要求实现如下的接口定义:

// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses. The client calls Close when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
type ClientCodec interface {
    // WriteRequest must be safe for concurrent use by multiple goroutines.
    WriteRequest(*Request, interface{}) error
    ReadResponseHeader(*Response) error
    ReadResponseBody(interface{}) error
    Close() error
}

可能存在的问题

  1. go net/rpc框架单协程读取某个provider的返回的请求,(发送的时候是原子发的/也是一个个发,也是一个个的读返回的请求)这个时候返回的过程中还需要加锁和处理解码,性能到底够不够?---感觉是不太够,可是如何处理呢?每个协程单独处理自己的?--需要修改源码,建议先跑出版本1.0再说
  2. 解析过程中的锁如何优化掉?---如果能像httpServer一样加上tcp连接池的话,那么锁其实就没有必要了,没有全局共享的东西需要加锁了。如果需要修改的话,可能需要参考下net/http是如何处理连接的?

参考资料:
1. http://ju.outofmemory.cn/entry/71724
2. https://www.ctolib.com/topics-7710.html
3. https://github.com/golang/go/blob/master/src/net/rpc/client.go

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注