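Source Code Analysis: Producer

All of the message producer code lives in the client module. From RocketMQ's point of view, the message producer is a client, and it is also the source of messages.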

Methods and Properties

Main Methods

  • // Create a topic
    void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
    
  • // Look up the offset in a queue for a given timestamp
    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
    
  • // Get the maximum offset of a message queue
    long maxOffset(final MessageQueue mq) throws MQClientException;
    
  • // Get the minimum offset of a message queue
    long minOffset(final MessageQueue mq) throws MQClientException;
    
  • // Look up a message by its offset message ID
    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
            InterruptedException, MQClientException;
    
  • // Query messages by topic, key, and time range
    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
            final long end) throws MQClientException, InterruptedException;
    
  • // Look up a message by topic and message ID
    MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
    

  • // Start
    void start() throws MQClientException;
    
  • // Shut down
    void shutdown();
    
  • // Fetch all message queues of the topic
    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
    
  • // Send synchronously
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException;
    
  • // Send synchronously with a timeout
    SendResult send(final Message msg, final long timeout) throws MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
    
  • // Send asynchronously
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
            RemotingException, InterruptedException;
    
  • // Send asynchronously with a timeout
    void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException;
    
  • // Send one-way
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;
    
  • // Send synchronously to a specified queue
    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;
    
  • // Send asynchronously to a specified queue
    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException;
    
  • // Send one-way to a specified queue
    void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, InterruptedException;
    
  • // Send a batch of messages
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
    

Properties

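producerGroup: the group the producer belongs to
createTopicKey: the default topic key
defaultTopicQueueNums: number of queues the default topic has on each broker
sendMsgTimeout: default timeout for sending a message; 3 s
compressMsgBodyOverHowmuch: message bodies larger than this value are compressed; default 4 KB
retryTimesWhenSendFailed: number of retries for synchronous sends; default 2, so up to 3 attempts in total
retryTimesWhenSendAsyncFailed: number of retries for asynchronous sends; default 2
retryAnotherBrokerWhenNotStoreOK: whether to retry on another broker when the send result shows the message was not stored successfully; default false
maxMessageSize: maximum allowed message size; default 4 MB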
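
The "3 attempts in total" arithmetic for retryTimesWhenSendFailed can be made concrete with a small sketch. This is an illustration only, not RocketMQ code: RetrySketch, totalAttempts, and sendWithRetry are hypothetical names, and the loop assumes that sending simply stops at the first successful attempt.

```java
import java.util.function.IntPredicate;

/** Hypothetical illustration of the synchronous retry budget; not RocketMQ code. */
final class RetrySketch {
    /** One initial attempt plus the configured number of retries. */
    static int totalAttempts(int retryTimesWhenSendFailed) {
        return 1 + retryTimesWhenSendFailed;
    }

    /**
     * Calls attempt.test(i) for i = 0, 1, ... until it returns true
     * or the budget is used up. Returns the number of attempts made.
     */
    static int sendWithRetry(int budget, IntPredicate attempt) {
        int made = 0;
        for (int i = 0; i < budget; i++) {
            made++;
            if (attempt.test(i)) {
                break; // success: stop retrying
            }
        }
        return made;
    }
}
```

With the default of 2 retries, totalAttempts(2) is 3, and a send that always fails is tried three times before the caller sees the failure.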

Startup Flow

Code: DefaultMQProducerImpl#start

// Check that the producer group name is valid
this.checkConfig();
// Change the instance name to the process ID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    this.defaultMQProducer.changeInstanceNameToPID();
}
// Get (or create) the MQ client instance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

There is only one MQClientManager instance in the whole JVM. It maintains a cache table of MQClientInstance objects:

ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();

Only one MQClientInstance is created per clientId.

MQClientInstance wraps RocketMQ's network-handling API. It is the network channel through which producers and consumers talk to the NameServer and the brokers.

Code: MQClientManager#getAndCreateMQClientInstance

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, 
                                                     RPCHook rpcHook) {
    // Build the client ID
    String clientId = clientConfig.buildMQClientId();
    // Look up the client instance by client ID
    MQClientInstance instance = this.factoryTable.get(clientId);
    // If there is none, create a new instance and add it to the table
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}
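
The get-then-putIfAbsent pattern used above can be tried in isolation. The following is a minimal sketch using only the JDK; InstanceCacheSketch and getOrCreate are hypothetical names, and the generic value type stands in for MQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical stand-in for the factoryTable pattern; not RocketMQ code. */
final class InstanceCacheSketch<V> {
    private final ConcurrentMap<String, V> table = new ConcurrentHashMap<>();

    /** Returns the instance cached under clientId, creating one if none exists yet. */
    V getOrCreate(String clientId, Function<String, V> factory) {
        V instance = table.get(clientId);
        if (instance == null) {
            V created = factory.apply(clientId);
            // putIfAbsent returns the existing value if another thread stored one first,
            // so only one instance per key is ever handed out
            V prev = table.putIfAbsent(clientId, created);
            instance = (prev != null) ? prev : created;
        }
        return instance;
    }
}
```

Two threads racing on the same clientId may both construct an object, but both end up using the one that won the putIfAbsent call, which is the same outcome the "Returned Previous MQClientInstance" log branch handles.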

Code: DefaultMQProducerImpl#start

// Register this producer with the MQClientInstance so that later network requests can go through it
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}
// Start the MQClientInstance
if (startFactory) {
    mQClientFactory.start();
}

Message Sending

Code: DefaultMQProducerImpl#send(Message msg)

// Send a message
public SendResult send(Message msg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

Code: DefaultMQProducerImpl#send(Message msg, long timeout)

// Send a message; the timeout defaults to 3 s
public SendResult send(Message msg, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

Code: DefaultMQProducerImpl#sendDefaultImpl

// Validate the message
Validators.checkMessage(msg, this.defaultMQProducer);

Validating the Message

Code: Validators#checkMessage

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    // Reject a null message
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // Validate the topic
    Validators.checkTopic(msg.getTopic());

    // Validate the message body: it must be non-null, non-empty, and within maxMessageSize
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

Finding the Route

Code: DefaultMQProducerImpl#tryToFindTopicPublishInfo

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Get the topic's route information from the local cache
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // If it is missing or unusable, fetch the route from the NameServer
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // If no route was found for this topic, query again using the default topic
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

Code: TopicPublishInfo

public class TopicPublishInfo {
    private boolean orderTopic = false;    // whether this is an ordered topic
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();    // message queues of this topic
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();    // incremented each time a queue is selected
    private TopicRouteData topicRouteData;    // associated topic route metadata
}

Code: MQClientInstance#updateTopicRouteInfoFromNameServer

TopicRouteData topicRouteData;
// Fetch route information from the NameServer using the default topic
if (isDefault && defaultMQProducer != null) {
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
        1000 * 3);
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
} else {
    // Fetch route information from the NameServer for the specified topic
    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
// Check whether the route information has changed
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
    changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
    log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
    // Convert topicRouteData into publish info (the queues that can be written to)
    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
    publishInfo.setHaveTopicRouterInfo(true);
    // Iterate over the registered producers
    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQProducerInner> entry = it.next();
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            // The producer is not null: update its publishInfo for this topic
            impl.updateTopicPublishInfo(topic, publishInfo);
        }
    }
}

Code: MQClientInstance#topicRouteData2TopicPublishInfo

public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    // Create the TopicPublishInfo object
    TopicPublishInfo info = new TopicPublishInfo();
    // Attach the topic route data
    info.setTopicRouteData(route);
    // Ordered topic: build the queues from the order configuration
    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        String[] brokers = route.getOrderTopicConf().split(";");
        for (String broker : brokers) {
            String[] item = broker.split(":");
            int nums = Integer.parseInt(item[1]);
            for (int i = 0; i < nums; i++) {
                MessageQueue mq = new MessageQueue(topic, item[0], i);
                info.getMessageQueueList().add(mq);
            }
        }

        info.setOrderTopic(true);
    } else {
        // Non-ordered topic: build the queues from the queue data
        List<QueueData> qds = route.getQueueDatas();
        Collections.sort(qds);
        // Iterate over the topic's queue data
        for (QueueData qd : qds) {
            // Only consider queue data that is writable
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                // Find the broker that hosts this queue data, matching by broker name
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (bd.getBrokerName().equals(qd.getBrokerName())) {
                        brokerData = bd;
                        break;
                    }
                }

                if (null == brokerData) {
                    continue;
                }

                // Skip brokers that have no master address
                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }
                // Add one MessageQueue per write queue
                for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    info.getMessageQueueList().add(mq);
                }
            }
        }

        info.setOrderTopic(false);
    }
    // Return the TopicPublishInfo object
    return info;
}
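
The ordered-topic branch above splits the order configuration on ";" and ":", which implies a string of the form brokerName:queueCount;brokerName:queueCount. The sketch below repeats just that parsing step so the expansion can be checked by hand. OrderConfSketch and expand are hypothetical names, the broker names are made up, and a plain "broker#queueId" string stands in for MessageQueue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of expanding an order configuration; not RocketMQ code. */
final class OrderConfSketch {
    /** Expands e.g. "broker-a:2;broker-b:1" into one "broker#queueId" entry per queue. */
    static List<String> expand(String orderTopicConf) {
        List<String> queues = new ArrayList<>();
        for (String broker : orderTopicConf.split(";")) {
            String[] item = broker.split(":");
            int nums = Integer.parseInt(item[1]); // number of queues on this broker
            for (int i = 0; i < nums; i++) {
                queues.add(item[0] + "#" + i);
            }
        }
        return queues;
    }
}
```

For the input "broker-a:2;broker-b:1" this yields broker-a#0, broker-a#1, broker-b#0, in that order.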

Selecting a Queue

  • Broker fault-latency mechanism disabled (the default)

Code: TopicPublishInfo#selectOneMessageQueue(lastBrokerName)

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // First attempt: there is no broker to avoid
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // Take the current value of sendWhichQueue and increment it
        int index = this.sendWhichQueue.getAndIncrement();
        // Walk the message queue list
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            // Advance the index and take it modulo the list size
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            // Skip queues on the broker used last time
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // Every queue is on the last broker: fall back to plain round-robin
        return selectOneMessageQueue();
    }
}

Code: TopicPublishInfo#selectOneMessageQueue()

// Used for the first attempt
public MessageQueue selectOneMessageQueue() {
    // Increment sendWhichQueue
    int index = this.sendWhichQueue.getAndIncrement();
    // Take it modulo the number of queues
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    // Return the queue at that position
    return this.messageQueueList.get(pos);
}
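
The round-robin selection with broker avoidance can be exercised with a small stand-alone sketch. It simplifies the code above in three labeled ways: an AtomicInteger replaces ThreadLocalIndex, Math.floorMod replaces the Math.abs plus "pos < 0" guard, and the fallback reuses the same index instead of incrementing again. RoundRobinSketch and select are hypothetical names, and each queue is represented only by its broker name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of round-robin queue selection; not RocketMQ code. */
final class RoundRobinSketch {
    private final List<String> brokerOfQueue; // broker name of each queue, by position
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> brokerOfQueue) {
        this.brokerOfQueue = brokerOfQueue;
    }

    /** Returns the position of the next queue, skipping lastBrokerName when possible. */
    int select(String lastBrokerName) {
        int size = brokerOfQueue.size();
        int index = counter.getAndIncrement();
        if (lastBrokerName != null) {
            for (int i = 0; i < size; i++) {
                // floorMod keeps the position non-negative even after integer overflow
                int pos = Math.floorMod(index + i, size);
                if (!brokerOfQueue.get(pos).equals(lastBrokerName)) {
                    return pos;
                }
            }
        }
        // First attempt, or every queue is on the broker to avoid
        return Math.floorMod(index, size);
    }
}
```

With queues on brokers [a, a, b, b], the first call select(null) returns position 0; a retry with select("a") then skips position 1 and returns position 2, the first queue on broker b.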

  • Broker fault-latency mechanism enabled

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // The broker fault-latency mechanism is enabled
    if (this.sendLatencyFaultEnable) {
        try {
            // Increment sendWhichQueue
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            // Round-robin over the message queues
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Check whether the queue's broker is currently available
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    // Available
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // No queue qualified: pick a broker from the ones being avoided
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // Get that broker's number of write queues
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                // Take a queue, point it at that broker and a queue ID, and return it
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

  • Latency fault-tolerance interface
public interface LatencyFaultTolerance<T> {
    // Update a fault item
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
    // Check whether the broker is available
    boolean isAvailable(final T name);
    // Remove a fault item
    void remove(final T name);
    // Try to pick a usable broker from the ones being avoided
    T pickOneAtLeast();
}
  • FaultItem: a fault entry
class FaultItem implements Comparable<FaultItem> {
    // Unique key of the entry; here, the broker name
    private final String name;
    // Latency of the latest send
    private volatile long currentLatency;
    // Avoidance timestamp; updateFaultItem sets it to the current time plus notAvailableDuration
    private volatile long startTimestamp;
}
  • Fault strategy
public class MQFaultStrategy {
    // Given currentLatency, the latency of this send, search latencyMax from the end for the first entry that is less than or equal to currentLatency; if there is none, the duration is 0
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // Use that index to read the duration from notAvailableDuration; the broker is treated as unavailable for that long
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
}

How It Works

Code: DefaultMQProducerImpl#sendDefaultImpl

sendResult = this.sendKernelImpl(msg, 
                                 mq, 
                                 communicationMode, 
                                 sendCallback, 
                                 topicPublishInfo, 
                                 timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

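DefaultMQProducerImpl#updateFaultItem is called after a successful send, as shown above, and also when the send throws an exception:

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    // Parameter 1: broker name
    // Parameter 2: latency of this send
    // Parameter 3: whether to isolate the broker
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);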
}

Code: MQFaultStrategy#updateFaultItem

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // Compute how long the broker should be avoided
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        // Update the FaultItem with the latency and the avoidance duration
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

Code: MQFaultStrategy#computeNotAvailableDuration

private long computeNotAvailableDuration(final long currentLatency) {
    // Walk latencyMax from the end
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        // Find the first latencyMax value that is not greater than currentLatency
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    // None found: return 0
    return 0;
}
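
A few worked values make the lookup easier to follow. The sketch below copies the two arrays and the loop into a stand-alone class so the mapping can be checked directly; LatencyTableSketch and its method name are hypothetical, everything else comes from the excerpts above.

```java
/** Stand-alone copy of the latency lookup for experimentation; the class name is hypothetical. */
final class LatencyTableSketch {
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Returns how long (ms) a broker is avoided after a send that took currentLatency ms. */
    static long notAvailableDuration(long currentLatency) {
        // Scan from the largest threshold down to the smallest
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }
}
```

A send that took 40 ms or 100 ms leads to no avoidance at all. A send that took 600 ms matches the 550 threshold and avoids the broker for 30000 ms. When isolation is true, the latency is treated as 30000, which matches the 15000 threshold, so the broker is avoided for 600000 ms (10 minutes).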

Code: LatencyFaultToleranceImpl#updateFaultItem

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    // Get the existing FaultItem
    FaultItem old = this.faultItemTable.get(name);
    // None yet: create a FaultItem and set its latency and start timestamp
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        // Update the latency and start timestamp
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
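
The excerpt stores the current time plus notAvailableDuration in startTimestamp, but the matching availability check is not shown in this article. The sketch below assumes the natural reading: a broker with no entry is available, and a broker with an entry becomes available again once the clock reaches the stored timestamp. FaultTableSketch and its methods are hypothetical names, and the clock is passed in explicitly so the behavior is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical illustration of timestamp-based broker avoidance; not RocketMQ code. */
final class FaultTableSketch {
    // broker name -> timestamp (ms) from which the broker may be used again
    private final ConcurrentMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    /** Records that brokerName should be avoided for notAvailableDuration ms, starting at now. */
    void update(String brokerName, long now, long notAvailableDuration) {
        availableFrom.put(brokerName, now + notAvailableDuration);
    }

    /** Assumption: unknown brokers are available; known ones once now reaches the stored timestamp. */
    boolean isAvailable(String brokerName, long now) {
        Long from = availableFrom.get(brokerName);
        return from == null || now >= from;
    }
}
```

For example, after update("broker-a", 1000, 30000) the broker is reported unavailable at time 30999 and available again at time 31000.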

Sending the Message

The core entry point of the message-sending API is DefaultMQProducerImpl#sendKernelImpl:

private SendResult sendKernelImpl(
    final Message msg,    // message to send
    final MessageQueue mq,    // queue to send to
    final CommunicationMode communicationMode,    // communication mode (sync, async, or one-way)
    final SendCallback sendCallback,    // callback for asynchronous sends
    final TopicPublishInfo topicPublishInfo,    // topic route information
    final long timeout    // timeout
    )

Code: DefaultMQProducerImpl#sendKernelImpl

// Get the broker's network address
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
    // Not found: refresh the route from the NameServer and look the address up again
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
// Assign a unique ID to the message
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
    topicWithNamespace = true;
}
// If the message body exceeds the compression threshold (4 KB by default), compress it
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
// For a transactional message, set the MessageSysFlag.TRANSACTION_PREPARED_TYPE flag
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// If send-message hooks are registered, run their pre-send logic
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}
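
The compression step in the excerpt above boils down to "compress the body only if it is large enough". The sketch below shows that decision with the JDK's DeflaterOutputStream. It is an illustration under assumptions, not the implementation behind tryToCompressMessage: CompressSketch and its methods are hypothetical names, and both the 4 KB threshold being inclusive and the use of Deflate are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;

/** Hypothetical illustration of threshold-based body compression; not RocketMQ code. */
final class CompressSketch {
    static final int THRESHOLD = 4 * 1024; // mirrors the 4 KB default of compressMsgBodyOverHowmuch

    /** Assumption: bodies of at least THRESHOLD bytes are compressed. */
    static boolean shouldCompress(byte[] body) {
        return body != null && body.length >= THRESHOLD;
    }

    /** Compresses body with Deflate and returns the compressed bytes. */
    static byte[] compress(byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream deflater = new DeflaterOutputStream(out)) {
            deflater.write(body);
        } catch (IOException e) {
            // Writing to an in-memory stream should not fail; rethrow unchecked if it does
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

A caller would check shouldCompress(body) first and, if it returns true, send compress(body) together with a flag such as COMPRESSED_FLAG so the receiver knows to decompress.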

Code: SendMessageHook

public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
}

Code: DefaultMQProducerImpl#sendKernelImpl

// Build the send-message request header
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// Producer group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// Topic
requestHeader.setTopic(msg.getTopic());
// Key of the default topic used when creating topics
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// Default number of queues for the topic on a single broker
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// Queue ID
requestHeader.setQueueId(mq.getQueueId());
// System flag
requestHeader.setSysFlag(sysFlag);
// Time the message was produced
requestHeader.setBornTimestamp(System.currentTimeMillis());
// Message flag
requestHeader.setFlag(msg.getFlag());
// Message properties
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// Reconsume (retry) count
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// Whether this is a batch message
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
SendResult sendResult = null;
switch (communicationMode) {
    case ASYNC:    // asynchronous send
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using compressed message body and recover origin message.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }

        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
                                                        this.defaultMQProducer.getNamespace()));
        }

        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:    // one-way and synchronous sends share this path
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
// If hooks are registered, run them after the send completes
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}

Batch Sending

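Batch sending packs several messages of the same topic together and sends them to the broker in one request, which reduces the number of network calls and improves transfer efficiency. More messages per batch is not always better, though. The deciding factor is the size of the individual messages: if each message is large, sending many of them in one batch delays other threads that are sending messages. In addition, the total size of a batch must not exceed DefaultMQProducer#maxMessageSize.

The problem batch sending has to solve is how to encode the messages so that the broker can correctly decode the content of each one.

Code: DefaultMQProducer#send

public SendResult send(Collection<Message> msgs) 
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Pack the message collection into a single message and send it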
    return this.defaultMQProducerImpl.send(batch(msgs));
}

Code: DefaultMQProducer#batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
        // Wrap the message collection in a MessageBatch
        msgBatch = MessageBatch.generateFromList(msgs);
        // For each message: validate it, set its unique ID, and apply the namespace to its topic
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        // Encode the batch and set the result as the message body
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    // Set the topic of msgBatch
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}
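
The encoding problem described in this section, packing several messages into one body so that each can be recovered, is commonly solved with length-prefixed framing. The sketch below shows that general technique with java.nio.ByteBuffer. It is not the wire format produced by MessageBatch#encode, which this article does not show; BatchFramingSketch and its methods are hypothetical names, and only message bodies are framed here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of length-prefixed framing; not RocketMQ's batch format. */
final class BatchFramingSketch {
    /** Writes each body as a 4-byte big-endian length followed by the body bytes. */
    static byte[] encode(List<byte[]> bodies) {
        int total = 0;
        for (byte[] body : bodies) {
            total += 4 + body.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] body : bodies) {
            buf.putInt(body.length);
            buf.put(body);
        }
        return buf.array();
    }

    /** Reads length-prefixed bodies back until the buffer is exhausted. */
    static List<byte[]> decode(byte[] batch) {
        List<byte[]> bodies = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(batch);
        while (buf.hasRemaining()) {
            byte[] body = new byte[buf.getInt()];
            buf.get(body);
            bodies.add(body);
        }
        return bodies;
    }
}
```

Because every body carries its own length, the receiver never has to guess where one message ends and the next begins, and the total size of the encoded batch is easy to compare against a limit such as maxMessageSize before sending.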