Rocketmq-顺序消息底层原理

Rocketmq只能保证局部顺序,不能保证全局顺序消息

在RocketMQ中提供了基于队列(分区)的顺序消费。

 

RocketMQ中顺序性主要指的是消息顺序消费。RocketMQ 中每一个消费组一个单独的线程池并发消费拉取到的消息,即消费端是多线程消费。而顺序消费的并发度等于该消费者分配到的队列数。

RokcetMQ的完成顺序性主要是由3把琐来实现的。下图是RocketMQ顺序消费的工作原理:

 Rocketmq-顺序消息底层原理

1、消费端在启动时首先会进行队列负载机制,遵循一个消费者可以分配多个队列,但一个队列只会被一个消费者消费的原则。

2、消费者根据分配的队列,向 Broker 申请琐,如果申请到琐,则拉取消息,否则放弃消息拉取,等到下一个队列负载周期(20s)再试。

public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
//                    锁定消费队列=》
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
         }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
try{
//      批量锁定消息队列=》
        Set<MessageQueue> lockOKMQSet =
        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
        for (MessageQueue mq : lockOKMQSet) {
            ProcessQueue processQueue = this.processQueueTable.get(mq);
            if (processQueue != null) {
                if (!processQueue.isLocked()) {
                    log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                }
//      锁定处理队列
                processQueue.setLocked(true);
                processQueue.setLastLockTimestamp(System.currentTimeMillis());
            }
        }
}

3、拉取到消息后会在消费端的线程池中进行消费,但消费的时候,会对消费队列进行加锁,即同一个消费队列中的多条消息会串行执行

class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;


        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }


        @Override
        public void run() {
            if (this.processQueue.isDroped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}",
                    this.messageQueue);
                return;
            }

            // 保证在当前Consumer内,同一队列串行消费
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                // 保证在Consumer集群,同一队列串行消费
                if (MessageModel.BROADCASTING
                    .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume;) {
                        if (this.processQueue.isDroped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        }

                        if (MessageModel.CLUSTERING
                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                .messageModel())
                                && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                                this.processQueue, 10);
                            break;
                        }

                        if (MessageModel.CLUSTERING
                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                .messageModel())
                                && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}",
                                this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                                this.processQueue, 10);
                            break;
                        }

                        // 在线程数小于队列数情况下,防止个别队列被饿死
                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MaxTimeConsumeContinuously) {
                            // 过10ms后再消费
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue,
                                messageQueue, 10);
                            break;
                        }

                        final int consumeBatchSize =
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumer
                                    .getConsumeMessageBatchMaxSize();

                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context =
                                    new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            // 执行Hook
                            ConsumeMessageContext consumeMessageContext = null;
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext = new ConsumeMessageContext();
                                consumeMessageContext
                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer
                                        .getConsumerGroup());
                                consumeMessageContext.setMq(messageQueue);
                                consumeMessageContext.setMsgList(msgs);
                                consumeMessageContext.setSuccess(false);
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                    .executeHookBefore(consumeMessageContext);
                            }

                            long beginTimestamp = System.currentTimeMillis();
                            //处理队列加锁
                            try {
                                this.processQueue.getLockConsume().lock();
                                if (this.processQueue.isDroped()) {
                                    log.warn(
                                        "consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                status =
                                        messageListener.consumeMessage(Collections.unmodifiableList(msgs),
                                            context);
                            }
                            catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
                                    RemotingHelper.exceptionSimpleDesc(e),//
                                    ConsumeMessageOrderlyService.this.consumerGroup,//
                                    msgs,//
                                    messageQueue);
                            }
                            finally {
                                this.processQueue.getLockConsume().unlock();
                            }

                            // 针对异常返回代码打印日志
                            if (null == status //
                                    || ConsumeOrderlyStatus.ROLLBACK == status//
                                    || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",//
                                    ConsumeMessageOrderlyService.this.consumerGroup,//
                                    msgs,//
                                    messageQueue);
                            }

                            long consumeRT = System.currentTimeMillis() - beginTimestamp;

                            // 用户抛出异常或者返回null,都挂起队列
                            if (null == status) {
                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                            }

                            // 执行Hook
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext.setStatus(status.toString());
                                consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status
                                        || ConsumeOrderlyStatus.COMMIT == status);
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                    .executeHookAfter(consumeMessageContext);
                            }

                            // 记录统计信息
                            ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(
                                ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(),
                                consumeRT);

                            continueConsume =
                                    ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status,
                                        context, this);
                        }
                        else {
                            continueConsume = false;
                        }
                    }
                }
                // 没有拿到当前队列的锁,稍后再消费
                else {
                    if (this.processQueue.isDroped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}",
                            this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                        this.processQueue, 100);
                }
            }
        }


        public ProcessQueue getProcessQueue() {
            return processQueue;
        }


        public MessageQueue getMessageQueue() {
            return messageQueue;
        }
    }

4、在消费的过程中,会对处理队列(ProccessQueue)进行加锁,保证处理中的消息消费完成,发生队列负载后,其他消费者才能继续消费。

最后一把琐有什么用呢?

例如队列 q3 目前是分配给消费者C2进行消费,已将拉取了32条消息在线程池中处理,然后对消费者进行了扩容,分配给C2的q3队列,被分配给C3了,由于C2已将处理了一部分,位点信息还没有提交,如果C3立马去消费q3队列中的消息,那存在一部分数据会被重复消费,故在C2消费者在消费q3队列的时候,消息没有消费完成,那负载队列就不能丢弃该队列,就不会在broker端释放琐,其他消费者就无法从该队列消费,尽最大可能保证了消息的重复消费,保证顺序性语义。


 

上一篇:mmkv原理,字节跳动+京东+美团+腾讯面试总结,威力加强版


下一篇:javascript 实现原生下载的各种情况