rocketmq传输协议

传输协议

rocketmq传输协议

可见传输内容主要可以分为以下4部分:

(1) 消息长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3) 消息头数据:经过序列化后的消息头数据;
(4) 消息主体数据:消息主体的二进制字节数据内容

NettyDecoder

构造函数

定义获取1个网络包的格式

# FRAME_MAX_LENGTH=16777216
public NettyDecoder() { 
    super(FRAME_MAX_LENGTH, 0, 4, 0, 4); 
}

rocketmq传输协议

decode

public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }
            ByteBuffer byteBuffer = frame.nioBuffer();
            return RemotingCommand.decode(byteBuffer); // decode
        } catch (Exception e) {
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }

RemotingCommand

decode

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit(); // 获取byteBuffer的总长度
        int oriHeaderLen = byteBuffer.getInt(); // 序列化类型(8byte) &  消息头长度(24byte):共同占用一个int类型  详见markProtocolType
        int headerLength = getHeaderLength(oriHeaderLen); //获取消息头的长度,这里和0xFFFFFF(6个F)做与运算,也就是24位,把序列化协议去掉了

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData); // 从byteBuffer里面读取headerLength长度的数据到headerData
        // 获取序列化类型  有1:rocketmq和0:json协议
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        // body的长度 = 总长度的值 - 总长度占用字节数:4 - 头长度
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData); // 从byteBuffer里面读取body数据
        }
        cmd.body = bodyData; // 获取body数据

        return cmd;
    }

NettyEncoder

@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader(); // 获取Header数据 放入out
            out.writeBytes(header); // 放入header数据
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body); //获取body数据 放入out
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

RemotingCommand

encodeHeader

public ByteBuffer encodeHeader(final int bodyLength) {
	// 1> header length size
	int length = 4;

	// 2> header data length
	byte[] headerData;
	headerData = this.headerEncode();

	length += headerData.length; // 总长度4byte + headerData.length

	// 3> body data length
	length += bodyLength; // length += length(总长度4byte + headerData.length) + bodyLength
	// 存放(总长度4byte + header长度4byte(虚化列类型和header长度) + headerData.length ,一句话就是不包括body数据
	ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // +4 是因为 length不包括消息总长度

	// length  消息总长度
	result.putInt(length);

	// header length  header长度4byte(虚化列类型和header长度)
	result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

	// header data header的字节数据
	result.put(headerData);
	//  limit = position; position = 0; mark = -1;
	result.flip();

	return result;
}

headerEncode

把RemotingCommand转成二进制数据

private byte[] headerEncode() {
	this.makeCustomHeaderToNet();
	if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { // rocketmq协议 -> byte[]
		return RocketMQSerializable.rocketMQProtocolEncode(this);
	} else { // todo json方式  obj -> jsonString -> byte[]
		return RemotingSerializable.encode(this);  //把RemotingCommand自己传入进去,转成json的二进制数据
	}
}

NettyRemotingClient

start()

public void start() {

	Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast(
					defaultEventExecutorGroup,
					new NettyEncoder(), // 这个
					new NettyDecoder(), // 这个
					new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
					new NettyConnectManageHandler(),
					new NettyClientHandler());
			}
		});

	........
}

NettyRemotingServer

public void start() {
        
	ServerBootstrap childHandler =
	this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
		.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline()
					.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
					.addLast(defaultEventExecutorGroup,
						encoder, // 这里
						new NettyDecoder(), // 这里
						new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
						connectionManageHandler,
						serverHandler
					);
			}
		});

}

自定义RocketMQ的消息协议

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作
rocketmq传输协议

自定义协议实现

详见RocketMQSerializable的rocketMQProtocolEncode、rocketMQProtocolDecode

上一篇:RocketMQ (四) 使用RocketMQ原生API收发消息


下一篇:Linux&Windows下安装RocketMQ