说明:老项目,使用的是spring 3项目,需要对接RocketMQ,配置完之后,在消费者监听方法中,发现业务处理service注入不进来,最后检查发现是因为消费者监听工具类没有被正确的初始化,所以它里边的业务service注入之后是个null,于是各种折腾,特此记录一下
方式一:
解决:对需要初始化的类实现InitializingBean接口,重写afterPropertiesSet()方法,在afterPropertiesSet方法中调用需要被初始化的方法
代码如下:
import xx.xxx.component.BaseServiceMqConsumer; import xx.xxx.service.VideoConsumerService; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @DependsOn("RocketMqConfig") @Component public class RocketMqConsumerUtil implements InitializingBean { private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class); @Autowired private VideoConsumerService videoConsumerService; /** * 接收消息 */ public void listener(){ // 获取消息生产者 DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer(); // 订阅主体 try { consumer.subscribe(RocketMqUtil.topic, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody())); e.printStackTrace(); } log.info("消费开始-MsgBody:{}",msg); // String msg = new String(messageExt.getBody()); // log.info("MsgBody:{}",new String(messageExt.getBody())); if (messageExt.getTopic().equals(RocketMqUtil.topic)) { // topic的消费逻辑 if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) { // 根据Tag消费消息,具体消费消息的业务方法 videoConsumerService.dealVideoMsg(msg); } } else if (messageExt.getTopic().equals("TopicTest2")) { // 执行TopicTest2的消费逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); log.info("rocketmq-consumer 启动成功---------------------------------------"); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void afterPropertiesSet() throws Exception { listener();//调用需要被初始化的方法 } }
方式二:
使用注解@PostContruct 指定需要被初始化执行的方法
package net.greatsoft.xxx.utils; import xxx.xxx.component.BaseServiceMqConsumer; import net.greatsoft.xxx.service.VideoConsumerService; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.UnsupportedEncodingException; import java.util.List; @DependsOn("RocketMqConfig") @Component public class RocketMqConsumerUtil { private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class); @Autowired private VideoConsumerService videoConsumerService; /** * 接收消息8 */ @PostConstruct public void listener(){ // 获取消息生产者 DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer(); // 订阅主体 try { consumer.subscribe(RocketMqUtil.topic, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody())); e.printStackTrace(); } log.info("消费开始-MsgBody:{}",msg); if (messageExt.getTopic().equals(RocketMqUtil.topic)) { // topic的消费逻辑 if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) { // 根据Tag消费消息,具体消费消息的业务方法 videoConsumerService.dealVideoMsg(msg); } } else if (messageExt.getTopic().equals("TopicTest2")) { // 执行TopicTest2的消费逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); log.info("rocketmq-consumer 启动成功---------------------------------------"); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
方式三:
在spring的xml配置文件中使用 <Bean>的init 属性来执行初始化的Bean
<bean id="rocketMqConsumerUtil" class="xx.xxx.utils.RocketMqConsumerUtil" scope="singleton" init-method="listener"/>
package net.greatsoft.jinNanHealth.utils; import net.greatsoft.jinNanHealth.component.BaseServiceMqConsumer; import net.greatsoft.jinNanHealth.service.VideoConsumerService; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.UnsupportedEncodingException; import java.util.List; /** * @author xc * @date 2020-07-23 */ @DependsOn("RocketMqUtil") @Component public class RocketMqConsumerUtil { private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class); @Autowired private VideoConsumerService videoConsumerService; /** * 接收消息8 */ public void listener(){ // 获取消息生产者 DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer(); // 订阅主体 try { consumer.subscribe(RocketMqUtil.topic, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody())); e.printStackTrace(); } log.info("消费开始-MsgBody:{}",msg); if (messageExt.getTopic().equals(RocketMqUtil.topic)) { // topic的消费逻辑 if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) { // 根据Tag消费消息,具体消费消息的业务方法 videoConsumerService.dealVideoMsg(msg); } } else if (messageExt.getTopic().equals("TopicTest2")) { // 执行TopicTest2的消费逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); log.info("rocketmq-consumer 启动成功---------------------------------------"); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }