RocketMQ消息生产者和消息消费者演示

小提示:阿里云打开namesrv和broker的端口
RocketMQ消息生产者和消息消费者演示

创建maven项目演示

0、pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mq3</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>jar</packaging>

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.4.4</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>

</project>

1、启动namesrv

[root@iZ2ze5v2vdwv6veyksylhxZ /]# cd /usr/local/
[root@iZ2ze5v2vdwv6veyksylhxZ local]# ls
aegis                        bin    include       libexec   nginx          rocketmq                       src
apache-maven-3.6.3           curl   jdk1.8.0_281  libiconv  openssl        rocketmq-console-ng-1.0.1.jar  tengine
apache-tomcat-7.0.61         etc    lib           man       redis_bloom    sbin                           yum-3.2.28
apache-tomcat-7.0.61.tar.gz  games  lib64         mysql     redisbloom.so  share                          yum-3.2.28.tar.gz
[root@iZ2ze5v2vdwv6veyksylhxZ local]# cd rocketmq
[root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# ls
benchmark  bin  conf  lib  LICENSE  NOTICE  README.md
[root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# cd bin/
[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ls
cachedog.sh          mqadmin             mqbroker.numanode1  mqshutdown      play.sh        runserver.sh
cleancache.sh        mqadmin.cmd         mqbroker.numanode2  mqshutdown.cmd  README.md      setcache.sh
cleancache.v1.sh     mqbroker            mqbroker.numanode3  nohup.out       runbroker.cmd  startfsrv.sh
dledger              mqbroker.cmd        mqnamesrv           os.sh           runbroker.sh   tools.cmd
hs_err_pid20448.log  mqbroker.numanode0  mqnamesrv.cmd       play.cmd        runserver.cmd  tools.sh
[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

2、启动broker

[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqbroker -n 8.131.84.120:9876 -c ../conf/broker.conf 
The broker[broker-a, 8.131.84.120:10911] boot success. serializeType=JSON and name server is 8.131.84.120:9876

3、启动控制台

[root@iZ2ze5v2vdwv6veyksylhxZ ~]# cd /usr/local/
[root@iZ2ze5v2vdwv6veyksylhxZ local]# ls
aegis                        bin    include       libexec   nginx          rocketmq                       src
apache-maven-3.6.3           curl   jdk1.8.0_281  libiconv  openssl        rocketmq-console-ng-1.0.1.jar  tengine
apache-tomcat-7.0.61         etc    lib           man       redis_bloom    sbin                           yum-3.2.28
apache-tomcat-7.0.61.tar.gz  games  lib64         mysql     redisbloom.so  share                          yum-3.2.28.tar.gz
[root@iZ2ze5v2vdwv6veyksylhxZ local]# java -jar rocketmq-console-ng-1.0.1.jar

4、生产者演示demo

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * 生产者、消费者、namesrv、broker 之间关系
 *
 * 1、namesrv 类似注册中心,生产者、消费者、broker 都会注册到namesrv上
 * 2、生产者 发送消息给 namesrv
 * 3、namesrv 将消息转发给 broker的topic上
 * 4、broker的topic 再将消息传给 消费者
 */
public class Producer {
    public static void main(String[] args) throws Exception{
        //1、先创建了生产者
        DefaultMQProducer producer = new DefaultMQProducer("mygroup");
        //2、生产者要主动联系namesrvAddr
        producer.setNamesrvAddr("8.131.84.120:9876");
        //3、连接成功后要启动生产者
        producer.start();
        //4、创建消息类,包含topic和body
        Message message = new Message("mytopic","hello world".getBytes());
        //5、生产者将消息发送出去
        System.out.println(producer.send(message));
        //6、关闭生产者
        producer.shutdown();
    }
}

5、控制台查看结果

RocketMQ消息生产者和消息消费者演示

6、消费者演示demo

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1、创建DefaultMQPushConsumer
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("demo-consumer-group");
        //2、设置namesrv地址
        mqPushConsumer.setNamesrvAddr("8.131.84.120:9876");
        //3、设置subscribe读取主题信息
        /**
         * 生产者类似一个作者,namesrv类似杂志社,消费者必须先订阅某家报社,才可以收到生产者给报社写的文章
         * 每个消费者只能订阅一个topic
         * topic:关注消息的地址
         * 过滤器 * :表示不过滤
         */
        mqPushConsumer.subscribe("mytopic","*");
        //4、消费者注册个监听器,这样namesrv里传进来生产者提供的消息后,就可以及时知道了
        //MessageListenerConcurrently 是普通消息接收,MessageListenerOrderly 是顺序消息接收
        mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : msgs) {
                    try {
                        //获取主题
                        System.out.println(msg.getTopic());
                        //获取标签
                        System.out.println(msg.getTags());
                        //获取消息
                        System.out.println(msg.getBody().toString());
                    } catch (Exception e) {
                        e.printStackTrace();
                        //重新再消费一次
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                //5、默认情况下,这条消息只会被一个consumer消费到,点对点消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //开启Consumer
        mqPushConsumer.start();
        System.out.println("消费者启动...");
    }
}

7、控制台查看结果

RocketMQ消息生产者和消息消费者演示
RocketMQ消息生产者和消息消费者演示

上一篇:Windows和Linux环境安装RocketMQ


下一篇:CentOS系统中使用docker安装RocketMQ中间件