前提
最近在看Netty
相关的资料,刚好SOFA-BOLT
是一个比较成熟的Netty
自定义协议栈实现,于是决定研读SOFA-BOLT
的源码,详细分析其协议的组成,简单分析其客户端和服务端的源码实现。
- 吐槽一下:
SOFA-BOLT
的代码缩进和FastJson
类似,变量名称强制对齐,对于一般开发者来说看着源码会有不适感
当前阅读的源码是2021-08
左右的SOFA-BOLT
仓库的master
分支源码。
SOFA-BOLT简单介绍
SOFA-BOLT
是蚂蚁金融服务集团开发的一套基于Netty
实现的网络通信框架,本质是一套Netty
私有协议栈封装,目的是为了让开发者能将更多的精力放在基于网络通信的业务逻辑实现上,而不是过多的纠结于网络底层NIO
的实现以及处理难以调试的网络问题和Netty
二次开发问题。SOFA-BOLT
的架构设计和功能如下:
上图来源于SOFA-BOLT官网https://www.sofastack.tech/projects/sofa-bolt/overview
SOFA-BOLT协议透视
由于SOFA-BOLT
协议是基于Netty
实现的自定义协议栈,协议本身的实现可以快速地在Encoder
和Decoder
的实现中找到,进一步定位到com.alipay.remoting.rpc
包中。从源码得知,SOFA-BOLT
协议目前有两个版本,协议在RpcProtocol
和RpcProtocolV2
的类顶部注释中有比较详细的介绍,基于这些介绍可以简单整理出两个版本协议的基本构成。
V1版本协议的基本构成
-
V1
版本的协议请求Frame
基本构成:
-
V1
版本的协议响应Frame
基本构成:
针对V1
版本的协议,各个属性展开如下:
- 请求
Frame
和响应Frame
的公共属性:
属性Code | 属性含义 | Java类型 | 大小(byte) | 备注 |
---|---|---|---|---|
proto | 协议编码 | byte | 1 |
V1 版本下,proto = 1 ,V2 版本下,proto = 2
|
type | 类型 | byte | 1 |
0 => RESPONSE ,1 => REQUEST ,2 => REQUEST_ONEWAY
|
cmdcode | 命令编码 | short | 2 |
1 => rpc request ,2 => rpc response
|
ver2 | 命令版本 | byte | 1 | 从源码得知目前固定为1
|
requestId | 请求ID | int | 4 | 某个请求CMD 的全局唯一标识 |
codec | 编码解码器 | byte | 1 | - |
上表中,codec从字面上理解是编码解码器,实际上是序列化和反序列实现的标记,V1和V2目前都是固定codec = 1,通过源码跟踪到SerializerManager的配置值为Hessian2 = 1,也就是默认使用Hessian2进行序列化和反序列化,详细见源码中的HessianSerializer
- 请求
Frame
特有的属性:
属性Code | 属性含义 | Java类型 | 大小(byte) | 备注 |
---|---|---|---|---|
timeout | 请求超时时间 | int | 4 | |
classLen | 请求对象(参数)类型的名称长度 | short | 2 | 值>=0
|
headerLen | 请求头长度 | short | 2 | 值>=0
|
contentLen | 请求内容长度 | int | 4 | 值>=0
|
className bytes | 请求对象(参数)类型的名称 | byte[] |
- | |
header bytes | 请求头 | byte[] |
- | |
content bytes | 请求内容 | byte[] |
- |
- 响应
Frame
特有的属性:
属性Code | 属性含义 | Java类型 | 大小(byte) | 备注 |
---|---|---|---|---|
respstatus | 响应状态值 | short | 2 | 在ResponseStatus 中定义,目前内置13 种状态,例如0 => SUCCESS
|
classLen | 响应对象(参数)类型的名称长度 | short | 2 | 值>=0
|
headerLen | 响应头长度 | short | 2 | 值>=0
|
contentLen | 响应内容长度 | int | 4 | 值>=0
|
className bytes | 响应对象(参数)类型的名称 | byte[] |
- | |
header bytes | 响应头 | byte[] |
- | |
content bytes | 响应内容 | byte[] |
- |
这里可以看出V1
版本中的请求Frame
和响应Frame
只有细微的差别,(请求Frame
中独立存在timeout
属性,而响应Frame
独立存在respstatus
属性),绝大部分的属性都是复用的,并且三个长度和三个字节数组是相互制约的:
classLen <=> className bytes
headerLen <=> header bytes
contentLen <=> content bytes
V2版本协议的基本构成
-
V2
版本的协议请求Frame
基本构成:
-
V2
版本的协议响应Frame
基本构成:
V2
版本的协议相比V1
版本多了2
个必传公共属性和1
个可选公共属性:
属性Code | 属性含义 | Java类型 | 大小(byte) | 备注 |
---|---|---|---|---|
ver1 | 协议版本 | byte | 1 | 是为了在V2 版本协议中兼容V1 版本的协议 |
switch | 协议开关 | byte | 1 | 基于BitSet 实现的开关,最多8 个 |
CRC32 | 循环冗余校验值 | int | 4 | 可选的,由开关ProtocolSwitch.CRC_SWITCH_INDEX 决定是否启用,启用的时候会基于整个Frame 进行计算 |
这几个新增属性中,switch
代表ProtocolSwitch
实现中的BitSet
转换出来的byte
字段,由于byte
只有8
位,因此协议在传输过程中最多只能传递8
个开关的状态,这些开关的下标为[0,7]
。CRC32
是基于整个Frame
转换出来的byte
数组进行计算,JDK
中有原生从API
,可以简单构建一个工具类如下进行计算:
public enum Crc32Utils {
/**
* 单例
*/
X;
/**
* 进行CRC32结果计算
*
* @param content 内容
* @return crc32 result
*/
public long crc32(byte[] content) {
CRC32 crc32 = new CRC32();
crc32.update(content, 0, content.length);
long r = crc32.getValue();
// crc32.reset();
return r;
}
}
V2
版本协议把CRC32
的计算结果强制转换为int
类型,可以思考一下这里为什么不会溢出。
SOFA-BOLT架构
考虑到如果分析源码,文章篇幅会比较长,并且如果有开发过Netty
自定义协议栈的经验,SOFA-BOLT
的源码并不复杂,这里仅仅分析SOFA-BOLT
的架构和核心组件功能。协议由接口Protocol
定义:
public interface Protocol {
// 命令编码器
CommandEncoder getEncoder();
// 命令解码器
CommandDecoder getDecoder();
// 心跳触发器
HeartbeatTrigger getHeartbeatTrigger();
// 命令处理器
CommandHandler getCommandHandler();
// 命令工厂
CommandFactory getCommandFactory();
}
由V2
版本协议实现RpcProtocolV2
可以得知:
另外,所有需要发送或者接收的Frame
都被封装为Command
,而Command
的类族如下:
也就是:
-
RequestCommand
定义了请求命令需要的所有属性,最终由RpcCommandEncoderV2
进行编码 -
ResponseCommand
定义了响应命令需要的所有属性,最终由RpcCommandDecoderV2
进行解码
梳理完上面的组件就可以画出下面的一个基于SOFA-BOLT
协议进行的Client => Server
的交互图:
SOFA-BOLT使用
由于sofa-bolt
已经封装好了完整的RpcClient
和RpcServer
,使用此协议只需要引用依赖,然后初始化客户端和服务端,编写对应的UserProcessor
实现即可。引入相关依赖:
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.65</version>
</dependency>
新建请求实体类RequestMessage
、响应实体类ResponseMessage
和对应的处理器RequestMessageProcessor
:
@Data
public class RequestMessage implements Serializable {
private Long id;
private String content;
}
@Data
public class ResponseMessage implements Serializable {
private Long id;
private String content;
private Long status;
}
public class RequestMessageProcessor extends SyncUserProcessor<RequestMessage> {
@Override
public Object handleRequest(BizContext bizContext, RequestMessage requestMessage) throws Exception {
ResponseMessage message = new ResponseMessage();
message.setContent(requestMessage.getContent());
message.setId(requestMessage.getId());
message.setStatus(10087L);
return message;
}
@Override
public String interest() {
return RequestMessage.class.getName();
}
}
其中处理器需要同步处理需要继承超类SyncUserProcessor
,选用异步处理的时候需要继承超类AsyncUserProcessor
,作为参数的所有实体类必须实现Serializable
接口(如果有嵌套对象,每个嵌套对象所在类也必须实现Serializable
接口),否则会出现序列化相关的异常。最后编写客户端和服务端的代码:
@Slf4j
public class BlotApp {
private static final int PORT = 8081;
private static final String ADDRESS = "127.0.0.1:" + PORT;
public static void main(String[] args) throws Exception {
RequestMessageProcessor processor = new RequestMessageProcessor();
RpcServer server = new RpcServer(8081, true);
server.startup();
server.registerUserProcessor(processor);
RpcClient client = new RpcClient();
client.startup();
RequestMessage request = new RequestMessage();
request.setId(99L);
request.setContent("hello bolt");
ResponseMessage response = (ResponseMessage) client.invokeSync(ADDRESS, request, 2000);
log.info("响应结果:{}", response);
}
}
运行输出结果:
响应结果:ResponseMessage(id=99, content=hello bolt, status=10087)
基于SOFA-BOLT协议编写简单CURD项目
本地测试MySQL
服务构建客户表如下:
CREATE DATABASE test;
USE test;
CREATE TABLE t_customer
(
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
customer_name VARCHAR(32) NOT NULL
);
为了简化JDBC
操作,引入spring-boot-starter-jdbc
(这里只借用JdbcTemplate
的轻度封装)相关依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
编写核心同步处理器:
// 创建
@Data
public class CreateCustomerReq implements Serializable {
private String customerName;
}
@Data
public class CreateCustomerResp implements Serializable {
private Long code;
private Long customerId;
}
public class CreateCustomerProcessor extends SyncUserProcessor<CreateCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public CreateCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, CreateCustomerReq req) throws Exception {
KeyHolder keyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(connection -> {
PreparedStatement ps = connection.prepareStatement("insert into t_customer(customer_name) VALUES (?)",
Statement.RETURN_GENERATED_KEYS);
ps.setString(1, req.getCustomerName());
return ps;
}, keyHolder);
CreateCustomerResp resp = new CreateCustomerResp();
resp.setCustomerId(Objects.requireNonNull(keyHolder.getKey()).longValue());
resp.setCode(RespCode.SUCCESS);
return resp;
}
@Override
public String interest() {
return CreateCustomerReq.class.getName();
}
}
// 更新
@Data
public class UpdateCustomerReq implements Serializable {
private Long customerId;
private String customerName;
}
public class UpdateCustomerProcessor extends SyncUserProcessor<UpdateCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public UpdateCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, UpdateCustomerReq req) throws Exception {
UpdateCustomerResp resp = new UpdateCustomerResp();
int updateCount = jdbcTemplate.update("UPDATE t_customer SET customer_name = ? WHERE id = ?", ps -> {
ps.setString(1, req.getCustomerName());
ps.setLong(2, req.getCustomerId());
});
if (updateCount > 0) {
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest() {
return UpdateCustomerReq.class.getName();
}
}
// 删除
@Data
public class DeleteCustomerReq implements Serializable {
private Long customerId;
}
@Data
public class DeleteCustomerResp implements Serializable {
private Long code;
}
public class DeleteCustomerProcessor extends SyncUserProcessor<DeleteCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public DeleteCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, DeleteCustomerReq req) throws Exception {
DeleteCustomerResp resp = new DeleteCustomerResp();
int updateCount = jdbcTemplate.update("DELETE FROM t_customer WHERE id = ?", ps -> ps.setLong(1,req.getCustomerId()));
if (updateCount > 0){
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest() {
return DeleteCustomerReq.class.getName();
}
}
// 查询
@Data
public class SelectCustomerReq implements Serializable {
private Long customerId;
}
@Data
public class SelectCustomerResp implements Serializable {
private Long code;
private Long customerId;
private String customerName;
}
public class SelectCustomerProcessor extends SyncUserProcessor<SelectCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public SelectCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, SelectCustomerReq req) throws Exception {
SelectCustomerResp resp = new SelectCustomerResp();
Customer result = jdbcTemplate.query("SELECT * FROM t_customer WHERE id = ?", ps -> ps.setLong(1, req.getCustomerId()), rs -> {
Customer customer = null;
if (rs.next()) {
customer = new Customer();
customer.setId(rs.getLong("id"));
customer.setCustomerName(rs.getString("customer_name"));
}
return customer;
});
if (Objects.nonNull(result)) {
resp.setCustomerId(result.getId());
resp.setCustomerName(result.getCustomerName());
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest() {
return SelectCustomerReq.class.getName();
}
@Data
public static class Customer {
private Long id;
private String customerName;
}
}
编写数据源、客户端和服务端代码:
public class CurdApp {
private static final int PORT = 8081;
private static final String ADDRESS = "127.0.0.1:" + PORT;
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
CreateCustomerProcessor createCustomerProcessor = new CreateCustomerProcessor(jdbcTemplate);
UpdateCustomerProcessor updateCustomerProcessor = new UpdateCustomerProcessor(jdbcTemplate);
DeleteCustomerProcessor deleteCustomerProcessor = new DeleteCustomerProcessor(jdbcTemplate);
SelectCustomerProcessor selectCustomerProcessor = new SelectCustomerProcessor(jdbcTemplate);
RpcServer server = new RpcServer(PORT, true);
server.registerUserProcessor(createCustomerProcessor);
server.registerUserProcessor(updateCustomerProcessor);
server.registerUserProcessor(deleteCustomerProcessor);
server.registerUserProcessor(selectCustomerProcessor);
server.startup();
RpcClient client = new RpcClient();
client.startup();
CreateCustomerReq createCustomerReq = new CreateCustomerReq();
createCustomerReq.setCustomerName("throwable.club");
CreateCustomerResp createCustomerResp = (CreateCustomerResp)
client.invokeSync(ADDRESS, createCustomerReq, 5000);
System.out.println("创建用户[throwable.club]结果:" + createCustomerResp);
SelectCustomerReq selectCustomerReq = new SelectCustomerReq();
selectCustomerReq.setCustomerId(createCustomerResp.getCustomerId());
SelectCustomerResp selectCustomerResp = (SelectCustomerResp)
client.invokeSync(ADDRESS, selectCustomerReq, 5000);
System.out.println(String.format("查询用户[id=%d]结果:%s", selectCustomerReq.getCustomerId(),
selectCustomerResp));
UpdateCustomerReq updateCustomerReq = new UpdateCustomerReq();
updateCustomerReq.setCustomerId(selectCustomerReq.getCustomerId());
updateCustomerReq.setCustomerName("throwx.cn");
UpdateCustomerResp updateCustomerResp = (UpdateCustomerResp)
client.invokeSync(ADDRESS, updateCustomerReq, 5000);
System.out.println(String.format("更新用户[id=%d]结果:%s", updateCustomerReq.getCustomerId(),
updateCustomerResp));
selectCustomerReq.setCustomerId(updateCustomerReq.getCustomerId());
selectCustomerResp = (SelectCustomerResp)
client.invokeSync(ADDRESS, selectCustomerReq, 5000);
System.out.println(String.format("查询更新后的用户[id=%d]结果:%s", selectCustomerReq.getCustomerId(),
selectCustomerResp));
DeleteCustomerReq deleteCustomerReq = new DeleteCustomerReq();
deleteCustomerReq.setCustomerId(selectCustomerResp.getCustomerId());
DeleteCustomerResp deleteCustomerResp = (DeleteCustomerResp)
client.invokeSync(ADDRESS, deleteCustomerReq, 5000);
System.out.println(String.format("删除用户[id=%d]结果:%s", deleteCustomerReq.getCustomerId(),
deleteCustomerResp));
}
}
执行结果如下:
创建用户[throwable.club]结果:CreateCustomerResp(code=0, customerId=1)
查询用户[id=1]结果:SelectCustomerResp(code=0, customerId=1, customerName=throwable.club)
更新用户[id=1]结果:UpdateCustomerResp(code=0)
查询更新后的用户[id=1]结果:SelectCustomerResp(code=0, customerId=1, customerName=throwx.cn)
更新用户[id=1]结果:DeleteCustomerResp(code=0)
确认最后删除操作结束后验证数据库表,确认t_customer
表为空。
基于GO语言编写SOFA-BOLT协议客户端
这里尝试使用GO
语言编写一个SOFA-BOLT
协议客户端,考虑到实现一个完整版本会比较复杂,这里简化为只实现Encode
和命令调用部分,暂时不处理响应和Decode
。编写结构体RequestCommand
如下:
// RequestCommand sofa-bolt v2 req cmd
type RequestCommand struct {
ProtocolCode uint8
ProtocolVersion uint8
Type uint8
CommandCode uint16
CommandVersion uint8
RequestId uint32
Codec uint8
Switch uint8
Timeout uint32
ClassLength uint16
HeaderLength uint16
ContentLength uint32
ClassName []byte
Header []byte
Content []byte
}
这里注意一点,所有的整数类型必须使用具体的类型,例如uint
必须用uint32
,否则会出现Buffer
写入异常的问题。接着编写一个编码方法:
// encode req => slice
func encode(cmd *RequestCommand) []byte {
container := make([]byte, 0)
buf := bytes.NewBuffer(container)
buf.WriteByte(cmd.ProtocolCode)
buf.WriteByte(cmd.ProtocolVersion)
buf.WriteByte(cmd.Type)
binary.Write(buf, binary.BigEndian, cmd.CommandCode)
buf.WriteByte(cmd.CommandVersion)
binary.Write(buf, binary.BigEndian, cmd.RequestId)
buf.WriteByte(cmd.Codec)
buf.WriteByte(cmd.Switch)
binary.Write(buf, binary.BigEndian, cmd.Timeout)
binary.Write(buf, binary.BigEndian, cmd.ClassLength)
binary.Write(buf, binary.BigEndian, cmd.HeaderLength)
binary.Write(buf, binary.BigEndian, cmd.ContentLength)
buf.Write(cmd.ClassName)
buf.Write(cmd.Header)
buf.Write(cmd.Content)
return buf.Bytes()
}
最后编写TCP
客户端:
type Req struct {
Id int64 `json:"id"`
Name string `json:"name"`
}
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"net"
)
func main() {
con, err := net.Dial("tcp", "127.0.0.1:9999")
if err != nil {
fmt.Println("err:", err)
return
}
defer con.Close()
req := &Req{
Id: 8080,
Name: "throwx.cn",
}
content, err := json.Marshal(req)
if err != nil {
fmt.Println("err:", err)
return
}
var header []byte
className := []byte("com.alipay.remoting.Req")
cmd := &RequestCommand{
ProtocolCode: 2,
ProtocolVersion: 2,
Type: 1,
CommandCode: 1,
CommandVersion: 1,
RequestId: 10087,
Codec: 1,
Switch: 0,
Timeout: 5000,
ClassLength: uint16(len(className)),
HeaderLength: 0,
ContentLength: uint32(len(content)),
ClassName: className,
Header: header,
Content: content,
}
pkg := encode(cmd)
_, err = con.Write(pkg)
if err != nil {
fmt.Println("err:", err)
return
}
}
协议的V2版本Crc32属性是可选的,这里为了简化处理也暂时忽略了
这里看到Content
属性为了简化处理使用了JSON
做序列化,因此需要稍微改动SOFA-BOLT
的源码,引入FastJson
和FastJsonSerializer
,改动见下图:
先启动BoltApp
(SOFA-BOLT
服务端),再执行GO
编写的客户端,结果如下:
小结
SOFA-BOLT
是一个高性能成熟可扩展的Netty
私有协议封装,比起原生Netty
编程,提供了便捷的同步、异步调用,提供基础心跳支持和重连等特性。引入SyncUserProcessor
和AsyncUserProcessor
的功能,对于业务开发更加友好。SOFA-BOLT
协议本质也是一个紧凑、高性能的RPC
协议。在考虑引入Netty
进行底层通讯的场景,可以优先考虑使用SOFA-BOLT
或者考虑把SOFA-BOLT
作为候选方案之一,只因SOFA-BOLT
是轻量级的,学习曲线平缓,基本没有其他中间件依赖。
Demo
所在仓库:
(本文完 c-5-d e-a-20210806)