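Getting Started with RocketMQ
I. Environment Setup
1. Download the software
Download: rocketmq-all-4.5.1-bin-release.zip
Requirements: JDK 1.8 and 4 GB of available memory
2. Installing on Linux
Unzip the archive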
unzip rocketmq-all-4.5.1-bin-release.zip
Change into the RocketMQ directory
cd rocketmq-all-4.5.1-bin-release
Start the Name Server
# Start the Name Server in the background
nohup sh bin/mqnamesrv &
# Check that it started
tail -f ~/logs/rocketmqlogs/namesrv.log
# On success, the log contains a line like the following:
2019-07-18 17:03:56 INFO main - The Name Server boot success. ...
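Start the Broker
# Start the Broker in the background and register it with the Name Server
nohup sh bin/mqbroker -n localhost:9876 &
# Check that it started
tail -f ~/logs/rocketmqlogs/broker.log
# On success, the log contains a line like the following: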
2019-07-18 17:08:41 INFO main - The broker[itmuchcomdeMacBook-Pro.local, 192.168.124.136:10911] boot success. serializeType=JSON and name server is localhost:9876
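Besides tailing the logs, a quick TCP probe can confirm that both processes are accepting connections. The sketch below is only an illustration: the class name PortProbe and the method isListening are made up for this example, and the ports are the ones that appear above (9876 for the Name Server, 10911 in the Broker log line). An open port shows that something is listening, not that RocketMQ is healthy, so treat it as a complement to the log check.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative helper, not part of RocketMQ.
class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the commands and log output above
        System.out.println("Name Server (9876): " + isListening("127.0.0.1", 9876, 1000));
        System.out.println("Broker (10911): " + isListening("127.0.0.1", 10911, 1000));
    }
}
```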
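Verify that producing messages works
# 1. Set the environment variable
export NAMESRV_ADDR=localhost:9876
# 2. Send messages with the demo producer bundled in the release
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Verify that consuming messages works
# 1. Set the environment variable
export NAMESRV_ADDR=localhost:9876
# 2. Receive messages with the demo consumer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer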
Stopping the services
# 1. Stop the Broker first so it can unregister from the Name Server
sh bin/mqshutdown broker
# 2. Stop the Name Server
sh bin/mqshutdown namesrv
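3. Installing on Windows
Unzip the archive
Configure the environment variable
Variable name: ROCKETMQ_HOME
Variable value: the extraction path followed by the RocketMQ folder name
Example: D:\develop\rocketmq
Start the Name Server (run from the bin directory)
start mqnamesrv.cmd
Start the Broker (run from the bin directory)
start mqbroker.cmd -n 127.0.0.1:9876
If startup fails with the error "Could not find or load main class xxxxxx" (找不到或无法加载主类 xxxxxx), open runbroker.cmd and wrap %CLASSPATH% in double quotes so that it reads "%CLASSPATH%". Save the file and run the start command again.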
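II. Using the Java API
Synchronous send
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class SyncProducerApp {
    public static void main(String[] args) throws InterruptedException, RemotingException,
            MQClientException, MQBrokerException {
        // Create the producer and assign it to a producer group
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // Set the Name Server address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer
        producer.start();
        Message message = new Message(
                "topic_producer_01", // topic
                "hello world".getBytes(StandardCharsets.UTF_8) // message body
        );
        // Send the message and block until the broker responds
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        // Shut down the producer
        producer.shutdown();
    }
}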
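Asynchronous send
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class AsyncProducerApp {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        // Create the producer and assign it to a producer group
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // Set the Name Server address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Build the message
            Message message = new Message(
                    "topic_producer_02", // topic
                    ("hello world" + i).getBytes(StandardCharsets.UTF_8) // message body
            );
            // Send without blocking; the result arrives in the callback
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("Send succeeded: " + sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println("Send failed: " + throwable.getMessage());
                }
            });
        }
        // Sleep 10 s so the callbacks can complete
        Thread.sleep(10000);
        // Shut down the producer
        producer.shutdown();
    }
}
Note: an asynchronous send returns before the message has actually been delivered. If the producer is shut down while sends are still in flight, those sends fail, which is why the example waits before calling shutdown().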
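A fixed sleep is a guess: too short and sends are lost, too long and the program idles. One common alternative is to count callbacks with a java.util.concurrent.CountDownLatch and shut down once all of them have fired. The sketch below shows only that waiting pattern. FakeAsyncSender and Callback are hypothetical stand-ins so the example runs without a broker; in the producer above, the same countDown() calls would go inside onSuccess and onException of SendCallback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: nothing in this class is RocketMQ API.
class LatchAwaitDemo {

    /** Hypothetical callback with the same two methods as SendCallback. */
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable error);
    }

    /** Hypothetical sender that completes each send on a background thread. */
    static class FakeAsyncSender {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        void send(String body, Callback callback) {
            pool.submit(() -> callback.onSuccess("SEND_OK:" + body));
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    /** Sends count messages and returns how many callbacks reported success. */
    static int sendAndAwait(int count) throws InterruptedException {
        FakeAsyncSender sender = new FakeAsyncSender();
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();

        for (int i = 0; i < count; i++) {
            sender.send("hello world" + i, new Callback() {
                @Override
                public void onSuccess(String result) {
                    succeeded.incrementAndGet();
                    latch.countDown(); // one callback finished
                }

                @Override
                public void onException(Throwable error) {
                    latch.countDown(); // count failures too, so await() can return
                }
            });
        }

        // Wait until every callback has fired; the timeout is only a safety net
        latch.await(10, TimeUnit.SECONDS);
        sender.shutdown();
        return succeeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("successful callbacks: " + sendAndAwait(100));
    }
}
```

The latch returns as soon as the last callback arrives, so the shutdown is neither early nor needlessly delayed.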
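Pull mode
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class PullConsumerApp {
    public static void main(String[] args) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException {
        // Create the consumer and assign it to a consumer group
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_group");
        // Set the Name Server address
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Start the consumer
        consumer.start();
        // Look up every message queue that belongs to the topic
        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("topic_producer_01");
        for (MessageQueue messageQueue : messageQueues) {
            System.out.println("messageQueue:" + messageQueue);
            // Pull a single batch from this queue
            PullResult result = consumer.pull(
                    messageQueue, // one message queue of the topic
                    "*", // tag filter expression; "*" accepts every tag
                    0, // offset to start pulling from
                    10 // maximum number of messages per pull
            );
            System.out.println("result:" + result);
        }
        // Shut down the consumer
        consumer.shutdown();
    }
}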
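The pull example above reads a single batch of at most 10 messages from offset 0 on each queue, so it never gets past the first 10. To read further, the caller has to remember where the previous batch ended and pass that as the next offset; in the 4.x client the PullResult carries this value (getNextBeginOffset()). The sketch below illustrates just that bookkeeping. InMemoryQueue and Batch are hypothetical stand-ins that model one queue as a list, so the loop can run without a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: nothing in this class is RocketMQ API.
class PullLoopDemo {

    /** One pull response: the messages found and the offset to use next time. */
    static class Batch {
        final List<String> messages;
        final long nextBeginOffset;

        Batch(List<String> messages, long nextBeginOffset) {
            this.messages = messages;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    /** Hypothetical stand-in for a single message queue. */
    static class InMemoryQueue {
        private final List<String> log = new ArrayList<>();

        void append(String body) {
            log.add(body);
        }

        /** Returns up to maxNums messages starting at offset. */
        Batch pull(long offset, int maxNums) {
            int from = (int) Math.min(offset, log.size());
            int to = Math.min(from + maxNums, log.size());
            return new Batch(new ArrayList<>(log.subList(from, to)), to);
        }
    }

    /** Reads the whole queue in batches of maxNums, advancing the offset each time. */
    static List<String> drain(InMemoryQueue queue, int maxNums) {
        List<String> consumed = new ArrayList<>();
        long offset = 0; // start from the beginning, as in the example above
        while (true) {
            Batch batch = queue.pull(offset, maxNums);
            if (batch.messages.isEmpty()) {
                break; // nothing new at this offset
            }
            consumed.addAll(batch.messages);
            offset = batch.nextBeginOffset; // continue where this batch ended
        }
        return consumed;
    }

    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue();
        for (int i = 0; i < 25; i++) {
            queue.append("hello world" + i);
        }
        System.out.println("consumed: " + drain(queue, 10).size());
    }
}
```

A real consumer would also persist the offset per queue so that a restart resumes from the last position instead of from 0.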
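Push mode
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class PushConsumerApp {
    public static void main(String[] args) throws MQClientException {
        // Create the consumer and assign it to a consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // Set the Name Server address
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Subscribe to the topic; "*" accepts every tag
        consumer.subscribe("topic_producer_02", "*");
        // Register a listener that is invoked whenever messages arrive
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                MessageQueue messageQueue = context.getMessageQueue();
                System.out.println("messageQueue:" + messageQueue);
                for (MessageExt messageExt : list) {
                    // Decode the body with the same charset the producer used
                    System.out.println("message:" + new String(messageExt.getBody(), StandardCharsets.UTF_8));
                }
                // Report successful consumption
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer; message consumption begins after this call
        consumer.start();
        // The consumer is left running in this example. In production it is a
        // long-running service and should only stop when the application is shut down.
        // Thread.sleep(30_000);
        // // Shut down the consumer
        // consumer.shutdown();
    }
}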
III. Integrating RocketMQ with Spring Boot
1. Create a Spring Boot project
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.snowsea</groupId>
    <artifactId>rocketmq-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-springboot</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.properties
spring.application.name=producer-boot
# Name Server address
rocketmq.name-server=127.0.0.1:9876
# Producer group
rocketmq.producer.group=producer-boot-group
Create a MessageController
MessageController
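import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/message")
public class MessageController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // Sends the msg request parameter to the topic-boot topic
    @RequestMapping("/send")
    public String send(@RequestParam String msg) {
        rocketMQTemplate.convertAndSend("topic-boot", msg);
        return "ok";
    }
}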
Create a ConsumerListener
ConsumerListener
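import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// Consumes messages from topic-boot as part of consumer-boot-group
@Component
@RocketMQMessageListener(topic = "topic-boot", consumerGroup = "consumer-boot-group")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Receive message:" + message);
    }
}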
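Start the application and test it
Open a browser and send a message by visiting http://localhost:8080/message/send?msg=hello world, then check the console to confirm that the message was received.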
2021-04-15 11:16:47.620 INFO 25528 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-04-15 11:16:47.623 INFO 25528 --- [ main] c.s.r.RocketmqSpringbootApplication : Started RocketmqSpringbootApplication in 4.56 seconds (JVM running for 5.314)
2021-04-15 11:17:03.698 INFO 25528 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-04-15 11:17:03.698 INFO 25528 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-04-15 11:17:03.701 INFO 25528 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms
Receive message:hello world
Receive message:my name is jerry
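The test URL contains a space in the msg parameter. A browser encodes it automatically, but a script or HTTP client usually has to do so itself. The sketch below builds the same URL with the JDK's URLEncoder; the class name SendUrlDemo and the method sendUrl are made up for this illustration, and the host, port and path are the ones used above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Illustrative helper for building the test URL outside a browser.
class SendUrlDemo {

    /** Returns the /message/send URL with msg encoded as a query parameter. */
    static String sendUrl(String msg) throws UnsupportedEncodingException {
        // URLEncoder uses form encoding, so a space becomes '+'
        return "http://localhost:8080/message/send?msg=" + URLEncoder.encode(msg, "UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        System.out.println(sendUrl("hello world"));
        System.out.println(sendUrl("my name is jerry"));
    }
}
```

The first call yields http://localhost:8080/message/send?msg=hello+world. The embedded Tomcat should decode the '+' in the query string back to a space, so the listener is expected to print the original text.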