@myecho
2018-05-12T16:11:18.000000Z
字数 5623
阅读 1179
Golang
发送rpc请求时,先是call
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
// Call里面调用了client.Go,然后返回一个chan,之后阻塞等待,这是基本的同步调用
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done //堵塞调用等待最后的返回通知
return call.Error
}
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
// 构建call对象
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
// 如果非外部传入call,自己构建
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
// 发送请求
client.send(call)
return call
}
然后到send函数
func (client *Client) send(call *Call) {
//
// 入口处就来了一把锁,这把锁显然是为了保证一个请求能够被原子的写入socket中. 毕竟一个
// rpc连接是会被多个goroutine并发写入的,因此这里需要保证发送请求的原子性。
//
client.sending.Lock()
defer client.sending.Unlock()
//
// 这里又来了一把锁,这把锁的目标是客户端的pending等待队列,也就是将每个rpc请求放入等待队列
// 的时候,需要对这个pending队列做并发写保护。
//
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
call.Error = ErrShutdown
client.mutex.Unlock()
call.done()
return
}
//
// pending队列其实是使用map实现的,这里可以看到每个rpc请求都会生存一个唯一递增的seq, 这个
// seq就是用来标记请求的,这个很像tcp包的seq。
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
//
// 下面就是将一个rpc请求的所有数据 (请求方法名、seq、请求参数等)进行序列化打包,然后发送
// 出去. 这里主要采用的是Go标准库自带的gob算法进行请求的序列化。
//
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
之后就是利用codec的WriteRequest方法将call时传入的参数进行编码再conn.flush即可完成请求的原子传送过程。
前面提到在创建一个rpc客户端对象的时候,同时会启动一个input goroutine来等待服务器的响应。
func (client *Client) input() {
var err error
var response Response
//
// 这里的for循环就是永久负责这个连接的响应读取,只有在连接上发生错误后,才会退出。
//
for err == nil {
//
// 首先是读响应头, 响应头一般有一个很重要的信息就是正文数据长度,有了这个长度信息,才知道
// 读多少正文才是一个应答完毕。
//
response = Response{}
//在这里我认为读取多少个字节是用户自定义的,读取头部字节的长度
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
//
// 这里是一个很重要的步骤,从响应头里取到seq,这个seq就是客户端生成的seq,在上文的send
// 过程中发送给了服务器,服务器应答的时候,必须将这个seq响应给客户端。只有这样客户端才
// 知道这个应答是对应pending队列中的那个请求的。
//
// 这里对pending队列枷锁保护,通过seq提取对应的请求call对象。
//
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
//
// 这个switch里主要就是处理异常以及正常情况读取响应正文了。异常情况,英文解释很详细了。
//
switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
//
// 开始读取响应的正文,正文放到call中的Reply中去。
//
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
//
// 下面部分的代码都是在处理连接上出错,以及服务端关闭连接等情况的清理工作. 这部分很重要,
// 否则可能导致一些调用rpc的goroutine永久阻塞等待,不能恢复工作。
//
// Terminate pending calls.
client.sending.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.sending.Unlock()
if err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
// 把call对象传递给调用者,主要是获取内部的Error
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}
先看下默认的gob编码解码是如何使用的。
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
而io.ReadWriteCloser可以从net.Dial调用中获得,那这里设置长连接线程池之类的信息?
因此自定义的codec通过如下方式使用
//RPC Communication (client side)
conn, err = net.Dial("tcp", "localhost:5555")
rpcCodec := codec.GoRpc.ClientCodec(conn, h) // OR codec.MsgpackSpecRpc...
client := rpc.NewClientWithCodec(rpcCodec)
其中New...方法如下
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
而官方要求的codec要求实现如下的接口定义:
// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses. The client calls Close when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
type ClientCodec interface {
// WriteRequest must be safe for concurrent use by multiple goroutines.
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
参考资料:
1. http://ju.outofmemory.cn/entry/71724
2. https://www.ctolib.com/topics-7710.html
3. https://github.com/golang/go/blob/master/src/net/rpc/client.go