举个栗子之gorpc - 消息的编码和解码

2022年的第一个rpc,比以往来的更早一些...

留杭过年...写点东西

初始化项目gorpc

借助go module我们可以轻易创建一个新的项目

mkdir gorpc
go mod init github.com/taadis/gorpc

// output:
go: creating new go.mod: module github.com/taadis/gorpc

消息约定

rpc 的客户端和服务端之间通信需要传输数据消息,典型的消息结构一般有2部分组成

  • header 消息头 - 用来承载约定内容,一般是相对固定的。
  • body 消息体 - 用来承载用户数据,通常是不定长,也就是动态的

这里我们先定义更泛的消息体,因为消息体是动态的,所以我们可以直接用go中的 interface{} 来定义,就不用特别声明一个结构体了。

然后我们来定义一个header结构体

// codec.go

type Header struct {
    Sequence      uint64 // sequence number chosen by client
    ServiceMethod string // format "Service.Method"
    Error         error
}

Sequence 序列号是客户端带过来的,每个请求总要有点不一样的地方,方便服务端根据序列号来区分不同的调用,可以理解是唯一ID。

ServiceMethod 是要远程调用的服务下的方法名词,这里对照为go语言中结构体的方法名,比如用户创建方法"User.Create"。

Error 是错误信息,用来放置一端发生的错误,以便另一种接收到消息时能根据错误进行处理,而不是直接丢失响应。

消息的编码解码

rpc 的客户端和服务端之间通信的消息有其特有的格式,因此都需要涉及编码和解码这一关键步骤,也就是我们熟称的序列化和反序列化。

以便抽象理解,我们定义一个统一的Codec接口

// codec.go

type Codec interface {
    ReadHeader(*Header) error
    ReadBody(interface{}) error
    Write(*Header, interface{}) error
}

ReadHeader 读取信息头,如果有错误返回错误。

ReadBody 读取消息体,数据是动态的,所以使用interface{}作为参数,如果有错误返回错误。

Write 消息接收处理完成后,我们需要把结果告知给客户端,需要一个写入的操作。

这里我们借助标准库内置的encoding/gob来提高工作效率。

当然你也可以用encoding/json,encoding/xml或者其他编解码包,这里选择encoding/gob,仅仅是因为这是go所特有的。JUST GO。

接下来我们基于encoding/gob实现一个gobCodec

rpc 请求是一种网络请求,本质还是I/O,所以我们可以用io.ReadWriteCloser来定义网络链接conn.

通过gob.Decoder解码请求中的数据流至对应的结构体参数,

完成服务端调用之后,把返回结果再用gob.Encoder编码至数据流中,

最后通过bufio.Writer写入数据完成响应。

// codec.go

type gobCodec struct {
    conn     io.ReadWriteCloser
    decoder  *gob.Decoder
    encoder  *gob.Encoder
    writeBuf *bufio.Writer
}

封装一个newGobCodec函数,方便后续调用。

func newGobCodec(conn io.ReadWriteCloser) Codec {
    writeBuf := bufio.NewWriter(conn)
    return &gobCodec{
        conn:     conn,
        decoder:  gob.NewDecoder(conn),
        encoder:  gob.NewEncoder(writeBuf),
	writeBuf: writeBuf,
    }
}

实现 Codec 接口中的 ReadHeader 方法

func (c *gobCodec) ReadHeader(header *Header) error {
    return c.decoder.Decode(header)
}

实现 Codec 接口中的 ReadBody 方法

func (c *gobCodec) ReadBody(body interface{}) error {
    return c.decoder.Decode(body)
}

实现 Codec 接口中的 Write 方法

func (c *gobCodec) Write(header *Header, body interface{}) error {
    defer func() {
        if c.writeBuf.Flush() != nil {
	    c.conn.Close()
        }
    }()

    if err := c.encoder.Encode(header); err != nil {
    	return err
    }

    if err := c.encoder.Encode(body); err != nil {
	return err
    }

    return nil
}

至此,rpc 中比较底层的数据编码和解码我们已经抽象出来了Codec接口,并借助encoding/gob实现了gobCodec.

上一篇:java IO流


下一篇:day26 stark组件开发之关键搜索功能