提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 发布订阅模式 (fanout)---案例
- 前言
- @RabbitListener和@RabbitHandler的使用
- 1.通过Spring官网快速创建一个RabbitMQ的生产者项目
- 2.导入项目后在application.yml文件中配置
- 3.创建一个RabbitMqConfig配置类
- 4. 生产者
- 5.测试生产者创建mq是否成功
- 访问网址: http://localhost:15672/#/queues
- 6.再创建一个消费者项目
- 7. 创建消费者
- 8.测试消费者接收信息
发布订阅模式 (fanout)—案例
前言
@RabbitListener和@RabbitHandler的使用
1.通过Spring官网快速创建一个RabbitMQ的生产者项目
2.导入项目后在application.yml文件中配置
# 服务端口
server:
port: 8081
#配置rabbitmq服务 测试不用写,默认本机
spring:
rabbitmq:
username: guest #默认账号
password: guest #默认密码
virtual-host: /
host: localhost
port: 5672
#消息确认配置项
#确认消息已发送到交换机: Exchange
publisher-confirm-type: correlated
#确认消息已发送到队列: Queue
publisher-returns: true
3.创建一个RabbitMqConfig配置类
package com.exam.RebbitMQ.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
//1:声明注册fanout模式的交换机,参数1:对应的service的fanoutName,参数2:持久化(true,false),参数3:自动删除(false/true)
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout_order_exchang", true, false);
}
//2:声明队列 sms.fanout.queue,email.fanout.queue,duanxin.fanout.queue
//参数1:名字,参数2:持久化队列true
//短信队列
@Bean
public Queue smsQueue() {
System.err.println("执行了sms");
return new Queue("sms.fanout.queue",true);
}
@Bean
public Queue duanxinQueue() {
System.err.println("执行了duanxin");
return new Queue("duanxin.fanout.queue",true);
}
//邮箱队列
@Bean
public Queue emailQueue() {
System.err.println("执行了email");
return new Queue("email.fanout.queue",true);
}
//3:完成绑定关系(队列和交换机完成绑定关系)
@Bean
public Binding smsBinding() {
//把smsQueue放到fanoutExchange交换机上面
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding duanxinBinding() {
//把duanxinQueue放到fanoutExchange交换机上面
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
//把emailQueue放到fanoutExchange交换机上面
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
}
4. 生产者
- OrderService
package com.exam.RebbitMQ.service;
public interface OrderService {
void makeOrder(String userid,String productid,int num);
}
- OrderServiceImpl
package com.exam.RebbitMQ.service.Impl;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.exam.RebbitMQ.service.OrderService;
@Service
public class OrderServiceImpl implements OrderService{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 模拟用户下单
**/
public void makeOrder(String userid,String productid,int num) {
//1.根据商品ID查询商品是否充足
//2.保存订单
String orderId = UUID.randomUUID().toString();
System.err.println("订单生成成功"+orderId);
//3.通过MQ来完成消息的分发
//参数1:交换机 参数二:路由key/queue队列名称 参数三:消息内容
String exchangName ="fanout_order_exchang";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangName, routingKey, orderId);
}
}
5.测试生产者创建mq是否成功
- 在项目的test中发送请求
package com.huyi.rabbitmq;
import com.huyi.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitMqApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder("1","1", 18);
}
}
访问网址: http://localhost:15672/#/queues
6.再创建一个消费者项目
- yml配置
# 服务端口
server:
port: 8082
#配置rabbitmq服务 测试不用写,默认本机
spring:
rabbitmq:
username: guest #默认账号
password: guest #默认密码
virtual-host: /
host: localhost
port: 5672
#消息确认配置项
#确认消息已发送到交换机: Exchange
publisher-confirm-type: correlated
#确认消息已发送到队列: Queue
publisher-returns: true
7. 创建消费者
- SmsConsumerService、SmsConsumerServiceImpl
package com.huyi.rabbitmq_consumber.service;
public interface SmsConsumerService {
void reviceMessage(String message);
}
package com.huyi.rabbitmq_consumber.service.Impl;
import com.huyi.rabbitmq_consumber.service.SmsConsumerService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Component
public class SmsConsumerServiceImpl implements SmsConsumerService {
//注意:这里要和生产者RabbitMqConfig文件中的名字对应起来
@RabbitListener(queues = {"sms.fanout.queue"})
public void reviceMessage(String message) {
System.err.println("sms_fanout--接收到了订单信息");
}
}
- EmailConsumerService、EmailConsumerServiceImpl
package com.huyi.rabbitmq_consumber.service;
public interface EmailConsumerService {
void reviceMessage(String message);
}
package com.huyi.rabbitmq_consumber.service.Impl;
import com.huyi.rabbitmq_consumber.service.EmailConsumerService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Component
@RabbitListener(queues = {"email.fanout.queue"})
public class EmailConsumerServiceImpl implements EmailConsumerService {
@RabbitHandler
public void reviceMessage(String message) {
System.err.println("Email_fanout--接收到了订单信息"+message);
}
}
- DuanxinConsumerService、DuanxinConsumerServiceImpl
package com.huyi.rabbitmq_consumber.service;
public interface DuanxinConsumerService {
void reviceMessage(String message);
}
package com.huyi.rabbitmq_consumber.service.Impl;
import com.huyi.rabbitmq_consumber.service.DuanxinConsumerService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Component
@RabbitListener(queues = {"duanxin.fanout.queue"})
public class DuanxinConsumerServiceImpl implements DuanxinConsumerService {
@RabbitHandler
public void reviceMessage(String message) {
System.err.println("Duanxin_fanout--接收到了订单信息"+message);
}
}
8.测试消费者接收信息
- 启动消费者项目
- 启动生产者项目
- 查看消费者项目是否监听到了生产者的信息