RocketMQ入门

RocketMQ入门

一、环境准备
1.下载软件

传送门: rocketmq-all-4.5.1-bin-release.zip

环境要求:JDK1.8、4G可用内存

2.Linux安装

解压

unzip rocketmq-all-4.5.1-bin-release.zip

切换到RocketMQ目录

cd rocketmq-all-4.5.1-bin-release

启动Name Server

# 启动nameserver
nohup sh bin/mqnamesrv &

# 验证是否启动ok
tail -f ~/logs/rocketmqlogs/namesrv.log

# 如果成功启动,能看到类似如下的日志:
2019-07-18 17:03:56 INFO main - The Name Server boot success. ...

启动Broker

# 启动broker
nohup sh bin/mqbroker -n localhost:9876 &

# 验证是否启动ok
tail -f ~/logs/rocketmqlogs/broker.log

# 如果启动成功,能看到类似如下的日志:
2019-07-18 17:08:41 INFO main - The broker[itmuchcomdeMacBook-Pro.local, 192.168.124.136:10911] boot success. serializeType=JSON and name server is localhost:9876

验证生产消息是否正常

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

验证消费消息是否正常

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

服务停止

# 1.关闭NameServer
mqshutdown namesrv
# 2.关闭Broker
mqshutdown broker
3.Windows安装

解压

RocketMQ入门

配置环境变量

变量名:ROCKETMQ_HOME

变量值:MQ解压路径\MQ文件夹名

如:D:\develop\rocketmq

启动Name Server

start mqnamesrv.cmd

启动Broker

start mqbroker.cmd -n 127.0.0.1:9876

如果启动过程中出现以下错误找不到或无法加载主类 xxxxxx,打开runbroker.cmd,将"%CLASSPATH%"加上英文双引号。保存并重新执行start语句。

二、Java API使用

同步发送

public class SyncProducerApp {

    public static void main(String[] args) throws InterruptedException, RemotingException,
            MQClientException, MQBrokerException {
        // 创建producer,并指定一个生产组
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 初始化生产者
        producer.start();
        Message message = new Message(
                "topic_producer_01", //指定topic
                "hello world".getBytes(StandardCharsets.UTF_8) //消息内容
        );
        // 发送消息
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);

        // 关闭producer
        producer.shutdown();
    }
}

异步发送

public class AsyncProducerApp {

    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        // 创建producer,并指定一个生产组
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 初始化生产者
        producer.start();

        // 定义消息
        for (int i = 0; i < 100; i++) {
            Message message = new Message(
                    "topic_producer_02", //指定topic
                    ("hello world" + i).getBytes(StandardCharsets.UTF_8) //消息内容
            );

            // 发送消息
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("消息发送成功:" + sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println("消息发送失败:" + throwable.getMessage());
                }
            });
        }

        // 休眠10s
        Thread.sleep(10000);

        // 关闭producer
        producer.shutdown();
    }
}

RocketMQ入门

注意: 异步发送需要休眠一段时间,不然关闭producer,会发送失败。

pull模式

public class PullConsumerApp {

    public static void main(String[] args) throws MQClientException, RemotingException, 
            InterruptedException, MQBrokerException {
        // 创建consumer并指定一个消费组
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_group");
        // 指定NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 初始化消费者
        consumer.start();

        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("topic_producer_01");
        for (MessageQueue messageQueue : messageQueues) {
            System.out.println("messageQueue:" + messageQueue);
            //
            PullResult result = consumer.pull(
                    messageQueue, // 代表当前主题的一个消息队列
                    "*", // 对接收的消息按照tag进行过滤
                    0, // 消息的偏移量,从这里开始消费
                    10 // 每次最多拉取多少条消息
            );
            System.out.println("result:" + result);
        }

        // 关闭消费者
        consumer.shutdown();
    }
}

RocketMQ入门

push模式

public class PushConsumerApp {

    public static void main(String[] args) throws MQClientException {
        // 创建consumer并指定一个消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // 指定NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅主题
        consumer.subscribe("topic_producer_02", "*");
        // 添加消息监听器,一旦有消息推送过来,就进行消费
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                MessageQueue messageQueue = context.getMessageQueue();

                System.out.println("messageQueue:" + messageQueue);

                for (MessageExt messageExt : list) {
                    try {
                        System.out.println("message:" + new String(messageExt.getBody(), "utf-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                // 消息消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }
        });
        // 初始化消费者,之后开始消费消息
        consumer.start();
        // 此处只是示例,生产中除非运维关掉,否则不应停掉,长服务
//        Thread.sleep(30_000);
//        // 关闭消费者
//        consumer.shutdown();
    }
}

RocketMQ入门

三、RocketMQ整合Spring Boot
1.创建Spring Boot工程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.snowsea</groupId>
    <artifactId>rocketmq-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-springboot</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
            </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

spring.application.name=producer-boot
# 指定NameServer地址
rocketmq.name-server=127.0.0.1:9876
# 指定生产组
rocketmq.producer.group=producer-boot-group

创建一个MessageController

MessageController

@RestController
@RequestMapping("/message")
public class MessageController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam String msg) {
        rocketMQTemplate.convertAndSend("topic-boot", msg);
        return "ok";
    }
}

创建一个ConsumerListener

ConsumerListener

@Component
@RocketMQMessageListener(topic = "topic-boot", consumerGroup = "consumer-boot-group")
public class ConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Receive message:" + message);
    }
}

启动应用进行测试

RocketMQ入门

打开浏览器,发送消息:http://localhost:8080/message/send?msg=hello world,查看控制台是否收到消息。

2021-04-15 11:16:47.620  INFO 25528 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-04-15 11:16:47.623  INFO 25528 --- [           main] c.s.r.RocketmqSpringbootApplication      : Started RocketmqSpringbootApplication in 4.56 seconds (JVM running for 5.314)
2021-04-15 11:17:03.698  INFO 25528 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-04-15 11:17:03.698  INFO 25528 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-04-15 11:17:03.701  INFO 25528 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 3 ms
Receive message:hello world
Receive message:my name is jerry

如果你喜欢博主的更多文章,请关注博主微信公众号。

RocketMQ入门

上一篇:C#实现登录窗口(不用隐藏)


下一篇:rocketmq的使用及高级特性