传输协议
可见传输内容主要可以分为以下4部分:
(1) 消息长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3) 消息头数据:经过序列化后的消息头数据;
(4) 消息主体数据:消息主体的二进制字节数据内容
NettyDecoder
构造函数
定义获取1个网络包的格式
# FRAME_MAX_LENGTH=16777216
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
decode
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer); // decode
} catch (Exception e) {
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
RemotingCommand
decode
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit(); // 获取byteBuffer的总长度
int oriHeaderLen = byteBuffer.getInt(); // 序列化类型(8byte) & 消息头长度(24byte):共同占用一个int类型 详见markProtocolType
int headerLength = getHeaderLength(oriHeaderLen); //获取消息头的长度,这里和0xFFFFFF(6个F)做与运算,也就是24位,把序列化协议去掉了
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData); // 从byteBuffer里面读取headerLength长度的数据到headerData
// 获取序列化类型 有1:rocketmq和0:json协议
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
// body的长度 = 总长度的值 - 总长度占用字节数:4 - 头长度
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData); // 从byteBuffer里面读取body数据
}
cmd.body = bodyData; // 获取body数据
return cmd;
}
NettyEncoder
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader(); // 获取Header数据 放入out
out.writeBytes(header); // 放入header数据
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body); //获取body数据 放入out
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
RemotingCommand
encodeHeader
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = this.headerEncode();
length += headerData.length; // 总长度4byte + headerData.length
// 3> body data length
length += bodyLength; // length += length(总长度4byte + headerData.length) + bodyLength
// 存放(总长度4byte + header长度4byte(虚化列类型和header长度) + headerData.length ,一句话就是不包括body数据
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // +4 是因为 length不包括消息总长度
// length 消息总长度
result.putInt(length);
// header length header长度4byte(虚化列类型和header长度)
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data header的字节数据
result.put(headerData);
// limit = position; position = 0; mark = -1;
result.flip();
return result;
}
headerEncode
把RemotingCommand转成二进制数据
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { // rocketmq协议 -> byte[]
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else { // todo json方式 obj -> jsonString -> byte[]
return RemotingSerializable.encode(this); //把RemotingCommand自己传入进去,转成json的二进制数据
}
}
NettyRemotingClient
start()
public void start() {
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(), // 这个
new NettyDecoder(), // 这个
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
........
}
NettyRemotingServer
public void start() {
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder, // 这里
new NettyDecoder(), // 这里
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
}
自定义RocketMQ的消息协议
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作
自定义协议实现
详见RocketMQSerializable的rocketMQProtocolEncode、rocketMQProtocolDecode