RocketMQ源码详解 | Producer篇 · 其二:消息组成、发送链路

概述

在上一节 RocketMQ源码详解 | Producer篇 · 其一:Start,然后 Send 一条消息 中,我们了解了 Producer 在发送消息的流程。这次我们再来具体下看消息的构成与其发送的链路

Message

在 RocketMQ 的使用中,Message 类是在发送消息时必须用到的,其中 body 即是消息的存放位置,还有的就是消息的 标识(flag) 和 属性(properties)

public class Message {
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}

消息的标识(flag)

变量名 含义
COMPRESSED_FLAG 压缩消息。消息为批量的时候,就会进行压缩,默认使用5级的 zip
MULTI_TAGS_FLAG 有多个 tag。
TRANSACTION_NOT_TYPE 事务为未知状态。当 Broker 回查 Producer 的时候,如果为 Commit 应该提交,为 Rollback 应该回滚,为 Unknown 时应该继续回查
TRANSACTION_PREPARED_TYPE 事务的运行状态。当前消息是事务的一部分
TRANSACTION_COMMIT_TYPE 事务的提交消息。要求提交事务
TRANSACTION_ROLLBACK_TYPE 事务的回滚消息。要求回滚事务
BORNHOST_V6_FLAG 生成该消息的 host 是否 ipv6 的地址
STOREHOSTADDRESS_V6_FLAG 持久化该消息的 host 是否是 ipv6 的地址

消息的属性(properties)

而消息的 properties 较多,只摘了一小段

属性 含义
KEYS 消息的 Key。服务器会通过 key 设置索引,应用可以通过 Topic 和 Key 来查找这条消息以及被谁消费
TAGS 消息的子类型,可以根据 tag 选择性消费
DELAY 延迟消息的延迟级别(共16级,理论上可以有18级)
RETRY_TOPIC 需要重试的 Topic(在 Broker 中会存放到 SCHEDULE_TOPIC_XXXX Topic,其中有 18 个 queue,对应 18 个重试延迟)
REAL_TOPIC 真实的 Topic (RocketMQ 经常使用更换目的 Topic 的"把戏",如事务消息和延时消息,这个字段记录了真正的 Topic)
PGROUP 生产者组
MAX_OFFSET\MIN_OFFSET 在 pull 中的最大偏移量和最小偏移量
TRANSFER_FLAG 事务有关标识
MSG_TYPE 消息类型,是否为回复消息
BUYER_ID 嗯...买家ID?

当然,这只是在生产者中的消息的样子,在 Broker 和消费者的眼中中,它是这样的

public class MessageExt extends Message {
private static final long serialVersionUID = 5720810158625748049L; private String brokerName; private int queueId; // 存盘的大小
private int storeSize; // 在 ConsumerQueue 中的偏移量
private long queueOffset;
private int sysFlag;
// 消息创建时间
private long bornTimestamp;
// 创建地址
private SocketAddress bornHost; // 存盘时间
private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
// 在 commitLog 中的偏移量
private long commitLogOffset;
// crc 校验
private int bodyCRC;
// 消费重试次数
private int reconsumeTimes; private long preparedTransactionOffset;
}

消息的包装

那么,producer 生成了这样的消息后,会直接将其发出去吗?

让我们继续跟踪上一篇没讲完的内容

MQClientAPIImpl#sendMessage

long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
// 是否为 reply 消息
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
// 是 smart 消息则加上请求头
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody()); /* -- pass -- */

在这里,我们可以看到在这又加了层套娃(只保留了body),然后才发送

RemotingCommand 的具体属性如下

private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private transient byte[] body;

我们还在他的方法中找到了一个叫 encode 的方法,并且返回的是 ByteBuffer 。因此这就是实际发送的消息。

public ByteBuffer encode() {
// 1> header length size
int length = 4; // 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length; // 3> body data length
if (this.body != null) {
length += body.length;
} ByteBuffer result = ByteBuffer.allocate(4 + length); // length
result.putInt(length); // header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data
result.put(headerData); // body data;
if (this.body != null) {
result.put(this.body);
} result.flip(); return result;
}

消息的结构

具体的消息结构如下图:

RocketMQ源码详解 | Producer篇 · 其二:消息组成、发送链路

其中每个字段在 Request 和 Response 中都有不同的含义

code

在 Request 中,为请求的操作码

public class RequestCode {
// 发送消息
public static final int SEND_MESSAGE = 10;
// 拉取消息
public static final int PULL_MESSAGE = 11;
// 查询消息(所在topic, 需要的 key, 最大数量, 开始偏移量, 结束偏移量)
public static final int QUERY_MESSAGE = 12;
// 查询 Broker 偏移量(未使用)
public static final int QUERY_BROKER_OFFSET = 13;
/*
* 查询消费者偏移量
* 消费者会将偏移量存储在内存中,当使用主从架构时,会默认由主 Broker 负责读于写
* 为避免消息堆积,堆积消息超过指定的值时,会由从服务器来接管读,但会导致消费进度问题
* 所以主从消费进度的一致性由 从服务器主动上报 和 消费者内存进度优先 来保证
*/
// 查询消费者自己的偏移量
public static final int QUERY_CONSUMER_OFFSET = 14;
// 提交自己的偏移量
public static final int UPDATE_CONSUMER_OFFSET = 15;
// 创建或更新Topic public static final int UPDATE_AND_CREATE_TOPIC = 17;
// 获取所有的Topic信息
public static final int GET_ALL_TOPIC_CONFIG = 21;
/* unused */
public static final int GET_TOPIC_CONFIG_LIST = 22;
public static final int GET_TOPIC_NAME_LIST = 23;
// 更新 Broker 配置
public static final int UPDATE_BROKER_CONFIG = 25;
// 获取 Broker 配置
public static final int GET_BROKER_CONFIG = 26;
public static final int TRIGGER_DELETE_FILES = 27;
// 获取 Broker 运行时信息
public static final int GET_BROKER_RUNTIME_INFO = 28;
// 通过时间戳查找偏移量
public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
// 获取最大偏移量
public static final int GET_MAX_OFFSET = 30;
// 获取最小偏移量
public static final int GET_MIN_OFFSET = 31;
//
public static final int GET_EARLIEST_MSG_STORETIME = 32; /* 由 Broker 处理 */
// 通过消息ID查询消息
public static final int VIEW_MESSAGE_BY_ID = 33;
// 心跳消息
public static final int HEART_BEAT = 34;
// 注销客户端
public static final int UNREGISTER_CLIENT = 35;
// 报告消费失败(一段时间后重试) (Deprecated)
public static final int CONSUMER_SEND_MSG_BACK = 36;
// 事务结果(可能是 commit 或 rollback)
public static final int END_TRANSACTION = 37;
// 通过消费者组获取消费者列表
public static final int GET_CONSUMER_LIST_BY_GROUP = 38; // 检查事务状态; Broker对于事务的未知状态的回查操作
public static final int CHECK_TRANSACTION_STATE = 39;
// 通知消费者的ID已经被更改
public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40; // 批量锁定 Queue (rebalance使用)
public static final int LOCK_BATCH_MQ = 41;
// 解锁 Queue
public static final int UNLOCK_BATCH_MQ = 42;
// 获得该 Broker 上的所有的消费者偏移量
public static final int GET_ALL_CONSUMER_OFFSET = 43;
// 获得延迟 Topic 上的偏移量
public static final int GET_ALL_DELAY_OFFSET = 45;
// 检查客户端配置
public static final int CHECK_CLIENT_CONFIG = 46;
// 更新或创建 ACL
public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
// 删除 ACL 配置
public static final int DELETE_ACL_CONFIG = 51;
// 获取 Broker 集群的 ACL 信息
public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
// 更新全局白名单
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
// 获取 Broker 集群的 ACL 配置
public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54; /* NameServer 相关 */ // 放入键值配置
public static final int PUT_KV_CONFIG = 100;
// 获取键值配置
public static final int GET_KV_CONFIG = 101;
// 删除键值配置
public static final int DELETE_KV_CONFIG = 102;
// 注册 Broker
public static final int REGISTER_BROKER = 103;
// 注销 Broker
public static final int UNREGISTER_BROKER = 104;
// 获取指定 Topic 的路由信息
public static final int GET_ROUTEINFO_BY_TOPIC = 105;
// 获取 Broker 的集群信息
public static final int GET_BROKER_CLUSTER_INFO = 106; // 更新或创建订阅组
public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
// 获取所有订阅组的配置
public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
// 获取 Topic 的度量指标
public static final int GET_TOPIC_STATS_INFO = 202;
// 获取消费者在线列表(rpc)
public static final int GET_CONSUMER_CONNECTION_LIST = 203;
// 获取生产者在线列表
public static final int GET_PRODUCER_CONNECTION_LIST = 204;
public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
// 从 NameSrv 获取所有 Topic
public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
// 删除订阅组
public static final int DELETE_SUBSCRIPTIONGROUP = 207;
// 获取消费者的度量指标
public static final int GET_CONSUME_STATS = 208; public static final int SUSPEND_CONSUMER = 209;
public static final int RESUME_CONSUMER = 210;
public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212; public static final int ADJUST_CONSUMER_THREAD_POOL = 213; public static final int WHO_CONSUME_THE_MESSAGE = 214; // 删除 Broker 中的 Topic
public static final int DELETE_TOPIC_IN_BROKER = 215;
// 删除 NameSrv 中的 Topic
public static final int DELETE_TOPIC_IN_NAMESRV = 216;
// 获取键值列表
public static final int GET_KVLIST_BY_NAMESPACE = 219; // 重置消费者的消费进度
public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
// 从消费者中获取消费者的度量指标
public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
// 让 Broker 重置消费进度
public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
// 让 Broker 更新消费者的度量信息
public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
// 查询消息被谁消费
public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
// 从集群中获取 Topic
public static final int GET_TOPICS_BY_CLUSTER = 224;
// 注册过滤器服务器
public static final int REGISTER_FILTER_SERVER = 301;
// 注册消息过滤类
public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
// 查询消费时间
public static final int QUERY_CONSUME_TIME_SPAN = 303;
// 从 NameSrv 中获取系统Topic
public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
// 从 Broker 中获取系统Topic
public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
// 清理过期的消费队列
public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
// 获取 Consumer 的运行时信息
public static final int GET_CONSUMER_RUNNING_INFO = 307;
// 查询修正偏移量
public static final int QUERY_CORRECTION_OFFSET = 308;
// 直接消费消息
public static final int CONSUME_MESSAGE_DIRECTLY = 309;
// 发送消息(v2),优化网络数据包
public static final int SEND_MESSAGE_V2 = 310; // 单元化相关 topic
public static final int GET_UNIT_TOPIC_LIST = 311;
// 获取含有单元化订阅组的 Topic 列表
public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
// 获取含有单元化订阅组的非单元化 Topic 列表
public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
// 克隆消费进度
public static final int CLONE_GROUP_OFFSET = 314;
// 查询 Broker 上的度量信息
public static final int VIEW_BROKER_STATS_DATA = 315; // 清理未使用的 Topic
public static final int CLEAN_UNUSED_TOPIC = 316;
// 获取 broker 上的有关消费的度量信息
public static final int GET_BROKER_CONSUME_STATS = 317; /* update the config of name server */
public static final int UPDATE_NAMESRV_CONFIG = 318;
/* get config from name server */
public static final int GET_NAMESRV_CONFIG = 319; // 发送批量消息
public static final int SEND_BATCH_MESSAGE = 320;
// 查询消费的 Queue
public static final int QUERY_CONSUME_QUEUE = 321;
// 查询数据版本
public static final int QUERY_DATA_VERSION = 322; /* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
// 回送消息
public static final int SEND_REPLY_MESSAGE = 324;
public static final int SEND_REPLY_MESSAGE_V2 = 325;
// push回送消息到客户端
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}

在 Response 中,为响应码

public class ResponseCode extends RemotingSysResponseCode {
// 刷新到磁盘超时
public static final int FLUSH_DISK_TIMEOUT = 10;
// 从节点不可达
public static final int SLAVE_NOT_AVAILABLE = 11;
// 从节点刷盘超时
public static final int FLUSH_SLAVE_TIMEOUT = 12;
// 非法的消息结构
public static final int MESSAGE_ILLEGAL = 13;
// 服务不可用
public static final int SERVICE_NOT_AVAILABLE = 14;
// 版本不支持
public static final int VERSION_NOT_SUPPORTED = 15;
// 未授权的
public static final int NO_PERMISSION = 16;
// Topic 不存在
public static final int TOPIC_NOT_EXIST = 17;
// Topic 已经存在
public static final int TOPIC_EXIST_ALREADY = 18;
// 要拉取的偏移量不存在
public static final int PULL_NOT_FOUND = 19;
// 立刻重新拉取
public static final int PULL_RETRY_IMMEDIATELY = 20;
// 重定向拉取的偏移量
public static final int PULL_OFFSET_MOVED = 21;
// 不存在的队列
public static final int QUERY_NOT_FOUND = 22;
// 订阅的 url 解析失败
public static final int SUBSCRIPTION_PARSE_FAILED = 23;
// 目标订阅不存在
public static final int SUBSCRIPTION_NOT_EXIST = 24;
// 订阅不是最新的
public static final int SUBSCRIPTION_NOT_LATEST = 25;
// 订阅组不存在
public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
// 订阅的数据不存在 (tag表达式异常)
public static final int FILTER_DATA_NOT_EXIST = 27;
// 该 Broker 上订阅的数据不是最新的
public static final int FILTER_DATA_NOT_LATEST = 28; // 事务应该提交
public static final int TRANSACTION_SHOULD_COMMIT = 200;
// 事务应该回滚
public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
// 事务状态位置
public static final int TRANSACTION_STATE_UNKNOW = 202;
// 事务状态Group错误
public static final int TRANSACTION_STATE_GROUP_WRONG = 203; // 买家ID不存在
public static final int NO_BUYER_ID = 204;
public static final int NOT_IN_CURRENT_UNIT = 205;
// 消费者不在线(rpc)
public static final int CONSUMER_NOT_ONLINE = 206;
// 消费超时
public static final int CONSUME_MSG_TIMEOUT = 207;
// 消息不存在
public static final int NO_MESSAGE = 208;
// 更新或创建 ACL 配置失败
public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
// 删除 ACL 配置失败
public static final int DELETE_ACL_CONFIG_FAILED = 210;
// 更新全局白名单地址失败
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211; }
lang

字段为消息发起方编码语言,这里默认为 java

private LanguageCode language = LanguageCode.JAVA;
version

消息发起方的程序版本

opaque

该字段是为了在同一连接上标识不同的请求,在响应的时候能够回调对应的函数( rocketmq 的发送使用了 TCP 连接复用)

remark

在 Reqeust 中,用于传输自定义文本

在 Response 中,用于传输错误的原因

ext

传输自定义的消息头

消息的发送

在知道消息长啥样后,就可以继续看发送代码了

switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
} return null;

NettyRemotingClient#invokeOneway

我们先来看最简单的Oneway

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 创建 Channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
// 使用建立好的连接发送
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}

以上可以大致抽象为两个操作:获取或建立TCP连接、通过连接发送数据,同时一旦发生异常则关闭连接

NettyRemotingClient#getAndCreateChannel

先看第一个操作

private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
// 地址为空则说明要获取的是NameServer的地址
if (null == addr) {
return getAndCreateNameserverChannel();
} // 尝试从缓存中获取
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
} // 没有或未就绪则新建连接
return this.createChannel(addr);
}

可以看出,这里是由一个 ChannelTable 来维护所有的连接,而 ChannelTable 又是由 NettyRemotingClient 维护,即其是在 JVM 上的全局共享实例。

然后再具体查看创建的方法,可以发现 Channel 最终是由客户端的 Bootstrap 异步创建

ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
} // 连接的建立是串行的
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
// 双重校验保证连接确实没被创建
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
// 连接建立完成
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
// 建立过但失败了
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
} if (createNewConnection) {
// 实际上的连接创建
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
} if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
// 阻塞直到创建完成
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
} return null;

NettyRemotingAbstract#invokeOnewayImpl

然后接着看第二个操作:通过连接发送数据

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 在请求头上的 flag 标记为 oneway 请求
request.markOnewayRPC();
// 获取信号量,保证不会系统不会承受过多请求
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 真正发送完成后,释放锁
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// 超出请求数
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
// 超时
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}

这块比较简单,在获取发送 oneway 的信号量后调用 Channel 的 writeAndFlush 方法发送,发送完成后释放

MQClientAPIImpl#sendMessageSync

然后来看同步的发送

// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
// 处理响应
return this.processSendResponse(brokerName, msg, response,addr);

其中在 NettyRemotingClient#invokeSync 做的事和 oneway 发送差不多,都是创建或获取 Channel 然后处理钩子然后调用父类的响应实现。

所以我们直接来看父类是咋做的

NettyRemotingAbstract#invokeSyncImpl

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque(); try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
} // 发送失败,回填 responseFuture 并在 responseTable 移除其
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
}); // 使用 countDownLatch 来等待响应到达
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}

发现和 oneway 的发送的区别了吗?其中最大的区别有两个:

  1. 在 oneway 中出现的信号量限流不见了
  2. 出现了 responseTable 来管理所有的 responseFuture

我们发现了以下定义

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);

由之前介绍到的 opaque 可以知道,这里对 opaque 和 responseFuture 做了映射,当响应到来时,可以根据 opaque 处理对应的 responseFuture。而流控的消失也是可以理解的,毕竟同步发送会阻塞整个线程,所以在发送方来做流控是不合理的

最后发送完成后使用 processSendResponse 处理响应后返回发送结果

NettyRemotingAbstract#invokeAsyncImpl

最后看异步的发送

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
// 信号量流控
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
// 发生了任何阻塞操作后都要检查是否超时...
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
} final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}

这里和同步发送区别不大,主要还是用了信号量做流控,且不在 responseFuture 使用 countDownLatch 阻塞

Netty 组件

RocketMQ源码详解 | Producer篇 · 其二:消息组成、发送链路

总所周知,在 Channel 写入消息后,就会进入 Pipeline 的尾部,并往出站的方向流出,接下来就看看在出站的过程中又是怎样做的

NettyRemotingAbstract

RocketMQ源码详解 | Producer篇 · 其二:消息组成、发送链路

在 rocketMQ 的 remoting 模块下有一个 netty 包,那里就是 RPC 调用的处理位置。

而在 NettyRemotingAbstract 下,我们就已经看到了有三个重要的方法 invokeOnewayImpl、invokeAsyncImpl、invokeSyncImpl

同时这个类的子类有 NettyRemotingClient 和 NettyRemotingServer,我们先来看和 Producer 有关的部分

public abstract class NettyRemotingAbstract {
/* oneway请求的信号量 */
protected final Semaphore semaphoreOneway;
/* async请求的信号量 */
protected final Semaphore semaphoreAsync;
/* 缓存所有进行中的请求(因为请求是并行的,要对对应的请求做出对应响应), */
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256); /* 对请求码进行对应的处理 */
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); /* Executor to feed netty events to user defined {@link ChannelEventListener}. */
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); /* 默认请求处理器 */
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/* SSL上下文 {@link SslHandler}. */
protected volatile SslContext sslContext;
/* rpc hooks */
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>(); /**
* Custom channel event listener.
*
* @return custom channel event listener if defined; null otherwise.
*/
public abstract ChannelEventListener getChannelEventListener();
}

request 响应部分和 response 响应部分

/**
* 对请求进行响应
* @param ctx Channel handler context.
* @param msg incoming remoting command.
* @throws Exception if there were any error while processing the incoming command.
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
// 该请求可能是一个 request, 也可能是自己发出一个请求的 response
if (cmd != null) {
// 通过请求头上的 flag 就能判断
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
} /**
* Process incoming request command issued by remote peer.
*
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque(); if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
// 如果不是 oneway 请求的话,则需要进行响应
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString()); if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
}; // 是否触发流控
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
} try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 交由对应的处理器处理
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
} if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// 没有对应的响应方式且不存在默认响应器
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
} /**
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); // 处理回调方法
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
// 如果不是的话,说明这是一个阻塞调用,还需要去进行释放
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
} /**
* 在回调执行器中执行回调。如果回调执行器为空,则直接在当前线程中运行
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
} if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
} /**
* 执行回调方法需要的线程池
*/
public abstract ExecutorService getCallbackExecutor(); /**
* 定期调用此方法来扫描和过期已弃用的请求。
*/
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue(); if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
} for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
} /**
* 该服务用于使用子类实现的自定义的事件处理器对指定的事件进行处理
*/
class NettyEventExecutor extends ServiceThread {
}

这段代码看起来比较长,但实际逻辑还是比较简单的

  • 响应 Request
    1. 流量控制触发检查;触发则直接返回
    2. 获取 request code 对应的处理器,执行请求;根据同步或异步执行不同请求
    3. 返回响应;如果为 oneway request,不进行响应
  • 响应 Response
    1. 取出对应的活动中的 request
    2. 根据异步或同步来处理回调

NettyRemotingClient

RocketMQ源码详解 | Producer篇 · 其二:消息组成、发送链路

再来看它的客户端实现类,这个类中我们主要看 Bootstrap 的创建

// 建立客户端的 Bootstrap
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
// 禁用 Nagle 算法
.option(ChannelOption.TCP_NODELAY, true)
// 关闭 keepalive,由我们自己管理连接
.option(ChannelOption.SO_KEEPALIVE, false)
// 设定的超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
// 发送窗口和接收窗口的大小
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// TLS层添加
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
// 使用自定义的 EventLoop
defaultEventExecutorGroup,
// 注册编解码注册器
new NettyEncoder(),
new NettyDecoder(),
// 注册 idle 检查,下一个handle通过覆写userEventTriggered监听连接超时事件
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 管理连接,超时处理,维护channelTables与存活的连接
new NettyConnectManageHandler(),
// 实际上处理收到的请求
new NettyClientHandler());
}
});

编码和解码我们都已经知道是直接编解码成字节流,而 NettyClientHandler 的实现就是直接调用父类的请求处理,所以我们主要看下 NettyConnectManageHandler

class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); super.connect(ctx, remoteAddress, localAddress, promise); if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
}
} @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
closeChannel(ctx.channel());
super.disconnect(ctx, promise); if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
} @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
// 从 channelTables 移除
closeChannel(ctx.channel());
super.close(ctx, promise);
// 处理已经失败的请求,调用回调方法
NettyRemotingClient.this.failFast(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
} @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 处理超过时未接收心跳的 channel
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
} ctx.fireUserEventTriggered(evt);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
}
}

NettyConnectManageHandler 继承了 ChannelDuplexHandler 类,以此监听 Channel。

其主要做的事是:

  • 在连接 close 时移除在 channelTables 中移除并关闭连接
  • 关闭超时的连接

最后,NettyClientHandler 将收到的请求直接传入 NettyRemotingAbstractprocessMessageReceived 方法来处理

上一篇:Linux中用postfix搭建邮件服务器实战详解


下一篇:RocketMQ 源码学习笔记————Producer 是怎么将消息发送至 Broker 的?