RabbitMQ--04--发布订阅模式 (fanout)-案例

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 发布订阅模式 (fanout)---案例
    • 前言
      • @RabbitListener和@RabbitHandler的使用
    • 1.通过Spring官网快速创建一个RabbitMQ的生产者项目
    • 2.导入项目后在application.yml文件中配置
    • 3.创建一个RabbitMqConfig配置类
    • 4. 生产者
    • 5.测试生产者创建mq是否成功
        • 访问网址: http://localhost:15672/#/queues
    • 6.再创建一个消费者项目
    • 7. 创建消费者
    • 8.测试消费者接收信息


发布订阅模式 (fanout)—案例

前言

在这里插入图片描述
在这里插入图片描述

@RabbitListener和@RabbitHandler的使用

在这里插入图片描述
在这里插入图片描述

1.通过Spring官网快速创建一个RabbitMQ的生产者项目

在这里插入图片描述

2.导入项目后在application.yml文件中配置

# 服务端口
server:
  port: 8081
  
#配置rabbitmq服务 测试不用写,默认本机
spring:
  rabbitmq:
      username: guest #默认账号
      password: guest #默认密码
      virtual-host: /
      host: localhost
      port: 5672
      #消息确认配置项
      #确认消息已发送到交换机: Exchange
      publisher-confirm-type: correlated
      #确认消息已发送到队列: Queue
      publisher-returns: true


3.创建一个RabbitMqConfig配置类

package com.exam.RebbitMQ.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

	//1:声明注册fanout模式的交换机,参数1:对应的service的fanoutName,参数2:持久化(true,false),参数3:自动删除(false/true)
	@Bean
	public FanoutExchange fanoutExchange(){
		return new  FanoutExchange("fanout_order_exchang", true, false);
	}
	//2:声明队列 sms.fanout.queue,email.fanout.queue,duanxin.fanout.queue
	//参数1:名字,参数2:持久化队列true
	//短信队列
	@Bean
	public Queue smsQueue() {
		System.err.println("执行了sms");
		return new Queue("sms.fanout.queue",true);
	}
	@Bean
	public Queue duanxinQueue() {
		System.err.println("执行了duanxin");
		return new Queue("duanxin.fanout.queue",true);
	}
	//邮箱队列
	@Bean
	public Queue emailQueue() {
		System.err.println("执行了email");
		return new Queue("email.fanout.queue",true);
	}
	//3:完成绑定关系(队列和交换机完成绑定关系)
	@Bean
	public Binding smsBinding() {
		//把smsQueue放到fanoutExchange交换机上面
		return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
	}
	@Bean
	public Binding duanxinBinding() {
		//把duanxinQueue放到fanoutExchange交换机上面
		return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
	}
	@Bean
	public Binding emailBinding() {
		//把emailQueue放到fanoutExchange交换机上面
		return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
	}

}

4. 生产者

  • OrderService
package com.exam.RebbitMQ.service;

public interface OrderService {
	void makeOrder(String userid,String productid,int num);
}

  • OrderServiceImpl
    在这里插入图片描述
package com.exam.RebbitMQ.service.Impl;

import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.exam.RebbitMQ.service.OrderService;

@Service
public class OrderServiceImpl implements OrderService{
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	/**
	 * 模拟用户下单 
	 **/
	public void makeOrder(String userid,String productid,int num) {
		//1.根据商品ID查询商品是否充足
		//2.保存订单
		String  orderId = UUID.randomUUID().toString();
		System.err.println("订单生成成功"+orderId);
		//3.通过MQ来完成消息的分发
		//参数1:交换机 参数二:路由key/queue队列名称  参数三:消息内容
		String  exchangName ="fanout_order_exchang";
		String routingKey = "";
		rabbitTemplate.convertAndSend(exchangName, routingKey, orderId);
	}

}


5.测试生产者创建mq是否成功

  • 在项目的test中发送请求
package com.huyi.rabbitmq;

import com.huyi.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitMqApplicationTests {

	@Autowired
	private OrderService orderService;

	@Test
	void contextLoads() {
		orderService.makeOrder("1","1", 18);
	}

}


在这里插入图片描述

访问网址: http://localhost:15672/#/queues

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.再创建一个消费者项目

在这里插入图片描述

  • yml配置
# 服务端口
server:
  port: 8082

#配置rabbitmq服务 测试不用写,默认本机
spring:
  rabbitmq:
    username: guest #默认账号
    password: guest #默认密码
    virtual-host: /
    host: localhost
    port: 5672
    #消息确认配置项
    #确认消息已发送到交换机: Exchange
    publisher-confirm-type: correlated
    #确认消息已发送到队列: Queue
    publisher-returns: true


7. 创建消费者

  • SmsConsumerService、SmsConsumerServiceImpl
package com.huyi.rabbitmq_consumber.service;

public interface SmsConsumerService {
    void reviceMessage(String message);
}
package com.huyi.rabbitmq_consumber.service.Impl;

import com.huyi.rabbitmq_consumber.service.SmsConsumerService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
public class SmsConsumerServiceImpl implements SmsConsumerService {

    //注意:这里要和生产者RabbitMqConfig文件中的名字对应起来
    @RabbitListener(queues = {"sms.fanout.queue"})
    public void reviceMessage(String message) {
        System.err.println("sms_fanout--接收到了订单信息");
    }
}


  • EmailConsumerService、EmailConsumerServiceImpl
package com.huyi.rabbitmq_consumber.service;

public interface EmailConsumerService {
    void reviceMessage(String message);
}

package com.huyi.rabbitmq_consumber.service.Impl;

import com.huyi.rabbitmq_consumber.service.EmailConsumerService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
@RabbitListener(queues = {"email.fanout.queue"})
public class EmailConsumerServiceImpl implements EmailConsumerService {
    @RabbitHandler
    public void reviceMessage(String message) {
        System.err.println("Email_fanout--接收到了订单信息"+message);
    }
}


  • DuanxinConsumerService、DuanxinConsumerServiceImpl
package com.huyi.rabbitmq_consumber.service;

public interface DuanxinConsumerService {
    void reviceMessage(String message);
}


package com.huyi.rabbitmq_consumber.service.Impl;


import com.huyi.rabbitmq_consumber.service.DuanxinConsumerService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
@RabbitListener(queues = {"duanxin.fanout.queue"})
public class DuanxinConsumerServiceImpl implements DuanxinConsumerService {
    @RabbitHandler
    public void reviceMessage(String message) {
        System.err.println("Duanxin_fanout--接收到了订单信息"+message);
    }

}


8.测试消费者接收信息

  • 启动消费者项目
    在这里插入图片描述
  • 启动生产者项目
    在这里插入图片描述
  • 查看消费者项目是否监听到了生产者的信息
    在这里插入图片描述
上一篇:9.包和工具【go】


下一篇:关于rabbitmq的prefetch机制