最近项目上在使用rocektmq,特此记录一下
一、pom依赖
<!-- rocketmq --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
二、配置
#rocketmq配置
rocketmq:
produceGroupName: unique_group_name
consumerGroupName: unique_group_name
namesrvAddr: 127.0.0.1:9876
topic: test-service
producer:
maxMessageSize: 4096
sendMsgTimeout: 3000
retryTimesWhenSendFailed: 2
consumer:
consumeThreadMin: 5
consumeThreadMax: 32
consumeMessageBatchMaxSize: 1
三、代码
1、读取配置
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class RocketMqConfig {
//发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
public static String produceGroupName;
//消费者分组
public static String consumerGroupName;
// mq的nameserver地址
public static String namesrvAddr;
// 主题
public static String topic;
//消息最大长度 默认1024*4(4M)
public static Integer producerMaxMessageSize;
//发送消息超时时间,默认3000
public static Integer producerSendMsgTimeout;
//发送消息失败重试次数,默认2
public static Integer producerRetryTimesWhenSendFailed;
//消费者线程数量
public static Integer consumeThreadMin;
public static Integer consumeThreadMax;
//设置一次消费消息的条数,默认为1条
public static Integer consumeMessageBatchMaxSize;
@Value("${rocketmq.produceGroupName}")
public void setProduceGroupName(String produceGroupName) {
RocketMqConfig.produceGroupName = produceGroupName;
}
@Value("${rocketmq.consumerGroupName}")
public void setConsumerGroupName(String consumerGroupName) {
RocketMqConfig.consumerGroupName = consumerGroupName;
}
@Value("${rocketmq.namesrvAddr}")
public void setNamesrvAddr(String namesrvAddr) {
RocketMqConfig.namesrvAddr = namesrvAddr;
}
@Value("${rocketmq.topic}")
public void setTopic(String topic) {
RocketMqConfig.topic = topic;
}
@Value("${rocketmq.producer.maxMessageSize}")
public void setProducerMaxMessageSize(Integer producerMaxMessageSize) {
RocketMqConfig.producerMaxMessageSize = producerMaxMessageSize;
}
@Value("${rocketmq.producer.sendMsgTimeout}")
public void setProducerSendMsgTimeout(Integer producerSendMsgTimeout) {
RocketMqConfig.producerSendMsgTimeout = producerSendMsgTimeout;
}
@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
public void setProducerRetryTimesWhenSendFailed(Integer producerRetryTimesWhenSendFailed) {
RocketMqConfig.producerRetryTimesWhenSendFailed = producerRetryTimesWhenSendFailed;
}
@Value("${rocketmq.consumer.consumeThreadMin}")
public void setConsumeThreadMin(Integer consumeThreadMin) {
RocketMqConfig.consumeThreadMin = consumeThreadMin;
}
@Value("${rocketmq.consumer.consumeThreadMax}")
public void setConsumeThreadMax(Integer consumeThreadMax) {
RocketMqConfig.consumeThreadMax = consumeThreadMax;
}
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
public void setConsumeMessageBatchMaxSize(Integer consumeMessageBatchMaxSize) {
RocketMqConfig.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}
}
2、生产者定义
import xxx.configuration.RocketMqConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* RocketMQ 生产着
* @Author YHL
* @Date 2020/7/23 16:14
* @Version 1.0
*/
public class BaseServiceMqProducer {
private static Logger log = LoggerFactory.getLogger(BaseServiceMqProducer.class);
private static DefaultMQProducer producer = new DefaultMQProducer(RocketMqConfig.produceGroupName);
private static int initialState = 0;
private BaseServiceMqProducer() {
}
public static DefaultMQProducer getDefaultMQProducer(){
if(producer == null){
producer = new DefaultMQProducer(RocketMqConfig.produceGroupName);
}
if(initialState == 0){
producer.setNamesrvAddr(RocketMqConfig.namesrvAddr);
//消息最大长度 默认1024*4(4M)
producer.setMaxMessageSize(RocketMqConfig.producerMaxMessageSize);
//发送消息超时时间
producer.setSendMsgTimeout(RocketMqConfig.producerSendMsgTimeout);
// 如果发送消息失败,设置重试次数,默认为2次
producer.setRetryTimesWhenSendAsyncFailed(RocketMqConfig.producerRetryTimesWhenSendFailed);
try {
producer.start();
log.info("rocketmq-producer 启动成功---------------------------------------");
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
initialState = 1;
}
return producer;
}
}
3、生产者工具类
import xxx.component.BaseServiceMqProducer;
import xxx.configuration.RocketMqConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
public class RocketMqProducerUtil {
private static Logger log = LoggerFactory.getLogger(RocketMqProducerUtil.class);
private static String tag = "videoTag";
/**
* 发送正常消息
* @param msg
*/
public static String sendMsg(String msg){
// 获取消息生产者
DefaultMQProducer producer = BaseServiceMqProducer.getDefaultMQProducer();
try {
Message message = new Message(RocketMqConfig.topic,tag, msg.getBytes("UTF-8")); // body
// 日志收集,要求没有那么高,只需要单项发送即可
SendResult sendResult = producer.send(message);
SendStatus sendStatus = sendResult.getSendStatus();
if (sendStatus.equals(SendStatus.SEND_OK)) {
log.info("消息发送成功,msg:{}",msg);
return "成功";
} else if (sendStatus.equals(SendStatus.FLUSH_DISK_TIMEOUT)) {
log.info("消息发送失败,消息刷盘失败,msg:{}",msg);
return "消息发送失败,消息刷盘失败";
} else if (sendStatus.equals(SendStatus.FLUSH_SLAVE_TIMEOUT)) {
log.info("消息发送失败,主从服务器同步超时,msg:{}",msg);
return "消息发送失败,主从服务器同步超时";
} else if (sendStatus.equals(SendStatus.SLAVE_NOT_AVAILABLE)) {
log.info("消息发送失败,Broker不可用,msg:{}",msg);
return "消息发送失败,Broker不可用";
} else {
log.info("消息发送返回未知状态,msg:{}",msg);
}
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
// producer.shutdown();
return "消息发送返回未知状态";
}
}
4、消费者定义
import xxx.configuration.RocketMqConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class BaseServiceMqConsumer {
private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConfig.consumerGroupName);
private static int initialState = 0;
private BaseServiceMqConsumer() {
}
public static DefaultMQPushConsumer getDefaultMQPushConsumer(){
if(consumer == null){
consumer = new DefaultMQPushConsumer(RocketMqConfig.consumerGroupName);
}
if(initialState == 0){
consumer.setNamesrvAddr(RocketMqConfig.namesrvAddr);
//消费者线程数量
consumer.setConsumeThreadMin(RocketMqConfig.consumeThreadMin);
consumer.setConsumeThreadMax(RocketMqConfig.consumeThreadMax);
//设置一次消费消息的条数,默认为1条
consumer.setConsumeMessageBatchMaxSize(RocketMqConfig.consumeMessageBatchMaxSize);
//一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
initialState = 1;
}
return consumer;
}
}
5、消费者工具类
import xxx.component.BaseServiceMqConsumer; import xxx.configuration.RocketMqConfig; import xxx.service.v1.VideoConsumerService; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class RocketMqConsumerUtil { private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class); @Autowired private VideoConsumerService videoConsumerService; private static String tag = "videoTag"; /** * 接收消息 */ public void listener(){ // 获取消息生产者 DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer(); // 订阅主体 try { consumer.subscribe(RocketMqConfig.topic, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody())); e.printStackTrace(); } log.info("消费者-bornHost:{},storeHost:{}. groupName:{},topic:{}",messageExt.getBornHost(),messageExt.getStoreHost(),consumer.getConsumerGroup(),RocketMqConfig.topic); log.info("消费开始-MsgBody:{}",msg); // String msg = new String(messageExt.getBody()); // log.info("MsgBody:{}",new String(messageExt.getBody())); if (messageExt.getTopic().equals(RocketMqConfig.topic)) { // topic的消费逻辑 if (messageExt.getTags() != null && messageExt.getTags().equals(tag)) { // 根据Tag消费消息,具体消费消息的业务方法 videoConsumerService.dealVideoMsg(msg); } } else if (messageExt.getTopic().equals("TopicTest2")) { // 执行TopicTest2的消费逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); log.info("rocketmq-consumer 启动成功---------------------------------------"); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
四、调用说明
1、生产者工具类可以直接调用
@RequestMapping(value = "/test", method = RequestMethod.POST)
public void tes(@RequestParam("msg") String msg) {
RocketMqProducerUtil.sendMsg(msg);
}
2、消费者工具类需要在项目启动时候初始化一下才可以调用,可以在主启动类上直接调用
public static void main(String[] args) {
SpringApplication.run(Jnwsn4residentapiApplication.class, args);
RocketMqConsumerUtil rocketMqConsumerUtil = new RocketMqConsumerUtil();
rocketMqConsumerUtil.listener();
}
也可以自定义初始化,需要实现CommandLineRunner接口
import xxx.util.RocketMqConsumerUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class TestRunner implements CommandLineRunner {
@Autowired
private RocketMqConsumerUtil rocketMqConsumerUtil;
@Override
public void run(String... args) throws Exception {
rocketMqConsumerUtil.listener();
}
}
最后,就可以在消费者工具类中注入自己需要的业务处理Service了 ,把VideoConsumerServie注入替换掉就好