package com.itheima.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.itheima.domain.User; @Service public class RabbitMQService { /** * Publish/Subscribe工作模式接收,处理邮件业务 * * @param message */ @RabbitListener(queues = "fanout_queue_email") public void psubConsumerEmail(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("邮件业务接收到消息: " + s); } /** * Publish/Subscribe工作模式接收,处理短信业务 * * @param message */ @RabbitListener(queues = "fanout_queue_sms") public void psubConsumerSms(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("短信业务接收到消息: " + s); } /** * Publish/Subscribe工作模式接收,处理邮件业务 * * @param message */ @RabbitListener(queues = "my_fanout_queue_email") public void my_psubConsumerEmail(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("邮件业务接收到消息: " + s); } /** * Publish/Subscribe工作模式接收,处理短信业务 * * @param message */ @RabbitListener(queues = "my_fanout_queue_sms") public void my_psubConsumerSms(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("短信业务接收到消息: " + s); } /** * **使用基于注解的方式实现消息服务 1.1、Publish/Subscribe工作模式接收,处理邮件业务 * * @param user */ @RabbitListener(bindings = @QueueBinding(value = @Queue("my2_fanout_queue_email"), exchange = @Exchange(value = "my2_fanout_exchange", type = "fanout"))) public void psubConsumerEmailAno(User user) { System.out.println("邮件业务接收到消息: " + user); } /** * 1.2、Publish/Subscribe工作模式接收,处理短信业务 * * @param user */ @RabbitListener(bindings = @QueueBinding(value = @Queue("my2_fanout_queue_sms"), exchange = @Exchange(value = "my2_fanout_exchange", type = "fanout"))) public void psubConsumerSmsAno(User user) { System.out.println("短信业务接收到消息: " + user); } /** * 2.1、路由模式消息接收,处理error级别日志信息 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = "error_routing_key")) public void routingConsumerError(String message) { System.out.println("接收到error级别日志消息: " + message); } /** * 2.2、路由模式消息接收,处理info、error、warning级别日志信息 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = { "error_routing_key", "info_routing_key", "warning_routing_key" })) public void routingConsumerAll(String message) { System.out.println("接收到info、error、warning等级别日志消息: " + message); } /** * 3.1、通配符模式消息接收,进行邮件业务订阅处理 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.email.#")) public void topicConsumerEmail(String message) { System.out.println("接收到邮件订阅需求处理消息: " + message); } /** * 3.2、通配符模式消息接收,进行短信业务订阅处理 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.sms.#")) public void topicConsumerSms(String message) { System.out.println("接收到短信订阅需求处理消息: " + message); } }
package com.itheima; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.itheima.domain.User; @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class ApplicationTest { @Autowired private AmqpAdmin amqpAdmin; @Autowired private RabbitTemplate rabbitTemplate; /** * 使用AmqpAdmin管理员API定制消息组件 */ @Test public void amqpAdmin() { // 1、定义fanout类型的交换器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); // 2、定义两个默认持久化队列,分别处理email和sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); // 3、将队列分别与交换器进行绑定 amqpAdmin.declareBinding( new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null)); amqpAdmin.declareBinding( new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null)); } /** * 1、Publish/Subscribe工作模式消息发送端 */ @Test public void psubPublisher() { User user = new User(); user.setId(1); user.setUsername("石头"); rabbitTemplate.convertAndSend("fanout_exchange", "", user); } /** * 1、Publish/Subscribe工作模式消息发送端 */ @Test public void my_psubPublisher() { User user = new User(); user.setId(1); user.setUsername("天生自然"); rabbitTemplate.convertAndSend("my_fanout_exchange", "", user); } @Test public void my2_psubPublisher() { User user = new User(); user.setId(1); user.setUsername("天生自然--天生自然"); rabbitTemplate.convertAndSend("my2_fanout_exchange", "", user); } /** * 2、Routing工作模式消息发送端 */ @Test public void routingPublisher() { rabbitTemplate.convertAndSend("routing_exchange", "error_routing_key", "routing send error message"); } @Test public void info_routingPublisher() { rabbitTemplate.convertAndSend("routing_exchange", "info_routing_key", "routing send 我的info message"); } /** * 3、Topcis工作模式消息发送端 */ @Test public void topicPublisher() { // 1、只发送邮件订阅用户消息 rabbitTemplate.convertAndSend("topic_exchange", "info.email", "topics send email message"); // 2、只发送短信订阅用户消息 rabbitTemplate.convertAndSend("topic_exchange", "info.sms", "topics send sms message"); // 3、发送同时订阅邮件和短信的用户消息 rabbitTemplate.convertAndSend("topic_exchange", "info.email.sms", "topics send email and sms message"); } }