<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits build defaults from the Spring Boot starter parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Coordinates of this project. -->
    <groupId>com.tech</groupId>
    <artifactId>tech-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>tech-rocketmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- Dependencies that declare no version rely on version management from outside this file. -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RocketMQ client; its version is pinned explicitly in this file. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

        <!-- Lombok is declared optional and is also excluded in the plugin configuration below. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Keep Lombok out of the artifact produced by this plugin. -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
注意：依赖中 RocketMQ 客户端（rocketmq-client）的版本应与服务端安装的版本保持一致。
package com.tech.rocketmq.jms;
/**
 * Shared RocketMQ connection settings: the NameServer address and the topic name.
 *
 * @author lw
 * @since 2021/11/15
 */
public class JmsConfig {

    /** Name of the topic used for the pay test messages. */
    public static final String TOPIC = "tech_pay_test_topic_666";

    /** NameServer address in {@code host:port} form. */
    public static final String NAME_SERVER = "192.168.50.135:9876";
}
公共配置类：定义 NameServer 地址和 Topic 名称。
package com.tech.rocketmq.jms;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
/**
 * Spring component that owns a single RocketMQ {@link DefaultMQProducer}.
 * <p>
 * The producer is created, pointed at {@link JmsConfig#NAME_SERVER} and started inside the
 * constructor, so the instance handed out by {@link #getProducer()} has already been started.
 *
 * @author lw
 * @since 2021/11/15
 */
@Component
public class PayProducer {

    /** Producer group name passed to the RocketMQ client. */
    private static final String PRODUCER_GROUP = "pay_group";

    private final DefaultMQProducer producer;

    /**
     * Creates the producer, configures the NameServer address and starts it.
     *
     * @throws IllegalStateException if the underlying producer cannot be started
     */
    public PayProducer() {
        producer = new DefaultMQProducer(PRODUCER_GROUP);
        // NameServer address; several addresses are separated by ';',
        // for example "127.0.0.1:9876;127.0.0.1:9877".
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        start();
    }

    /**
     * Returns the producer owned by this component.
     *
     * @return the started producer instance
     */
    public DefaultMQProducer getProducer() {
        return producer;
    }

    /**
     * Starts the producer. Must be called exactly once before the producer is used.
     * <p>
     * A start failure is rethrown (with its cause) instead of being printed and ignored:
     * a producer that never started cannot send, so the failure should surface when this
     * object is constructed rather than on the first send attempt.
     *
     * @throws IllegalStateException if the client reports a start failure
     */
    private void start() {
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new IllegalStateException(
                    "Failed to start RocketMQ producer (group=" + PRODUCER_GROUP
                            + ", nameServer=" + JmsConfig.NAME_SERVER + ")", e);
        }
    }

    /**
     * Shuts the producer down. Typically this is done from an application-context
     * listener when the application stops.
     * <p>
     * NOTE(review): nothing in this class invokes this method, so as written the producer
     * is never shut down here. Consider wiring it to a container destroy callback
     * (for example {@code @PreDestroy}).
     */
    private void shutDown() {
        producer.shutdown();
    }
}
package com.tech.rocketmq.controller;
import com.tech.rocketmq.jms.JmsConfig;
import com.tech.rocketmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
/**
 * REST controller that publishes a test message to RocketMQ through {@link PayProducer}.
 *
 * @author lw
 * @since 2021/11/15
 */
@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;

    /**
     * Handles {@code GET send}: builds a message for {@link JmsConfig#TOPIC} with tag
     * {@code "taga"}, sends it synchronously and prints the send result.
     *
     * @param text text appended to the message body; may be {@code null}, in which case
     *             string concatenation renders it as {@code "null"}
     * @return an empty map
     * @throws InterruptedException if the sending thread is interrupted
     * @throws RemotingException    propagated from the producer's {@code send} call
     * @throws MQClientException    propagated from the producer's {@code send} call
     * @throws MQBrokerException    propagated from the producer's {@code send} call
     */
    @GetMapping("send")
    Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
        // which would make the bytes on the wire depend on the JVM's locale settings.
        byte[] body = ("hello word = " + text).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(JmsConfig.TOPIC, "taga", body);
        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}
调用 send 接口，即可发送消息。
package com.tech.rocketmq.jms;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author lw
* @since 2021/11/15
*/
@Slf4j
@Component
public class PayConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup="pay_consumer_group";
public PayConsumer() throws MQClientException {
consumer=new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(JmsConfig.TOPIC,"*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
Message message = list.get(0);
log.info("Receive New Message: {}",new String(message.getBody()));
String topic = message.getTopic();
String tags = message.getTags();
String keys = message.getKeys();
log.info("topic={} tags={} keys={}",topic,tags,keys);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
消费者监听类：订阅 Topic，监听并消费消息。