RocketMQ 轻松入门

1. 文档汇总

源码地址:https://github.com/apache/rocketmq

中文文档:https://github.com/apache/rocketmq/tree/master/docs/cn

商业版:https://www.aliyun.com/product/rocketmq

官网翻译:http://www.itmuch.com/books/rocketmq/

FAQ:http://rocketmq.apache.org/docs/faq/

RocketMQ常用管理命令:https://blog.csdn.net/gwd1154978352/article/details/80829534

RocketMQ默认配置:https://www.cnblogs.com/jice/p/11981107.html

2. 架构设计

RocketMQ 轻松入门

一般见到的架构图都是这个样子的,我们来看下各个角色的作用。

2.1 Broker

RocketMQ的服务,或者说一个进程,叫做Broker。Broker的作用是存储和转发消息。RocketMQ单机大约能承受10万QPS的请求。

RocketMQ 轻松入门

为了提升Broker的可用性(防止单点故障),以及提升服务器的性能(实现负载均衡),通常会做集群部署。

和Kafka一样,RocketMQ集群的每一个Broker节点保存总数据的一部分,因此可以实现横向扩展。

RocketMQ 轻松入门

为了提升可靠性(防止数据丢失),每个Broker都有自己的副本(slave)。

RocketMQ 轻松入门

默认情况下,读写都放在master上。在slaveReadEnable=true的情况下,slave也可以参与读负载。但是默认只有Broker=1的slave才会参与读负载,而且实在master消费慢的情况下,由whichBrokerWhenConsumeSlowly这个参数决定。

2.2 Topic

Topic用于将消息按主题做划分。比如订单消息、物流消息等。注意,跟Kafka不同的是,在RocketMQ中,topic是一个逻辑概念,消息不是按照Topic划分存储的。

RocketMQ 轻松入门

Producer 将消息发往指定的Topic,Consumer订阅这个Topic就可以收到相应的消息。

跟Kafka一样,如果topic不存在则自动创建。

private Boolean autoCreateTopicEnable=tru;

Topic 和生产者,消费者都是多对多的关系,一个生产者可以发送消息给多个topic,一个消费者也可以订阅多个topic。

2.3 NameServer

当不同的消息存储在不同的Broker上,生产者和消费者对于Broker的获取,或者说路由选择是一个非常关键的问题。

所以,和分布式服务中的注册中心一样,RocketMQ也需要一个角色来统一管理Broker的信息。

在Kafka里面,是使用zookeeper来统一管理的。但是,RocketMQ偏偏没有这么做,他实现了一个自己的服务——NameServer。

我们可以将其理解为RocketMQ的路由中心,每一个NameServer节点都保存着全量的路由信息,为了保证高可用,NameServer自身也可以做集群的部署。作用类似于Eureka或者Redis Sentinel.

也就是说,Broker会在NameServer上注册自己,Producer和Consumer用NameServer来发现Broker。

RocketMQ 轻松入门

2.3.1NameServer作为路由中心是怎么工作的呢?

每个Broker节点在启动时,都会根据配置遍历NameServer列表。

rocket/conf/broke.conf

namesrv=localhost:9876

与每个NameServer建立TCP长连接,注册自己的信息,之后每隔30S发送心跳信息(服务主动注册)。

如果Broker挂掉了,不发送心跳了,NameServer怎么发现呢?

所以除了主动注册,还有定时探活。每个NameServer每隔10S检查一下各个Broker的最近一次心跳时间,如果发现某个Broker超过120S没有发送心跳,就认为这个Broker已经挂掉了,会将其从路由信息里移除。

RocketMQ 轻松入门

既然都是注册中心,那为什么不使用Zookeeper呢?

实际上在早期的版本里,RocketMQ也采用的zookeeper,但是去掉了zookeeper依赖,选择采用自己的NameServer。

这因为RocketMQ的架构设计决定了只需要一个轻量级的元数据数据库就足够了,只需要保持最终一致即可,而不需要zookeeper这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少维护成本。

根据著名的CAP理论:一致性,可用性,分区容错性。zookeeper选择了CP,而NameServer选择了AP,放弃了实时一致性。

2.3.2 一致性问题

既然NameServer之间是互不通信的,也没有主从之分,那他们是怎么保持一致性的?

我们从以下三点分析:

2.3.2.1 服务注册

如果新增了Broker,怎么新增到所有的NameServer中?

因为没有master,Broker每隔30秒会向所有的NameServer发送心跳消息,所以还是能保持一致的。

2.3.2.2 服务剔除

如果一个Broker挂了,怎么从所有的NameServer中移除信息?

  • 如果Broker正常关闭,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除。
  • 如果Broker异常关闭:NameServer的定时任务会每10秒扫描Broker列表,如果某个Broker的心跳包最新时间戳超过当前时间120S就会被移除。

通过以上两点,不管Broker挂了,还是恢复了,增加了还是减少了,NameServer都能够保持数据一致。

2.3.2.3 路由发现

如果Broker的信息更新了(增加或者减少节点),客户端怎么获取最新的Broker列表呢?

先说生产者。发送第一条消息的时候,根据Topic从nameserver获取路由消息。

然后是消费者。消费者一般是订阅固定的Topic,在启动的时候就要获取Broker的信息。

这之后如果Broker信息变了之后该怎么办?

因为NameServer不会主动推送服务信息给客户端,客户端也不会发送心跳到NameServer,所以在建立连接之后,需要生产者和消费者定期更新。

我们可以通过查看源码的时候了解他是怎么操作的。

在MQClientInstance类的send方法中,启动了一个定时任务。

RocketMQ 轻松入门

其中第二个任务updateTopicRouteInfoFromNameServer方法,是用来定期更新NameServer信息的,默认是30S获取一次。

RocketMQ 轻松入门

2.3.2.4 总结

各个NameServer的数据通过主动注册和定时探活是能够保持数据一致的。而消费者和生产者是通过定期更新路由从而获取最新的信息。

问题1:如果Broker挂掉,客户端30秒以后才更新路由信息,那是不是会出现最多30秒的数据延迟?

答:由如下几个解决思路

  • 重试
  • 把无法连接的Broker隔离
  • 或者优先选择延迟小的节点,就能避免连接到容易挂的Broker了。

问题2:如果作为路由中心的NameServer全部挂掉了,而且暂时没有恢复呢?

答:这个也没问题,客户端会缓存Broker的信息,不完全依赖与NameServer。

2.4 Producer

生产者,用于生产消息,会定时从NameServer拉取路由信息(不用配置RocketMQ的服务地址),然后根据路由信息与指定的Broker建立TCP长连接,从而将消息发送到Broker中,发送逻辑一致的Producer可以组成一个Group。

RocketMQ的生产者同样支持批量发送,不过List要自己传过去。

RocketMQ 轻松入门

Producer写数据只能操作master节点。

2.5 Consumer

消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。消费逻辑一致的Consumer 可以组成一个Group,这时候消息会在Consumer之间负载。

由于Master和Slave都可以读取消息,因此Consumer 会与Master和Slave都建立连接。

注意:同一个consumer group内的消费者应该订阅同一个topic。或者反过来,消费不同topic的消费者不应该采用相同的consumer group名字。如果不一样,后面的消费者的订阅,会覆盖前面的订阅。

消费者有两种消费方式:一种是集群消费(消息轮询),一种是广播消费(全部收到相同副本)。

RocketMQ 轻松入门

RocketMQ 轻松入门

2.5.1 pull

从消费模型来说,RocketMQ支持 pull和push两种模式。

Pull模式是consumer轮询从broker拉取消息。实现类:DefaultMQPullConsumer(过时),替代类:DefaultLitePullConsumer。

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("my_test_consumer_group");

Pull有两种实现方式:一种是普通的轮询(Polling)。不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。

普通轮询的缺点:因为大部分时候没有数据,这些无效的请求会大大地浪费服务器的资源。而且定时请求的间隔过长的时候,会导致消息延迟。

RocketMQ的pull用长轮询来实现。

客户端发起Long Polling,如果此时服务端没有相关数据,会hold住请求,直到服务端有相关数据,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次Long Polling (所谓的hold住请求指的服务端暂时不回复结果,保存相关请求,不关闭请求连接,等相关数据准备好,写回客户端)。

长轮询解决了轮询的问题,唯一的缺点是服务器在挂起的时候比较耗内存。

2.5.2 push

Push模式是 Broker推送消息给consumer,实现类:DefaultMQPushConsumer。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");

RocketMQ的 push 模式实际上是基于pull模式实现的,只不过是在 pull模式上封装了一层,所以RocketMQ push模式并不是真正意义上的“推模式”。

在RocketMQ中,PushConsumer 会注册MessageListener 监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

2.6 Message Queue

RocketMQ支持多master的架构。

思考一个这样的问题:当我们有多个master的时候,发往一个Topic的多条消息会在多个master的 Broker上存储。

那么,发往某一个Topic的多条消息,是不是在所有的Broker上存储完全相同的内容?

肯定不是的。如果所有的master存储相同的内容,而slave又跟master存储相同的内容:第一个,浪费了存储空间。第二个,无法通过增加机器数量线性地提升Broker的性能,也就是只能垂直扩展,通过升级硬件的方式提升性能,无法实现横向(水平)扩展。那么在分布式的环境中,RocketMQ的性能肯定会受到非常大的限制。

一句话:不符合分片的思想。

那么最关键的问题来了,怎么把发到一个Topic里面的消息分布到不同的master上呢?

在kafka里面设计了一个partition,一个Topic可以拆分成多个 partition,这些partition可以分布在不同的 Broker上,这样就实现了数据的分片,也决定了kafka可以实现横向扩展。

RocketMQ有没有这样的设计呢?

在一个Broker上,RocketMQ只有一个存储文件,并没有像Kafka一样按照不同的Topic分开存储。数据目录:

RocketMQ 轻松入门

也就是说,如果有3个Broker,也就是只有3个用来存储不同数据的commitlog。那问题就来了,如果不按照分区去分布,数据应该根据什么分布呢?

RocketMQ里面设计了一个叫做Message Queue的逻辑概念,作用跟partition类似。

首先,我们创建Topic的时候会指定队列的数量,一个叫 writeQueueNums (写队列数量),一个readQueueNums(读队列数量)。

RocketMQ 轻松入门

写队列的数量决定了有几个Message Queue,读队列的数量决定了有几个线程来消费这些Message Queue(只是用来负载的)。

那不指定MQ的时候呢?默认有几个MQ呢?

服务端创建一个Topic默认8个队列(BrokerConfig):

private int defaultTopicQueueNums=8

topi不存在,生产者发送消息时创建默认4个队列(DefaultMQProducer):

private volatile int defaultTopicQueueNums=4;

MessageQueue在磁盘上是可以看到的,但是数量只跟写队列相关。

比如我用producer发送消息,consumerqueue目录下面就会出现四个目录。

RocketMQ 轻松入门

客户端封装了一个MessageQueue对象,里面其实就是三块内容:

private String topic;
private String brokerName;
private int queueId;

topic表示它是哪个topic 的队列。Broker代表它在哪个Broker 上,比如有两个master,一个叫broker-a,一个叫 broker-b。queueld 代表它是第几个分片。

举例:一个Topic有3个Message Queue,编号是1、2、3。刚好有三个Broker,第一个MQ指向Broker1,第二个MQ指向Broker2,第三个MQ指向Broker3。

发送消息的时候,生产者会根据一定的规则,获得MessageQueue,只要拿到了queueld,就知道要发往哪个Broker,然后在 commitlog 写入消息。

磁盘上看到的队列数量,是由写队列的数量决定的,而且在所有的master上个数是一样的(但是数据存储不一样)。

举例:集群有两个master。如果创建一个topic ,有2个写队列、1个读队列(topic名字:q-2-1)。

那么两台机器的consumequeue目录会出现2个队列,一共4个队列。/opt/rocketmq/store/broker-a/consumequeue/q-2-1也就是总队列数量是:写队列数*节点数。

RocketMQ 轻松入门

如果我们发送6条消息,给消息依次编号,会选择什么队列发送呢?

我们用代码测试以下:

先启动消费者代码:

public class SimpleConsumer {
    public static void main(String[] args) throws MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");

        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);

        // Subscribe one more more topics to consume.
        consumer.subscribe("q-2-1", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String messageBody = "";
                    try {
                        messageBody = new String(msg.getBody(), "utf-8");
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        // 重新消费
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    String tags = msg.getTags();
                    System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
                }

                // 消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

然后启动生产者:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("my_test_producer_group");
        producer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        producer.start();

        for (int i = 0; i < 6; i++) {
            try {
                // tags 用于过滤消息 keys 索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息
                Message msg = new Message("q-2-1",
                        "TagA",
                        "2673",
                        ("RocketMQ " + String.format("%05d", i)).getBytes());

                SendResult sendResult = producer.send(msg);
                System.out.println(String.format("%05d", i) + " : " + sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        producer.shutdown();
    }
}

因为消费者只有一个读队列,所以它只能消费编号为0的队列。

反过来,如果是1个写队列,2个读队列:

那么broker的consumerqueue目录会出现1个队列。

RocketMQ 轻松入门

因为目前有2个读队列,所以所有消息都可以接收到。

总结

读写队列数量最好一致,否则会出现消费不了的情况。

3. Java开发

3.1 Java API

官方提供了Java客户端API,只需要引入相关依赖即可。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

官方也提供了详细的代码案例。

package 作用
batch 批量消息,用List发送
benchmart 性能测试
broadcast 广播消息
delay 延迟消息msg.setDelayTimeLevel
filter 基于tag或sql表达式过滤
operation 命令行
ordermessage 顺序消息
quickstart 入门
rpc 实现RPC调用
simple ACL,异步,assign,subscribe
tracemessage 消息追踪
transaction 事务消息

3.1.1 生产者

下面我们看看简单的生产者代码:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("my_test_producer_group");
        producer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        producer.start();

        for (int i = 0; i < 6; i++) {
            try {
                // tags 用于过滤消息 keys 索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息
                Message msg = new Message("q-2-1",
                        "TagA",
                        "2673",
                        ("RocketMQ " + String.format("%05d", i)).getBytes());

                SendResult sendResult = producer.send(msg);
                System.out.println(String.format("%05d", i) + " : " + sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        producer.shutdown();
    }
}

代码解读:

  1. 多个发送同一类消息的生产者称之为一个生产者组。
  2. 生产者需要通过NameServer获取所有Broker的路由信息,多个用分号隔开。跟Redis 的哨兵一样,通过哨兵获取服务端地址。
  3. Message代表一条消息,必须指定Topic,代表消息的分类,是一个逻辑概念,比如订单类的消息、资金类的消息。
  4. Tags:可选,用于消费端过滤消息,是对消息用途的细分,比如Topic是订单类的消息, tag可以是create,update, delete。
  5. Keys:消息关键词。如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于消息的索引,可以设置为消息的唯一编号(主键)。
  6. SendResult是发送结果的封装,包括消息状态、消息ld、选择的队列等等,只要不抛异常,就代表发送是成功的。
SendResult [sendStatus=SEND_OK, msgId=COA8006F5B6418B4AAC299CF37140009, offsetMsgld=null,messageQueue=MessageQueue [topic=ransaction-test-topic, brokerName=broker-b, queueld=3], queueOffset=5]

msgld:生产者生成的唯一编号,全局唯一,也叫uniqld。

offsetMsgld:消息偏移ID,该ID记录了消息所在集群的物理地址,主要包含所存储Broker服务器的地址(IP与端口号)以及所在commitlog 文件的物理偏移量。

3.1.2 消费者

public class SimpleConsumer {
    public static void main(String[] args) throws MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");

        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);

        // Subscribe one more more topics to consume.
        consumer.subscribe("q-2-1", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String messageBody = "";
                    try {
                        messageBody = new String(msg.getBody(), "utf-8");
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        // 重新消费
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    String tags = msg.getTags();
                    System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
                }

                // 消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

消费者代码解读:

1、消费者组别是消费同一topic的消费者分组。

2、消费者需要从nameServer拿到topic的 queue 所在的 Broker地址,多个用分号隔开。

3、Consumer可以以两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,集群模式下消息只会发送给一个Consumer.

4、订阅可以使用通配符,topic名字后面的参数跟生产者消息的tag就对应起来了,可以使用通配符,*代表匹配所有消息;||隔开多个。

consumer.subscribe("q-2-1", "*");

5、return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;这句话是告诉Broker消费成功,可以更新offset了。也就是发送ACK。

3.2 Spring boot

3.2.1 依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

在spring boot中使用RocketMQ可以起到简化配置,管理对象,提供模板方法的目的。

3.2.2 配置

客户端的配置可以直接写在配置文件中

server.port=9096
spring.application.name=demo
rocketmq.name-server=localhost:9876
rocketmq.producer.group=test-group
rocketmq.producer.send-message-timeout=3000

3.2.3 消费者

创建消费者类,加上@RocketMQMessageListener注解监听消息。

/**
 * MessageModel:集群模式;广播模式
 * ConsumeMode:顺序消费;无序消费
 */
@Component
@RocketMQMessageListener(topic = "springboot-topic", consumerGroup = "consumer-group",
        //selectorExpression = "tag1",selectorType = SelectorType.TAG,
        messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        try {
            System.out.println("----------接收到rocketmq消息:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

1、使用注解中的selectorExpression属性过滤消息。

2、注解属性中的MessageModel是消息模型,两个值:

​ CLISTERING:多个消息者轮询消费消息(默认)。

​ BROADCASTING:所有消费者都收到同样的消息。

3、consumeMode是消费模型,两个值:

  • CONCURRENTLY:并发的(默认)。消费端并发消费,消息顺序得不到保证。到底有多少个线程并发消费,取决于线程池的大小。
  • ORDERLY:有序的。消费端有序消费,也就是生产者发送的顺序跟消费者消费的顺序是一致的。

​ 两者的区别:顺序消费需要对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理。很显然并发消费效率更高。

3.2.4 生产者

@Component
public class MessageSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void syncSend() {
        SendResult result = rocketMQTemplate.syncSend("springboot-topic:tag", "这是一条同步消息", 10000);
        System.out.println(result);
    }
}

生产者的代码更加简单,只需要注入RocketMQTemplate就可以发送消息。

发送消息有几种类型:

1、同步(syncSend方法)∶指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

2、异步(asyncSend方法)︰指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理。

3、单向(sendOneWay方法)∶特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

发送方法如何选择?

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
  • 当发送的消息很重要时,且对响应时间不敏感的时候采用sync方式;
  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式。

4. 项目地址

RocketMQ-Demo

上一篇:rocketmq 精华


下一篇:生产环境出现网络分区,RocketMQ集群表示毫无压力!!!