rocketmq 精华

(ps:)通过本人语雀文档阅读体验更好哦--有目录

介绍

rocket mq 翻译成中文就是火箭消息队列,从名字就可以看出来,它是一个很快的消息队列... rocket mq 是 阿里巴巴研制的后面贡献给 apache 基金会,其设计思想很多都是来自 kafka,所以和 kafka 有不少类似的地方,但是也是有很多 kafka 没有的新特性,比如:广播消费(这个其实 kafka 也是可以通过设置消费组来实现,但是 rocket mq 比较方便)、延迟消费、多线程消费、拥有自己的 nameservice 服务器等具体的看下表。

消息产品 客户端SDK 协议和规范 有序消息 延迟消息 批量消息 广播消息 消息过滤器 服务器触发重新投递 消息存储 消息追溯 高可用性和故障转移 消息跟踪 配置 管理和操作工具
kafka Java、Scala 等 拉模式,支持TCP 确保分区内的消息排序 不支持 支持,带有异步生产者 不支持 支持,可以使用Kafka Streams过滤消息 不支持 高性能文件存储 支持的偏移指示 支持,需要 ZooKeeper 服务器 不支持 Kafka 使用键值对格式进行配置。这些值可以从文件或以编程方式提供。 支持,使用终端命令公开核心指标
火箭MQ Java、C++、Go 拉模型,支持TCP、JMS、OpenMessaging 确保消息的严格排序,并可以优雅地横向扩展 支持的 支持,同步模式避免消息丢失 支持的 支持,基于 SQL92 的属性过滤器表达式 支持的 高性能和低延迟的文件存储 支持的时间戳和偏移量两个表示 受支持的主从模型,无需其他套件 支持的 开箱即用,用户只需要注意几个配置 支持的、丰富的网络和终端命令来公开核心指标

整体架构

技术架构

rocketmq 精华

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证

部署架构

rocketmq 精华

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

集群工作流程

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

一些概念

  • 生产者:消息发送方

  • 生产者组:同一类Producer的集合

  • topic :一类消息的集合,可以根据业务场景来指定 topic

  • tag:一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用*设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤。

  • keys:每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题,应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。

  • broker:消息中转角色,负责存储消息、转发消息。

  • queue(队列):一个逻辑概念,通过 queue 来对消费者进行消息的并发消费

  • NameServer:名称服务器,功能类似 kafka 中 zk 中代表的角色。名称服务充当路由消息的提供者。

  • consumer:消费者,消费消息

  • consumer group:消费者组,同一类消费者,消费相同的 topic 信息

生产者

和大多数的消息中间件一样,生产者就是消息发送方,事件触发后,生产者将消息发送到 mq 服务器,以便消费组进行消息消费处理。

rocket mq 消息发送有三种方式

  • 同步发送

  • 异步发送

  • 单向发送

前两者消息发送是可靠的,会有消息应答和重试,单向发送没有应答也没有重试机制,消息可能会丢失。

Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上(会定时拉取这些元素据信息),轮询(默认是轮询,但是可以在代码中指定queue选择策略)从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

消息发送的时候可以通过指定 tag 来区分具体的场景,便于消费者指定消费哪些 tag。可以通过指定 key 来检索消息。

同步发送

消息是发送到 broker 上的,reocket mq 有个队列的概念,类似于 kafka 的分区,但是和分区这种物理概念不同,队列是个逻辑概念,这里先把它当成分区来理解即可。

消息是发送到 broker 上的队列的,由于一个 topic 可能有多个队列,因此由负载均衡策略或者自己指定的策略发送到特定的队列上。

public class SyncProducer {
	public static void main(String[] args) throws Exception {
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
    	producer.setNamesrvAddr("localhost:9876");
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) {
    	    // 创建消息,并指定Topic,Tag和消息体
    	    Message msg = new Message("TopicTest" /* Topic */,
        	"TagA" /* Tag */,
        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

发送失败策略

对于普通消息,消息发送默认采用轮询策略来选择所发送到的队列,如果发送失败,默认重试 2 次,但是重试时会选择其他 broker,不会选择之前失败的那台 broker,当然,若只有一个 broker,也只能发送到这台 broker 了,但是会尽量发送到该 broker 上的其他 queue。

如果超过重试次数,则抛出异常,由程序员保证消息不丢,当然当生产者出现 RemotingException、MQClientException 和 MQBrokerException时,Producer 会自动重投消息,重投消息可能会导致消息发送重复,这是不可避免的。

以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用 send 同步方法发送失败时,则尝试将消息存储到 db,然后由后台线程定时重试,确保消息一定到达 Broker。

保证

rocket mq 通过同步消息发送可以保证消息不丢,但是无法保证消息不重复,如果对消息重复有要求的在消费的时候需要做幂等处理。这也是 rocket mq 整体的保证:我可以不丢消息,但是消息可能会重复。

异步发送

默认 send(msg) 将阻塞,直到返回响应。因此,如果您关心性能,我们建议您使用以异步方式运行的 send(msg, callback)。异步也可以获取响应。

public class AsyncProducer {
	public static void main(String[] args) throws Exception {
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
    	// 启动Producer实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
	
	int messageCount = 100;
        // 根据消息数量实例化倒计时计算器
	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
    	for (int i = 0; i < messageCount; i++) {
                final int index = i;
            	// 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
      	                System.out.printf("%-10d Exception %s %n", index, e);
      	                e.printStackTrace();
                    }
            	});
    	}
	// 等待5s
	countDownLatch.await(5, TimeUnit.SECONDS);
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

失败重试

异步发送失败重试时,不会选择其他 broker,仅在同一台 broker 上重试,所以该策略无法保证消息不丢。

单向发送

单向发送一般是用于不关心发送是否成功的场景,单项发送无法获取响应,也不进行重试,常用于日志发送场景,失败了也不会造成什么影响。

public class OnewayProducer {
	public static void main(String[] args) throws Exception{
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) {
        	// 创建消息,并指定Topic,Tag和消息体
        	Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送单向消息,没有任何返回结果
        	producer.sendOneway(msg);

    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

NameServer

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个Nameserver 实例组成集群,但相互独立,没有信息交换。

NameService 是无状态的,互相没有通信,所以也就没有所谓的 leader 和 follower 的概念,非要有的话,那就是每台实例都是 leader,都可以提供服务。

工作方式

所有 broker 服务器都需要和每个 NameService 维持一个长连接,定时发送心跳,NameServer 维护着所有 broker 信息。生产者和消费组客户端也会和其中某一台 NameServer 建立一个长连接,定时获取最新的 broker 信息。

客户端 NameServer 选择策略

客户端首先会选择一个随机数,然后对 NameServer 节点数取模,得到的就是要连接的节点索引,如果连接失败,就采用轮询策略,去尝试连接其他节点。

存在的问题

虽然使用 NameServer 服务器而不适用 zk 可以降低对外部系统的耦合度,并且一台服务器既可以是 NameServer 服务器,也可以是 broker。但是由于 NameServer 是无状态的,互相没有消息同步,那么在某一个瞬间可能会导致彼此信息不一致的情况。但是最终信息是会一致的。

因为是无状态的,因此 NameServer 扩容的时候必须在客户端配置中把扩容的机器的地址新增上,可以说扩容既方便又麻烦。

Broker

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

模块

  1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。

  2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息

  3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。

  4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

  5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

rocketmq 精华

消息存储

rocketmq 精华

(图一)

正常来说,我们 broker 是部署多台的,以便 broker 的负载均衡,降低压力。多台 broker 之间是怎么分配消息的呢,这个和 kafka 差不多,是按照队列来的,如下图。有一个 broker 集群,有两台 broker master,一共有四个队列,每台 broker master 分配两个队列,生产者根据发送方负载均衡策略发送到指定的队列上。这里的 q1,q2,q3,q4 其实是个逻辑概念,并没有存储真正的数据,他们就是我们下面要将的 consume queue。真正数据其实是存储在 commit log 里。

rocketmq 精华

(图二)

commit log

到现在为止,我们一直在说消息是生产者发送到队列,消费者消费队列里的消息,会让我们误以为消息就是存储在队列里的,如果消息就是存储在队列里,那就和 kafka 每什么区别了,kafka 将消息存储在分区里。但其实,队列只是一个逻辑概念。消息实际上是存储在 commit log 文件里。

commit log 是消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

所有的消息,即使是不同的 topic 以及不同的队列,都是存储在 commit log里的,不做区分。那么问题来了,既然所有 topic 和队列的消息都存储在里面,消费者怎么知道怎么消费呢,难道 broker 要遍历所有消息,找到满足符合要求的数据然后推送给消费者吗,这样就太低效了,这时候就是 consume queue 登场的时候了。

consumeQueue

消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topi c的commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue 文件采取定长设计,每一个条目共20个字节,分别为8字节的 commitlog 物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

说白了 consume Queue 就是一个索引,用来定位具体消息的位置。

indexLog

index log 提供了一种可以通过key或时间区间来查询消息的方法。IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

消息存储总结

rocket mq 采用这种混合型的存储结构,主要是为了一个低延迟的读取,虽然 kafka 的顺序读也是很快的,但是 rocket mq 采用预读的形式将数据放入内存,对内存进行操作,会更快一些

弊端

这种存储架构有几个弊端:

  • 提交日志和消费队列需要在逻辑上保持一致,这给编程模型带来了额外的复杂性。
  • 每次都是通过 comsume queue 获取到消息在 commit log 文件的位置,会产生大量的随机读。

设计目的

但是官方也出了这样的设计目的,同时这也是为了保证 rocket mq 的低延迟和高可靠性:

  1. 随机阅读。尽量多读,提高页缓存命中率,减少读IO操作。所以大内存还是比较可取的。如果大量的消息堆积,读取性能会不会很差?答案是否定的,原因如下:
  • 即使消息大小只有1KB,系统也会提前读取更多数据。这意味着对于后续数据读取,将执行对主内存的访问,而不是慢速磁盘 IO 读取。
  • 从磁盘随机访问 CommitLog。如果在SSD的情况下将I/O调度器设置为NOOP,则读取qps将大大加速。

预读应该是很多针对读频繁场景的正常操作,

  1. 鉴于 ConsumeQueue 仅存储固定大小的元数据,主要用于记录消费进度,因此很好地支持随机读取。利用page cache prefetch,访问 ConsumeQueue 和访问主存一样高效,即使是在海量消息堆积的情况下。因此,ConsumeQueue 不会对读取性能带来明显的损失。
  2. CommitLog 存储几乎所有信息,包括消息数据。类似于关系型数据库的redo log,只要commit log存在,消费队列、消息键索引等所有需要的数据都可以完全恢复。

消息刷盘

rocketmq 精华

同步刷盘

同步刷盘是消息写入到磁盘后才会给生产者返回 ack 消息,这种刷盘模式对可靠性来讲是不错的保障,但是对效率来说就比较低了。

异步刷盘

异步刷盘是只要消息写入PageCache即可将成功的ACK返回给Producer端,又后台线程进行 pageCache 刷到磁盘。这种降低了读写延迟,提高了吞吐量,但是如果消息没有即使刷入磁盘,机房断电了,消息会丢失。

消息过滤

rocket mq 可以针对 tag 和 sql 表达式进行过滤,由于 consume queue 存储着 tag 的 hash code,因此可以直接在 queue 这里进行 tag 过滤,而无需进入到 commit log,但是可能会存在 hash 值一致的情况,因此拿到具体消息后还是要进行一遍过滤。

具体的消息过滤细节这里就不详细说明了(因为我不太关心,所以没有深入学习...)

消息筛选

前面我们说过有个索引文件 index log 以及发送的时候可以指定 key,这个就是为消息筛选做的一些准备,有兴趣的可以自己看官方文档

事务机制

rocket mq 也支持事务的,但是我感觉使用场景不多。前面的 produce group 其实就是为事务服务的,当某一台生产者挂掉了,broker 会通知相同的 group 的其他生产者进行回滚,有兴趣的自己翻阅下官方文档。

消费者

消费者无非就是消费谁,怎么消费,怎样才算消费完成,接下来也将从这几个方面进行介绍。

消费谁

前面其实已经有了很多铺垫,这里再啰嗦一下。消费的时候从 NameService 服务器获取到 broker ip 列表,然后获取到自己要消费的 topic 队列所在的 broker 的地址。根据 broker 内存储的偏移量,从偏移量 + 1的位置开始拉取消费。

因此,消费者是以队列为基本单位进行消费的

怎么消费

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {

    	// 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

    	// 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

    	// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest", "*");
    	// 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
	}
}

消费者组

和 kafka 一样,rocket mq 也有消费组的概念,一个消费组负责一起消费一个 topic 的所有消息,如果是集群消费,每个消费者负责 n 个队列。也就是队列和消费者是多对1的状态,一个消费者可以消费多个队列,一个队列只能被一个消费者消费。当然,如果消费组里的消费者数量大于队列的数量,那么就有一些消费者闲置,没有消息可以消费。如果是广播消费,那么消费组里的所有消费者都全量消费所有队列消息。

消费线程

和 kafka 不同的是,kafka 的消费并发度最高等于分区数,假如有20个分区,那么最多只能有20个线程一起消费。 rocket mq 不同,它可以指定一个消费者开多个线程去消费,因此并发度最高等于一个消费者的线程数 * 队列数。

这也是 rocket mq 的一个卖点,消费速度更快了。

同时,当消息挤压的时候不一定非要扩容,只需要增加消费者的线程数,当线程数到达一个瓶颈了,再进行扩容,变相的节省了资源。

消费方式

rocket mq 的消费方式有两种,广播消费和集群消费,集群消费就是一个消费者负责n个队列,广播消费就是每个消费者都全量消费所有队列消息。

和 kafka 不同的是,kafka 的集群消费需要指定多个消费者组,每个消费者组一个消费者,但是 rocket mq 支持一个消费者组里的所有消费者都全量消费 topic 消息,这样也更方便许多,无需申请多个消费组。

广播消费

广播消费就是一个消费组里的每个消费者都全量消费 topic 信息。由于每个消费者的消费进度可能都不一样,消费者之间互不关心各自的消费进度,因此,消费进度,也就是消息的偏移量保存在每台消费者自己身上。

集群消费

集群消费是一个消费组里的消费者各自负责几个队列,消费者数量如果大于队列数量,那么会造成有几个消费者被闲置,因此最好的情况下就是消费组里的消费者数量与队列保持一致。

集群消费有种情况就是消费过程中,可能组里来了新的消费者,或者有消费者离开了,那么就需要进行再均衡,也就是我们常说的 rebalence,就需要给组里的消费者重新分配负责的队列。那么每个消费者就需要知道队列里的消息被消费到哪里了,以前消费过的消息就不再消费了,直接消费还没消费的。这时候消费进度,也就是队列偏移量就不能保存在本地了,因为如果你挂了,其他消费者不知道你消费到哪。所以消费进度就需要保存在 broker 上。具体的消息格式是map<topic&consumergroup,map<queue,offset>>,就是一个map嵌套map,外层map key 为 topic 与 消费组的结合,value为内层map,内层 map key 为特定队列,value 为消费到的偏移量。整个map代表,某个 topic 的各个队列被某个消费组消费到了哪里。通过这个消息,当组里有消费者加入或者离开后,队列重新分配的时候,消费者就知道要从哪里开始消费了。

rebalance

rebalabce 也就是再均衡

触发时机
  • 某个主题的队列数量发生了变化
  • 某个消费组的消费者数量发生了变化
影响
  • 消费暂停:在再均衡的那一瞬间,消费者是没有拉取消息进行消费的

  • 消息重复:如果采用异步提交偏移量的方式,可能在再均衡前提交的偏移量丢失了,这时候在均衡后,新的消费者在消费对应的队列时就会重上一次成功提交的偏移量处开始消费,导致消息重复

  • 消息峰刺:由于再均衡的时候消费是暂停的,所以消息会积压一点,再均衡后,消费压力就大了些,会出现峰刺的情况

如何再均衡

当队列数量改变或者消费组里的消费者数量改变的时候,由于 broker 内部维护着多个topic、消费组、消费者、队列关系的数据结构,因此 broker 服务器能够感知到这个变化,感知到变化后,会通知消费者,消费者拿到最新的队列元素据后,自己会采用 queue 分配算法计算得出自己要消费的队列,然后开始消费。

与 kafka 的不同

kafka 每个消费组都有一个 broker 负责,这个 broker 称为分区协调器,每个消费组可以由不同的 broker 负责。消费组里还有个概念叫组长,组长往往都是最新加入消费组的那个消费者。当分区数量发生变化或者消费组的消费者数量发生变化的时候,分区协调器会把最新的分区和消费者信息告诉组长,由组长计算得出每个消费者应该负责的分区。然后告诉协调器,协调器再下发给每个消费者,告诉他们应该再均衡了,要开始消费新分区了。

因此,再均衡 kafka 和 rocket mq 最大的不同就是一个是由组长计算得出,一个是自己计算自己的。

怎样才算消费完成

同步提交偏移量

同步提交偏移量也就是只有当当前的消息偏移量成功提交后,才会拉取下一批消息进行消费,会重试

异步提交偏移量

异步提交偏移量是直接发送消息偏移量,不关心是否提交成功,直接拉取下一批消息。

异步提交可能会导致消费重复消息。当异步提交后,broker 由于各种原因,没有收到偏移量,这时候如果发生了再均衡,消息会从上一次提交的偏移量处开始消费,导致消息重复。

但是异步提交的方式消费速度更快,性能更高。

消费总结

消费方要注意的其实就是三个方面:

  • 提前计算好需要开几台消费者,每台消费者要开几个线程进行消费,避免后期的扩缩容导致分区再均衡

  • 如果对消息重复有要求的话处理逻辑需要做好消息幂等

  • 提交偏移量需要根据自己的业务场景,如果不能容忍消息重复,那就同步提交,如果可以容忍消息重复那就异步提交

延迟消息

如果对延迟消息有兴趣的话可以自己看官网,这里不做太多的描述

消息事务

rocket mq 对事务的支持是采用的 XA,如果对消息事务有兴趣的话可以自己看官网,这里不做太多描述

顺序消息

顺序消息分为两种:全局顺序和队列顺序

全局顺序

全局顺序是采用一个 topic 一个 queue 实现的,这种可以保证严格的消息生产和消费顺序,由于只有一个 queue 因此只能单个消费者消费。如果要保证严格的消费顺序,也只能开一个线程消费。

这里需要注意的点是,如果发送的是全局顺序消费,那么生产者不会进行消息重试。因为顺序发送必然是同步的,如果是异步的也就有可能在发送的时候是顺序发的,但是到达 broker 就不一定是按顺序到达了。前面也提到过,同步发送的重试会选择不同的 broker 和 queue,由于全局顺序只有一个 queue,所以也只能分布在一台 broker 上,所以不会进行消息重发。

队列顺序

队列顺序是天然就支持的,但是如果正常发送消费的话,消息还真不一定是按照真实的顺序进行存储和消费的。如果生产者采用的是异步发送,那么有可能同一个 queue,由于网络原因,后一条消息先于前一条消息存储进 queue 了,那就不是顺序了。如果消费的时候指定多线程消费,那也不能保证顺序了。

因此如果对队列中的消息有消费顺序要求的话,那最好就是发送的时候采用同步发送,消费的时候采用单线程消费。

即使这样,从整个 topic 视角来看,消息的消费也不是顺序的。由于有多个 queue,queueA的消息A可能在queueB的消息B之前发送的。但是由于有多个消费者分别消费不同的 queue,每台消费者的消费能力不同,会导致queueB的消息B先消费。

总结

整个 rocket mq 分为 四个大模块:生产者,消费者,broker 代理服务器,NameServer

生产者具有三种发送方式:同步发送,异步发送,单向发送,其中同步发送和异步发送有消息失败重试机制

消费者有可以同步消费,异步消费,也可以开多线程进行消费。消费并发度和线程数和消费者数有关。当然消费方式还可以分为广播消费和集群消费。当队列数或者消费者数量发生变化的时候,会产生再均衡机制。

broker 主要负责消息的存储,转发,过滤等。消息存储有 commit log 文件存储实际消息信息,consume queue 存储逻辑消费信息。index log 存储索引信息。它也负责给消费者提供信息,这里的信息包括实际的消费信息和偏移量等。

NameServer在 rocket mq 中的角色类似于 zk 在 kafka 中的角色,负责管理元信息。NameServer 虽然也是一个集群,但是每台服务器之间没有相互通讯,所有 broker 都和每台 NameServer 服务器建立长连接,定时发送心跳和自己负责的 topic 以及队列信息。consumer 和 producer 选一台 NameServer 服务器进行长连接,定时获取 broker 信息,以便指定自己要从哪台 broker 消费和发送消息到哪台 broker

每个模块都有同步和异步的方式,如果采用全同步的话,消息的保证行最高,可以不丢消息,消息重复数也会变少很多,但是全同步会导致性能和吞吐低,除非对消息有严格的要求不能丢失,并且对性能和吞吐要求不大,可以全同步。采用全异步的话吞吐量和性能是最高的,但是消息可能会丢,也会重复,所以对少量消息丢失没影响,但是对性能要求高的可以全异步。

其实还是得根据自己的业务场景选择具体的方式,看是注重性能,还是注重消息的保证。

ps

文章为本人学习过程中的一些个人见解,漏洞是必不可少的,希望各位大佬多多指教,帮忙修复修复漏洞!!!

参考资料

官方中文文档

官方文档

上一篇:Redis数据迁移工具redis-shake的使用


下一篇:RocketMQ 轻松入门