最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。本文用SpringBoot工程具体介绍四种交换机的使用。
一、Direct Exchange
需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
@Bean
DirectExchange directExchange(){
return new DirectExchange("direct_change");
}
@Bean
Queue queue1(){
return new Queue(Direct_QUEUE_NAME,true,false,false,null);
}
@Bean
Queue queue2(){
return new Queue(Direct_QUEUE_NAME2,true,false,false,null);
}
@Bean
Binding binding(){
return BindingBuilder.bind(queue1()).to(directExchange()).with("a");
}
@Bean
Binding binding2(){
return BindingBuilder.bind(queue2()).to(directExchange()).with("b");
}
二、Fanout Exchange
发送到交换机的消息会被转发到与该交换机绑定的所有队列上。消息广播。Fanout交换机转发消息是最快的。
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout_change");
}
@Bean
Queue queue3(){
return new Queue(Direct_QUEUE_NAME3,true,false,false,null);
}
@Bean
Queue queue4(){
return new Queue(Direct_QUEUE_NAME4,true,false,false,null);
}
@Bean
Binding binding3(){
return BindingBuilder.bind(queue3()).to(fanoutExchange());
}
@Bean
Binding binding4(){
return BindingBuilder.bind(queue4()).to(fanoutExchange());
}
三、Topic Exchange
在路由模式的基础上,支持了对key的通配符匹配(星号以及井号),以满足更加复杂的消息分发场景。。“#” : 匹配一个或者多个
“*”:匹配一个。abc.#会匹配所有abc开头的key
@Bean
TopicExchange topicExchange(){
return new TopicExchange("topic_change");
}
@Bean
Queue queue7(){
return new Queue(Direct_QUEUE_NAME7,true,false,false,null);
}
@Bean
Queue queue8(){
return new Queue(Direct_QUEUE_NAME8,true,false,false,null);
}
@Bean
Binding binding7(){
return BindingBuilder.bind(queue7()).to(topicExchange()).with("abc.#");
}
@Bean
Binding binding8(){
return BindingBuilder.bind(queue8()).to(topicExchange()).with("#.d");
}
四、Headers Exchanges
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
@Bean
HeadersExchange headersExchange(){
return new HeadersExchange("head_change");
}
@Bean
Queue queue5(){
return new Queue(Direct_QUEUE_NAME5,true,false,false,null);
}
@Bean
Queue queue6(){
return new Queue(Direct_QUEUE_NAME6,true,false,false,null);
}
@Bean
Binding binding5(){
return BindingBuilder.bind(queue5()).to(headersExchange()).where("age").matches("12");
}
@Bean
Binding binding6(){
return BindingBuilder.bind(queue6()).to(headersExchange()).where("name").matches("luomo");
单元测试
@SpringBootTest
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertSendAndReceive("direct_change","a","hello queue1");
rabbitTemplate.convertSendAndReceive("direct_change","b","hello queue2");
}
@Test
void fanoutExchange() {
rabbitTemplate.convertSendAndReceive("fanout_change","","hello queue");
}
@Test
void headerExchange() {
Message build = MessageBuilder.withBody("hello luomo".getBytes()).setHeader("name", "luomo").build();
rabbitTemplate.convertSendAndReceive("head_change","",build);
Message message = MessageBuilder.withBody("hello ddd".getBytes()).setHeader("age", "12").build();
rabbitTemplate.convertSendAndReceive("head_change","",message);
}
@Test
void topicExchange() {
rabbitTemplate.convertSendAndReceive("topic_change","abc.d","test");
}
}