SpringBoot整合RocketMQ

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tech</groupId>
    <artifactId>tech-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>tech-rocketmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

依赖中RocketMQ客户端版本要与服务端的安装版本一致

package com.tech.rocketmq.jms;

/**
 * @author lw
 * @since 2021/11/15
 */
public class JmsConfig {
    public static final String NAME_SERVER = "192.168.50.135:9876";
    public static final String TOPIC = "tech_pay_test_topic_666";
}

公共配置,nameServer和Topic

package com.tech.rocketmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

/**
 * @author lw
 * @since 2021/11/15
 */
@Component
public class PayProducer {
    private String producerGroup="pay_group";
    private DefaultMQProducer producer;
    public PayProducer(){
        producer=new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以;隔开
        //如producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877")
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        start();
    }

    public DefaultMQProducer getProducer() {
        return producer;
    }

    /**
     * 对象使用之前必须调用一次,只能初始化一次
     */
    private void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    private void shutDown(){
        this.producer.shutdown();
    }
}
package com.tech.rocketmq.controller;

import com.tech.rocketmq.jms.JmsConfig;
import com.tech.rocketmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

/**
 * @author lw
 * @since 2021/11/15
 */
@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @GetMapping("send")
    Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(JmsConfig.TOPIC, "taga", ("hello word = " + text).getBytes());
        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}

调用send接口,进行消息发送

package com.tech.rocketmq.jms;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author lw
 * @since 2021/11/15
 */
@Slf4j
@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup="pay_consumer_group";
    public PayConsumer() throws MQClientException {
        consumer=new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        
        consumer.subscribe(JmsConfig.TOPIC,"*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    Message message = list.get(0);
                    log.info("Receive New Message: {}",new String(message.getBody()));
                    String topic = message.getTopic();
                    String tags = message.getTags();
                    String keys = message.getKeys();
                    log.info("topic={} tags={} keys={}",topic,tags,keys);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}

监听类,监听消息。

上一篇:图文详解,源码启动RocketMq,超详细


下一篇:关于 RocketMQ 事务消息的正确打开方式 → 你学废了吗