【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

一、概述

RocketMQ是由Alibaba用Java开发、现已加入到Apache下的一个分布式消息中间件,具有高性能、低延迟,高可靠性。本文介绍了RocketMQ与Springboot的整合基本使用过程。

二、连接方式

※ 引入依赖 & 参数设置

引入依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

参数设置

application.yaml

必填项:

rocketmq: 
	name-server: IP地址:端口号(默认9876)			   # NameServer IP地址、端口号
	producer: 
		group: 默认生产者组							# 默认生产者组,若不指定则默认使用该生产者组
	consumer:
    	group: 默认消费者组							# 默认消费者组,若不指定则默认使用该消费者组

常见选填项:

rocketmq: 
    producer: 
		send-message-timeout: 3000					# 发送消息超时等待时长
        retry-times-when-send-async-failed: 2		# 异步发送消息失败尝试次数,默认为2
        retry-times-when-send-failed: 2				# 同步发送消息失败尝试次数,默认为2
    consumer:
		topic: 默认Topic							   # 默认消费Topic

项目结构

项目结构图:

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

创建Maven项目spring-rocketmq,引入必要依赖:

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<!-- spring版本根据需求选择即可,这里选取2.5.3版本 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.3</version>
</parent>

<!-- 其他必要依赖 -->
<dependencies>
	<!-- rocketmq -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    
	<!-- spring web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- spring test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
	
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    
    <!-- junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    
    <!-- swagger3 -->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-boot-starter</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

三、生产消息

0. RocketMQTemplate 属性

RocketMQTemplate类有很多属性:

// 日志功能
private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
// 默认生产者
private DefaultMQProducer producer;
// 默认Pull消费者
private DefaultLitePullConsumer consumer;
// 字符集:UTF-8
private String charset = "UTF-8";
// 消息Queue选择策略:Hash取模
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
// Message转换器
private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();

这些属性部分可通过setter、getter方法进行调整,以便于我们更好的使用RocketMQ。

1. 普通消息

我们设计一般消息、实体类、集合,用于进行测试发送多种类型消息下的不同情况

通用消息类 org.springframework.messaging.Message实例message

// SpringFrameWork中的Message类与原生RocketMQClient中的Message类不同
// 它们之间依靠 工具类RocketMQUtil中的 convertToRocketMessage()方法 转换
// 本篇文章消息均采用 字符串String 测试,基本数据类型的包装类型与之类似,也可以使用自定义类型,即其他引用类型
Message<String> message = MessageBuilder
                .withPayload(msg)
                .setHeader("KEYS", "1234")
                .build();				// msg:想要发送的消息体

实体类User实例message

// 简易用户实体User类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private Integer id;			// 用户唯一ID
    private String username;	// 用户名
    private String password;	// 用户密码
    // ....其他字段予以省略
}

List集合实例list

// 这里添加三条相同的消息
List<Message<String>> list = new LinkedList<Message<String>>() {{
    add(message);
    add(message);
    add(message);
}};

① 同步发送消息

同步发送消息:Producer发出一条消息后,会在收到ACK之后才发下一条消息。该方式消息的可靠性最高,但消息发送效率太低。

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

// MyService:syncSendSimpleMessage
public void syncSendSimpleMessage(String msg) {
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();
    List<Message<String>> list = new LinkedList<Message<String>>() {{
        add(message);
        add(message);
        add(message);
    }};
    User user = new User(123456,"小李","123456");

    // 同步发送消息
    template.syncSend("MyTopicA" + ":MyTagA", message);
    template.syncSend("MyTopicA" + ":MyTagA", message, 10000);
    // 同步发送消息集合(批量)
    template.syncSend("MyTopicA" + ":MyTagA", list);
    template.syncSend("MyTopicA" + ":MyTagA", list, 10000);
    // 同步发送实体消息
    template.syncSend("MyTopicA" + ":MyTagA", user);
    template.syncSend("MyTopicA" + ":MyTagA", user, 10000);
}

② 异步发送消息

异步发送消息:Producer发出消息后无需等待MQ返回ACK,直接发送下一条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public void asyncSendSimpleMessage(String msg){
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();
    List<Message<String>> list = new LinkedList<Message<String>>() {{
        add(message);
        add(message);
        add(message);
    }};
    User user = new User(123456,"小李","123456");
    /* 创建回调的匿名内部类 */
    SendCallback sendCallback = new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println();
        }

        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
        }
    };

    // 异步发送消息
    template.asyncSend("MyTopicB" + ":MyTagB", message, sendCallback);
    template.asyncSend("MyTopicB" + ":MyTagB", message, sendCallback, 10000);
    // 异步发送消息集合(批量)
    template.asyncSend("MyTopicB" + ":MyTagB", list, sendCallback);
    template.asyncSend("MyTopicB" + ":MyTagB", list, sendCallback);
    // 异步发送实体消息
    template.asyncSend("MyTopicB" + ":MyTagB", user, sendCallback);
    template.asyncSend("MyTopicB" + ":MyTagB", user, sendCallback, 10000);
}

③ 单向发送消息

单向发送消息:Producer仅负责发送消息,不等待、不处理MQ的ACK,该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但可靠性较差。

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

3. 顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息 (FIFO)。

默认情况下生产者会把消息以Round Robin轮询方式发送到不同Queue分区队列;而消费者消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

※ 顺序消息选择队列三大内置策略

RocketMQ-Client为顺序消息内置了选择队列的三大内置策略,分别是基于Hash算法选择、基于同机房选择、基于随机算法选择,默认为基于Hash算法选,我们可以通过在使用RocketMQTemplate前,通过其附带的setMessageQueueSelector()方法设置其他选择策略。

所有选择算法类都实现了MeessageQueueSelector接口,并实现了其中的select()方法,我们也可以通过同样的方式创建自定义选择算法,并通过以上方法设置我们自己的选择算法。

基于Hash算法选择

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public class SelectMessageQueueByHash implements MessageQueueSelector {
	
    /**
     * @param mqs 消息队列queue的list集合
     * @param msg 消息(org.apache.rocketmq.common.message.Message)
     * @param arg 传递参数,即面向用户方法中的hashKey参数,通过方法调用层层传递,包装为arg参数
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        
        // 参数hashcode与queue数量取模
        int value = arg.hashCode() % mqs.size();
        // 若小于0,则取绝对值
        if (value < 0) {
            value = Math.abs(value);
        }
        // 返回此值,作为queue选择值
        return mqs.get(value);
    }
}

基于同机房选择

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    // Consumer-Internet-Data-Center:消费者互联网数据中心(机房)
    // Set集合,用于存放机房
    private Set<String> consumeridcs;
	
    /**
     * @param mqs 消息队列queue的list集合
     * @param msg 消息(org.apache.rocketmq.common.message.Message)
     * @param arg 传递参数,即面向用户方法中的hashKey参数,通过方法调用层层传递,包装为arg参数
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;	//根据具体情况,继承该类(获取该参数),并重写该方法,实现具体算法
    }
	
    /* 机房集 setter、getter 方法 */
    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

基于随机算法选择

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    // 获取随机数(随机种子:系统当前时刻)
    private Random random = new Random(System.currentTimeMillis());
	
    /**
     * @param mqs 消息队列queue的list集合
     * @param msg 消息(org.apache.rocketmq.common.message.Message)
     * @param arg 传递参数,即面向用户方法中的hashKey参数,通过方法调用层层传递,包装为arg参数
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 计算随机值(大小不超过queue数量)
        int value = random.nextInt(mqs.size());
        // 返回此值,作为queue选择值
        return mqs.get(value);
    }
}

① 同步发送消息

同步发送消息:Producer发出一条消息后,会在收到ACK之后才发下一条消息。该方式消息的可靠性最高,但消息发送效率太低。

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public void syncSendOrderly(String msg) {
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();
    List<Message<String>> list = new LinkedList<Message<String>>() {{
        add(message);
        add(message);
        add(message);
    }};
    User user = new User(123456, "小李", "123456");

    // 同步发送顺序消息
    template.syncSendOrderly("MyTopicD" + ":MyTagD", message, "");
    template.syncSendOrderly("MyTopicD" + ":MyTagD", message, "", 10000);
    // 同步发送顺序消息集合(批量)
    template.syncSendOrderly("MyTopicD" + ":MyTagD", list, "");
    template.syncSendOrderly("MyTopicD" + ":MyTagD", list, "", 10000);
    // 同步发送顺序实体消息
    template.syncSendOrderly("MyTopicD" + ":MyTagD", user, "");
    template.syncSendOrderly("MyTopicD" + ":MyTagD", user, "", 10000);
}

② 异步发送消息

异步发送消息:Producer发出消息后无需等待MQ返回ACK,直接发送下一条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public void asyncSendOrderly(String msg) {
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();
    List<Message<String>> list = new LinkedList<Message<String>>() {{
        add(message);
        add(message);
        add(message);
    }};
    User user = new User(123456, "小李", "123456");
    /* 创建回调的匿名内部类 */
    SendCallback sendCallback = new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println();
        }

        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
        }
    };

    // 异步发送顺序消息
    template.asyncSendOrderly("MyTopicE" + ":MyTagE", message, "", sendCallback);
    template.asyncSendOrderly("MyTopicE" + ":MyTagE", message, "", sendCallback, 10000);
    // 异步发送顺序实体消息
    template.asyncSendOrderly("MyTopicE" + ":MyTagE", user, "", sendCallback);
    template.asyncSendOrderly("MyTopicE" + ":MyTagE", user, "", sendCallback, 10000);
}

③ 单向发送消息

单向发送消息:Producer仅负责发送消息,不等待、不处理MQ的ACK,该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但可靠性较差。

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public void sendOneWayOrderly(String msg) {
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();
    User user = new User(123456, "小李", "123456");

    // 单向发送顺序消息
    template.sendOneWayOrderly("MyTopicF" + ":MyTagF", message, "");
    // 单向发送实体消息
    template.sendOneWayOrderly("MyTopicF" + ":MyTagF", user, "");
}

4. 延时消息

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。

采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超过未支付取消订票的场景。

延时等级

延时消息的延时时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中:

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

即,若指定的延时等级为3,则表示延时时长为10s,即延迟等级是从1开始计数的。

当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。

添加后等级数字会发生变化

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

① 同步发送消息

public void syncSendSimpleMessageDelay(String msg) {
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();

    // 同步发送延迟消息
    template.syncSend("MyTopicA" + ":MyTagA", message, 10000, 2);
}

② 异步发送消息

public void asyncSendSimpleMessageDelay(String msg) {
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();
    /* 创建回调的匿名内部类 */
    SendCallback sendCallback = new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println();
        }

        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
        }
    };

    // 异步发送延迟消息
    template.asyncSend("MyTopicB" + ":MyTagB", message, sendCallback, 10000, 1);
}

6. 转换发送

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

public void sendByConvert(String msg) {
    /* 创建各种类型的消息 */
    Message<String> message = MessageBuilder.withPayload(msg).setHeader("KEYS", "1234").build();
    User user = new User(123456, "小李", "123456");
    // 消息头
    Map<String, Object> headers = new HashMap<>();
    headers.put("KEYS", "1234");

    // 发送普通消息
    template.convertAndSend("MyTopicD" + ":MyTagD", message);
    template.convertAndSend("MyTopicD" + ":MyTagD", message, headers);
    // 发送实体消息
    template.convertAndSend("MyTopicD" + ":MyTagD", user);
    template.convertAndSend("MyTopicD" + ":MyTagD", user,headers);
}

四、消费消息

1. 消费类型消息

普通消息

@Service
@RocketMQMessageListener(topic = "MyTopicA", consumerGroup = "cg")
public class AcceptMessage implements RocketMQListener<String> {
    // 监听到消息就会执行此方法
    @Override
    public void onMessage(String str) {
        log.info("监听到消息:str={}", str);
    }
}

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

依照这种方式接收实体可接收到JSON字符串:

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

实体消息

@Service
@RocketMQMessageListener(topic = "MyTopicA", consumerGroup = "cg")
public class AcceptMessage implements RocketMQListener<User> {
    // 监听到消息就会执行此方法
    @Override
    public void onMessage(User user) {
        log.info("监听到消息:user={}", JSON.toJSONString(user));
        System.out.println(user);
    }
}

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

2. 消息过滤

我们在发送端Controller层加入以下语句:

System.out.println(topic + ":" + tag + "||" + message);

Tag过滤

@RocketMQMessageListener(topic = "MyTopicA", selectorExpression = "MyTagA", consumerGroup = "cg")
@Service
public class AcceptMessageWithTagFilter implements RocketMQListener<String> {
    // 监听到消息就会执行此方法
    @Override
    public void onMessage(String str) {
        log.info("监听到消息:user={}", str);
    }
}

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

SQL过滤

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:

enablePropertyFilter = true
@RocketMQMessageListener(topic = "MyTopicA", selectorType = SelectorType.SQL92, selectorExpression = "age between 0 and 6", consumerGroup = "cg")
@Service
public class AcceptMessageWithSQLFilter implements RocketMQListener<User> {
    // 监听到消息就会执行此方法
    @Override
    public void onMessage(User user) {
        log.info("监听到消息:user={}", JSON.toJSONString(user));
    }
}

直接运行代码:

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

重启NameServer、Broker、RocketMQ-Console

nohup sh bin/mqnamesrc &  # 启动NameServer
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &  # 启动Broker,并指定配置文件

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇
至此,功能开启,代码可以成功运行!

F3ZW4=,size_20,color_FFFFFF,t_70,g_se,x_16)

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇

重启NameServer、Broker、RocketMQ-Console

nohup sh bin/mqnamesrc &  # 启动NameServer
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &  # 启动Broker,并指定配置文件

【RocketMQ】SpringBoot整合RocketMQ(上) 基础篇
至此,功能开启,代码可以成功运行!

上一篇:Linux&Windows下安装RocketMQ


下一篇:java-测试rocketmq