作者:俏巴
概述
本文介绍如何在 Spring 框架下用消息队列 RocketMQ 收发消息。主要包括以下两部分内容:
- 普通消息生产者和 Spring 集成
- 消息消费者和 Spring 集成
测试流程
资源创建
1、管理门户创建实例、Topic及Group;
2、注意:如果程序在本地测试运行,请选择在公网区域创建。
代码测试
1、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>MavenSpringDemoMQ</groupId>
<artifactId>MavenSpringDemoMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<org.springframework.version>5.0.8.RELEASE</org.springframework.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework/org.springframework.context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-indexer</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<!--spring core end-->
<!--spring aop start-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<!--spirng aop end-->
<!--spring aspects start-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<!--spring aspects end-->
<!--spring instrumentation start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-instrument</artifactId>
<version>${org.springframework.version}</version>
</dependency>
<!--spring instrumentation end-->
<!--RocketMQ jar依赖-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.7.9.Final</version>
</dependency>
</dependencies>
</project>
2、发送端配置文件producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
<!-- Spring 接入方式支持 Java SDK 支持的所有配置项 -->
<property name="properties" > <!--生产者配置信息-->
<props>
<prop key="AccessKey">********</prop>
<prop key="SecretKey">********</prop>
<prop key="NAMESRV_ADDR">http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80</prop>
</props>
</property>
</bean>
</beans>
3、发送端代码
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProduceWithSpring {
public static void main(String[] args) {
/**
* 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
*/
ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
Producer producer = (Producer) context.getBean("producer");
//循环发送消息
for (int i = 0; i < 100; i++) {
Message msg = new Message( //
// Message 所属的 Topic
"yutopic",
// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
"TagSpring",
// Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预
// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_100");
// 发送消息,只要不抛异常就是成功
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.println("send success: " + sendResult.getMessageId());
}catch (ONSClientException e) {
System.out.println("发送失败");
}
}
//关系producer
producer.shutdown();
}
}
4、消费端配置文件consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener 配置-->
<!-- Group ID 订阅同一个 Topic,可以创建多个 ConsumerBean-->
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!--消费者配置信息-->
<props>
<prop key="AccessKey">********</prop>
<prop key="SecretKey">********</prop>
<prop key="NAMESRV_ADDR">http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80</prop>
<prop key="GROUP_ID">GID_Spring</prop>
<!--将消费者线程数固定为 50 个
<prop key="ConsumeThreadNums">50</prop>
-->
</props>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="msgListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="yutopic"/>
<property name="expression" value="*"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成 *。 * 仅代表订阅所有 Tag,不支持通配-->
</bean>
</key>
</entry>
<!--更多的订阅添加 entry 节点即可,如下所示-->
<!--<entry value-ref="msgListener">-->
<!--<key>-->
<!--<bean class="com.aliyun.openservices.ons.api.bean.Subscription">-->
<!--<property name="topic" value="TopicTestMQ-Other"/> <!–订阅另外一个 Topic –>-->
<!--<property name="expression" value="taga||tagb"/> <!– 订阅多个 Tag –>-->
<!--</bean>-->
<!--</key>-->
<!--</entry>-->
</map>
</property>
</bean>
5、DemoMessageListener
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
public class DemoMessageListener implements MessageListener {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message.getMsgID());
try {
//do something..
return Action.CommitMessage;
}catch (Exception e) {
//消费失败
return Action.ReconsumeLater;
}
}
}`
6、消费端启动程序
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumeWithSpring {
public static void main(String[] args) {
/**
* 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
*/
ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
System.out.println("Consumer Started");
}
}
测试效果
- 发送端
- 消费端
更多参考
Spring 集成