小提示:使用阿里云服务器时,需要先在安全组中开放 namesrv 和 broker 的端口(本例中 namesrv 为 9876,broker 为 10911),否则本地客户端无法连接
创建maven项目演示
0、pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the RocketMQ producer/consumer demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of this demo artifact. -->
<groupId>org.example</groupId>
<artifactId>mq3</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- Inherits build defaults and managed dependency versions from the Spring Boot parent POM. -->
<!-- NOTE(review): the demo classes only import org.apache.rocketmq.*; the Spring Boot parent looks unnecessary here, confirm before removing. -->
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.4.4</version>
</parent>
<dependencies>
<!-- RocketMQ Java client; supplies the org.apache.rocketmq.client.* producer and consumer classes. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
</project>
1、启动namesrv
[root@iZ2ze5v2vdwv6veyksylhxZ /]# cd /usr/local/
[root@iZ2ze5v2vdwv6veyksylhxZ local]# ls
aegis bin include libexec nginx rocketmq src
apache-maven-3.6.3 curl jdk1.8.0_281 libiconv openssl rocketmq-console-ng-1.0.1.jar tengine
apache-tomcat-7.0.61 etc lib man redis_bloom sbin yum-3.2.28
apache-tomcat-7.0.61.tar.gz games lib64 mysql redisbloom.so share yum-3.2.28.tar.gz
[root@iZ2ze5v2vdwv6veyksylhxZ local]# cd rocketmq
[root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# ls
benchmark bin conf lib LICENSE NOTICE README.md
[root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# cd bin/
[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ls
cachedog.sh mqadmin mqbroker.numanode1 mqshutdown play.sh runserver.sh
cleancache.sh mqadmin.cmd mqbroker.numanode2 mqshutdown.cmd README.md setcache.sh
cleancache.v1.sh mqbroker mqbroker.numanode3 nohup.out runbroker.cmd startfsrv.sh
dledger mqbroker.cmd mqnamesrv os.sh runbroker.sh tools.cmd
hs_err_pid20448.log mqbroker.numanode0 mqnamesrv.cmd play.cmd runserver.cmd tools.sh
[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
2、启动broker(-n 指定 namesrv 地址,-c 指定 broker 配置文件)
[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqbroker -n 8.131.84.120:9876 -c ../conf/broker.conf
The broker[broker-a, 8.131.84.120:10911] boot success. serializeType=JSON and name server is 8.131.84.120:9876
3、启动控制台
[root@iZ2ze5v2vdwv6veyksylhxZ ~]# cd /usr/local/
[root@iZ2ze5v2vdwv6veyksylhxZ local]# ls
aegis bin include libexec nginx rocketmq src
apache-maven-3.6.3 curl jdk1.8.0_281 libiconv openssl rocketmq-console-ng-1.0.1.jar tengine
apache-tomcat-7.0.61 etc lib man redis_bloom sbin yum-3.2.28
apache-tomcat-7.0.61.tar.gz games lib64 mysql redisbloom.so share yum-3.2.28.tar.gz
[root@iZ2ze5v2vdwv6veyksylhxZ local]# java -jar rocketmq-console-ng-1.0.1.jar
4、生产者演示demo
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
 * Minimal RocketMQ producer demo: sends a single "hello world" message to the
 * topic {@code mytopic} and prints the send result.
 *
 * <p>How producer, consumer, name server and broker relate:
 * <ol>
 *   <li>The name server (namesrv) is a routing registry. Brokers register with it, and
 *       producers and consumers query it to find out which broker hosts a topic.</li>
 *   <li>The producer sends messages directly to the broker; message payloads do not
 *       pass through the name server.</li>
 *   <li>The broker stores each message under its topic.</li>
 *   <li>Consumers subscribed to that topic receive the message from the broker.</li>
 * </ol>
 */
public class Producer {
    /**
     * Starts a producer, sends one message and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or the send fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the producer with its producer-group name.
        DefaultMQProducer producer = new DefaultMQProducer("mygroup");
        // 2. Tell the producer where the name server is, so it can look up broker routes.
        producer.setNamesrvAddr("8.131.84.120:9876");
        // 3. Start the producer before sending anything.
        producer.start();
        try {
            // 4. Build the message: topic plus body. Encode explicitly as UTF-8 so the bytes
            //    do not depend on the platform default charset.
            Message message = new Message(
                    "mytopic",
                    "hello world".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // 5. Send synchronously and print the send result.
            System.out.println(producer.send(message));
        } finally {
            // 6. Always release the producer's resources, even if send() throws.
            producer.shutdown();
        }
    }
}
5、控制台查看结果
6、消费者演示demo
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Minimal RocketMQ push-consumer demo: subscribes to the topic {@code mytopic} and prints
 * the topic, tags and body of every message it receives.
 */
public class Consumer {
    /**
     * Configures and starts the push consumer. The method returns after {@code start()};
     * messages are then delivered to the registered listener.
     *
     * @param args unused
     * @throws Exception if the subscription or the consumer start-up fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the push consumer with its consumer-group name.
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("demo-consumer-group");
        // 2. Set the name server address used to look up broker routes.
        mqPushConsumer.setNamesrvAddr("8.131.84.120:9876");
        // 3. Subscribe to the topic to read from.
        //    Analogy: the producer is an author and the topic is a magazine; a reader only
        //    receives articles from magazines it has subscribed to.
        //    - first argument: the topic to receive messages from
        //    - second argument: tag filter expression; "*" means no filtering
        //    This demo subscribes to a single topic.
        mqPushConsumer.subscribe("mytopic", "*");
        // 4. Register a listener so the consumer is notified as soon as messages arrive.
        //    MessageListenerConcurrently handles ordinary messages;
        //    MessageListenerOrderly is the variant for ordered messages.
        mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : msgs) {
                    try {
                        // Topic the message was published to.
                        System.out.println(msg.getTopic());
                        // Tags attached to the message (may be null if none were set).
                        System.out.println(msg.getTags());
                        // Decode the payload; calling toString() on a byte[] would only print
                        // the array's identity (e.g. "[B@1a2b3c"), not the message text.
                        System.out.println(
                                new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Ask the broker to redeliver this batch later.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                // 5. Acknowledge the batch. With the default (clustering) message model each
                //    message is delivered to only one consumer within the consumer group.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer.
        mqPushConsumer.start();
        System.out.println("消费者启动...");
    }
}