方案1:spring-cloud-starter-stream-rabbit
一、添加依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
二、添加配置(application.yml)
spring:
  application:
    name: demo1
  rabbitmq:
    host: 10.10.10.10
    port: 5672
    username: guest
    password: guest
    virtualHost: /dev-test
三、创建操作类
生产者:
import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class MqSender { final AmqpTemplate template; /** * 发送消息的方法 * * @param msg */ public void send(String msg) { //向消息队列发送消息 //参数一:交换器名称。 //参数二:路由键 //参数三:消息 this.template.convertAndSend("test", "hzq", msg); } }View Code
消费者:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Slf4j @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "test", autoDelete = "false"), exchange = @Exchange(value = "test", type = ExchangeTypes.DIRECT), key = "test" )) public class MqListener { @RabbitHandler public void process(String msg) { log.info("mq接收到信息:{}", msg); } @RabbitHandler public void process(byte[] content) { try { log.info("mq接收到信息:{}", new String(content,"UTF-8")); } catch (Exception ex){ log.error("mq接收到信息异常"); } } }View Code
四、使用
@Slf4j @RestController @RequestMapping("/test") public class TestController { @Resource MqSender mqSender; // rabbit测试 @GetMapping("/rabbit/set") public String rabbitSet(){ log.info("添加rabbit开始。"); String msg = String.format("发送信息,当前时间戳:{%s}", System.currentTimeMillis()); mqSender.send(msg); log.info("添加rabbit结束。"); return "添加rabbit结束。"; } }View Code
方案2:spring-boot-starter-amqp
一、添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、添加配置(application.yml)
2.1 添加连接配置信息(application.yml) 允许连接多个mq服务
rabbitmq:
  default:
    host: 10.10.10.10
    port: 5672
    username: guest
    password: guest
    vhost: /dev-test
  other:
    host: 10.10.10.11
    port: 5672
    username: guest
    password: guest
    vhost: /sit-test
2.2 创建配置类
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; /** * rabbitMq 配置 */ @Configuration @Slf4j public class RabbitMqConfig { // 默认系统的MQ连接配置信息 @Value("${rabbitmq.default.host}") private String host; @Value("${rabbitmq.default.port}") private int port; @Value("${rabbitmq.default.username}") private String username; @Value("${rabbitmq.default.password}") private String password; @Value("${rabbitmq.default.vhost}") private String vhost; // other系统的MQ连接配置信息 @Value("${rabbitmq.other.host}") private String otherHost; @Value("${rabbitmq.other.port}") private int otherPort; @Value("${rabbitmq.other.username}") private String otherUserName; @Value("${rabbitmq.other.password}") private String otherPassWord; @Value("${rabbitmq.other.vhost}") private String otherVhost; /** * 重试配置-初始间隔 */ @Value("${spring.rabbitmq.listener.simple.retry.initial-interval:6000}") private Long initialInterval; /** * 重试配置-最大尝试次数 */ @Value("${spring.rabbitmq.listener.simple.retry.max-attempts:3}") private Integer maxAttempts; // 默认系统的MQ工厂 /** * 默认系统的MQ工厂 * @param configurer * @param connectionFactory * @return */ @Bean(name = "defaultFactory") public 
SimpleRabbitListenerContainerFactory defaultFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); // factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } @Bean(name="defaultRabbitTemplate") @Primary public RabbitTemplate defaultRabbitTemplate() { RabbitTemplate template = new RabbitTemplate(defaultConnectionFactory()); template.setRetryTemplate(retryTemplate()); // template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } /** * 设置成手动模式的原因, 不在使用rabbit的默认分发方式(轮询的方式), 而使用公平的方式。 使用公平的方式可以实现负载均衡。 * 具体实现:basicQos( prefetchCount=1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。 * * @return ConnectionFactory */ @Bean(name="defaultConnectionFactory") @Primary public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); //设置手动确认模式 connectionFactory.setPublisherConfirms(true); return connectionFactory; } // vbs系统的MQ工厂 /** * vbs系统的MQ工厂 * @param configurer * @param connectionFactory * @return */ @Bean(name = "otherFactory") public SimpleRabbitListenerContainerFactory vbsFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("otherConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(name="otherRabbitTemplate") public RabbitTemplate vbsrabbitTemplate() { RabbitTemplate template = new RabbitTemplate(otherConnectionFactory()); template.setRetryTemplate(retryTemplate()); return template; } 
@Bean(name="otherConnectionFactory") public ConnectionFactory otherConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(otherHost, otherPort); connectionFactory.setUsername(otherUserName); connectionFactory.setPassword(otherPassWord); connectionFactory.setVirtualHost(otherVhost); //设置手动确认模式 connectionFactory.setPublisherConfirms(true); return connectionFactory; } /** * 构建rabbitMQ重试模板 * @return RetryTemplate */ private RetryTemplate retryTemplate(){ RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(initialInterval); retryTemplate.setBackOffPolicy(backOffPolicy); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts)); return retryTemplate; } // // /** // * 初始化默认系统MQ路由配置 // */ // @Bean // public RabbitAdmin rabbitAdmin() { // RabbitAdmin rabbitAdmin = new RabbitAdmin(defaultConnectionFactory()); // rabbitAdmin.setAutoStartup(true); // //创建队列和交换机,并绑定 // creactQueueAndChange(rabbitAdmin); // return rabbitAdmin; // } // // /** // * 默认系统中所有的MQ路由配置信息 // */ // @Value("${rabbitInfo:{}}") // private String rabbitInfo; // // /** // * 默认交换机名称 // */ // private static final String defualtExchange = "test"; // /** // * 创建队列和交换机,并绑定 // * @param rabbitAdmin // */ // private void creactQueueAndChange(RabbitAdmin rabbitAdmin){ // rabbitAdmin.declareExchange(new DirectExchange(defualtExchange,true,false)); // Map<String, Object> arguments = new HashMap<>(); // arguments.put("x-max-priority",10); // 设置队列优先级级别 // List<RabbitObject> rabbitObjectDTOList = JSONObject.parseArray(rabbitInfo,RabbitObject.class); // for(RabbitObject rabbitObjectDTO:rabbitObjectDTOList){ // rabbitAdmin.declareQueue(new Queue(rabbitObjectDTO.queueName,true,false,false,arguments)); // rabbitAdmin.declareBinding(new Binding(rabbitObjectDTO.queueName, Binding.DestinationType.QUEUE, rabbitObjectDTO.exchangeName, rabbitObjectDTO.routingKey, null)); // } // } 
}View Code
三、创建操作类
生产者:
import com.alibaba.fastjson.JSONObject; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.UUID; /** * rabbit 发送实例 */ @Service @Log4j2 public class RabbitSendServiceImpl implements RabbitSendService, RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; /** * 构造方法注入rabbitTemplate */ @Autowired public RabbitSendServiceImpl(@Qualifier("defaultRabbitTemplate") RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } @Override public void sendMsg(String exchangeName, String routeName, Object message, String messageId) { CorrelationData correlationId = new CorrelationData(messageId); rabbitTemplate.convertAndSend(exchangeName, routeName, JSONObject.toJSONString(message), correlationId); } // @Override // public void sendMsg(MqInfoEnum mqInfoEnum, Object message, String messageId) { // CorrelationData correlationId = new CorrelationData(messageId); // rabbitTemplate.convertAndSend(mqInfoEnum.getExchangeName(), mqInfoEnum.getRouteName(), JSONObject.toJSONString(message), // correlationId); // } @Override public void sendMsg(String exchangeName, String routeName, Object message) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(exchangeName, routeName, JSONObject.toJSONString(message), correlationId); } // @Override // public void sendMsg(MqInfoEnum mqInfoEnum, Object message) { // CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); // rabbitTemplate.convertAndSend(mqInfoEnum.getExchangeName(), mqInfoEnum.getRouteName(), JSONObject.toJSONString(message), // correlationId); // } // @Override // public void 
sendDefaultMsg(Object message, String messageId) { // CorrelationData correlationId = new CorrelationData(messageId); // rabbitTemplate.convertAndSend(MqInfoEnum.VTS_DEFAULT.getExchangeName(), MqInfoEnum.VTS_DEFAULT.getRouteName(), // JSONObject.toJSONString(message), // correlationId); // } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b) { log.info("回调[{}]结果:消息成功消费", correlationData); } else { log.info("回调[{}]结果:消息消费失败:{}", correlationData, s); } } }View Code
消费者:
公共类
import hzq.maven.demo.service.rabbit.RabbitListenService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.stereotype.Service;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * Base class for queue listeners: receives a message as text or raw bytes and hands the
 * text form to {@link #execute(String)}, which subclasses implement.
 */
@Service
public abstract class RabbitListenServiceImpl implements RabbitListenService {

    /**
     * Handles a message delivered as text.
     *
     * @param content message payload
     */
    @RabbitHandler
    public void process(String content) {
        try {
            execute(content);
        } catch (Exception e) {
            // Extension point: persist the failed message to a compensation table here if required.
            // The exception must be rethrown; swallowing it would make the delivery look
            // successful to the caller.
            throw e;
        }
    }

    /**
     * Handles a message delivered as raw bytes by decoding it as UTF-8 and processing it
     * exactly like a text message.
     *
     * @param content message payload
     */
    @RabbitHandler
    public void process(byte[] content) {
        // StandardCharsets.UTF_8 cannot throw UnsupportedEncodingException, so the decoding
        // needs no catch block and any failure from execute() propagates unchanged.
        process(new String(content, StandardCharsets.UTF_8));
    }

    /**
     * Processes one received message.
     *
     * @param message message body as text
     */
    public abstract void execute(String message);
}
子类
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import hzq.maven.demo.model.dto.TestMqDataDTO; import hzq.maven.demo.service.business.TestService; import hzq.maven.demo.service.rabbit.impl.RabbitListenServiceImpl; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 测试消息队列 */ @Slf4j @Component @RabbitListener(queues = "test",containerFactory="defaultFactory") public class TestListen extends RabbitListenServiceImpl { @Resource private TestService testService; public void execute(String message) { TestMqDataDTO rabbitMqDataDTO; if(JSON.isValid(message)){ JSONObject jsonObject = JSON.parseObject(message); rabbitMqDataDTO = jsonObject.toJavaObject(TestMqDataDTO.class); } else{ rabbitMqDataDTO = new TestMqDataDTO(); rabbitMqDataDTO.setName(message); } testService.mqListen(rabbitMqDataDTO); } }View Code
四、使用
import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Slf4j @Service @RequiredArgsConstructor public class TestServiceImpl implements TestService { @Resource private RabbitSendService rabbitSendService; @Override public void mqSend(TestMqDataDTO testMqDataDTO){ //推送信息到MQ rabbitSendService.sendMsg("test","hzq",testMqDataDTO); log.info("mq发送数据成功:" + testMqDataDTO.toString()); } @Override public void mqListen(TestMqDataDTO testMqDataDTO){ //从MQ接收信息 log.info("mq消费数据成功:" + testMqDataDTO.toString()); // do... } }View Code