原创转载请注明出处:https://www.cnblogs.com/agilestyle/p/11795256.html
RMQ Fanout
Project Directory
Maven Dependency
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>org.fool.rmq</groupId> 8 <artifactId>rmq</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <parent> 12 <groupId>org.springframework.boot</groupId> 13 <artifactId>spring-boot-starter-parent</artifactId> 14 <version>2.2.0.RELEASE</version> 15 </parent> 16 17 <dependencies> 18 <dependency> 19 <groupId>org.springframework.boot</groupId> 20 <artifactId>spring-boot-starter-amqp</artifactId> 21 </dependency> 22 23 <dependency> 24 <groupId>org.springframework.boot</groupId> 25 <artifactId>spring-boot-starter-web</artifactId> 26 </dependency> 27 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-test</artifactId> 31 <scope>test</scope> 32 </dependency> 33 </dependencies> 34 35 <build> 36 <plugins> 37 <plugin> 38 <groupId>org.springframework.boot</groupId> 39 <artifactId>spring-boot-maven-plugin</artifactId> 40 </plugin> 41 </plugins> 42 </build> 43 </project>View Code
application.properties
1 spring.application.name=rmq 2 server.port=8888 3 4 spring.rabbitmq.host=localhost 5 spring.rabbitmq.port=5672 6 spring.rabbitmq.username=admin 7 spring.rabbitmq.password=admin 8 9 #direct part start 10 rmq.config.exchange=log.direct 11 rmq.config.queue.info=log.info 12 rmq.config.queue.info.routing.key=log.info.routing.key 13 rmq.config.queue.error=log.error 14 rmq.config.queue.error.routing.key=log.error.routing.key 15 #direct part end 16 17 #topic part start 18 rmq.config.exchange.topic=log.topic 19 rmq.config.queue.topic.info=log.topic.info 20 rmq.config.queue.topic.error=log.topic.error 21 rmq.config.queue.topic.all=log.topic.all 22 23 rmq.config.queue.user.info.routing.key=user.log.info 24 rmq.config.queue.user.debug.routing.key=user.log.debug 25 rmq.config.queue.user.warn.routing.key=user.log.warn 26 rmq.config.queue.user.error.routing.key=user.log.error 27 28 rmq.config.queue.product.info.routing.key=product.log.info 29 rmq.config.queue.product.debug.routing.key=product.log.debug 30 rmq.config.queue.product.warn.routing.key=product.log.warn 31 rmq.config.queue.product.error.routing.key=product.log.error 32 33 rmq.config.queue.order.info.routing.key=order.log.info 34 rmq.config.queue.order.debug.routing.key=order.log.debug 35 rmq.config.queue.order.warn.routing.key=order.log.warn 36 rmq.config.queue.order.error.routing.key=order.log.error 37 #topic part end 38 39 #fanout part start 40 rmq.config.exchange.fanout=order.fanout 41 rmq.config.queue.sms=order.sms 42 rmq.config.queue.push=order.push 43 #fanout part end
Note: fanout模式下不需要设置routing key
Source Code
Application.java
1 package org.fool.rmq; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class Application { 8 public static void main(String[] args) { 9 SpringApplication.run(Application.class, args); 10 } 11 }
RmqConfig.java
1 package org.fool.rmq.config; 2 3 import org.springframework.amqp.core.Queue; 4 import org.springframework.context.annotation.Bean; 5 import org.springframework.context.annotation.Configuration; 6 7 @Configuration 8 public class RmqConfig { 9 10 @Bean 11 public Queue queue() { 12 return new Queue("hello-rmq"); 13 } 14 }
OrderFanoutProducer.java
1 package org.fool.rmq.fanout; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Value; 6 import org.springframework.stereotype.Component; 7 8 import java.util.Date; 9 10 @Component 11 public class OrderFanoutProducer { 12 @Autowired 13 private AmqpTemplate rmqTemplate; 14 15 @Value("${rmq.config.exchange.fanout}") 16 private String exchange; 17 18 public void send() { 19 String message = "Hello " + new Date(); 20 rmqTemplate.convertAndSend(exchange, "", message); 21 } 22 }
SmsConsumer.java
1 package org.fool.rmq.fanout; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 @Component 12 @RabbitListener(bindings = @QueueBinding( 13 value = @Queue(value = "${rmq.config.queue.sms}", autoDelete = "true"), 14 exchange = @Exchange(value = "${rmq.config.exchange.fanout}", type = ExchangeTypes.FANOUT) 15 )) 16 public class SmsConsumer { 17 @RabbitHandler 18 public void handle(String message) { 19 System.out.println("consume sms: " + message); 20 } 21 }
PushConsumer.java
1 package org.fool.rmq.fanout; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 @Component 12 @RabbitListener(bindings = @QueueBinding( 13 value = @Queue(value = "${rmq.config.queue.push}", autoDelete = "true"), 14 exchange = @Exchange(value = "${rmq.config.exchange.fanout}", type = ExchangeTypes.FANOUT) 15 )) 16 public class PushConsumer { 17 @RabbitHandler 18 public void handle(String message) { 19 System.out.println("consume push: " + message); 20 } 21 }
ApplicationTest.java
1 package org.fool.rmq.test; 2 3 import org.fool.rmq.Application; 4 import org.fool.rmq.direct.LogProducer; 5 import org.fool.rmq.fanout.OrderFanoutProducer; 6 import org.fool.rmq.producer.Producer; 7 import org.fool.rmq.topic.OrderProducer; 8 import org.fool.rmq.topic.ProductProducer; 9 import org.fool.rmq.topic.UserProducer; 10 import org.junit.Test; 11 import org.junit.runner.RunWith; 12 import org.springframework.beans.factory.annotation.Autowired; 13 import org.springframework.boot.test.context.SpringBootTest; 14 import org.springframework.test.context.junit4.SpringRunner; 15 16 @RunWith(SpringRunner.class) 17 @SpringBootTest(classes = Application.class) 18 public class ApplicationTest { 19 20 @Autowired 21 private Producer producer; 22 23 @Autowired 24 private LogProducer logProducer; 25 26 @Autowired 27 private UserProducer userProducer; 28 29 @Autowired 30 private ProductProducer productProducer; 31 32 @Autowired 33 private OrderProducer orderProducer; 34 35 @Autowired 36 private OrderFanoutProducer orderFanoutProducer; 37 38 @Test 39 public void test() { 40 producer.send(); 41 } 42 43 @Test 44 public void testDirect() { 45 logProducer.send(); 46 } 47 48 @Test 49 public void testTopic() { 50 userProducer.send(); 51 productProducer.send(); 52 orderProducer.send(); 53 } 54 55 @Test 56 public void testFanout() { 57 orderFanoutProducer.send(); 58 } 59 }
Console Output
RMQ Management