一、topics通配符模式:
Topic 通配符类型与 Direct路由 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
1.1 topic模式的Routingkey
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
# : 匹配一个或多个词
* : 匹配不多不少恰好1个词
举例:
item.# :能够匹配 item.insert.abc 或者 item.insert
item.* :只能匹配 item.insert
1.2 topic模式示例图
图解:
红色Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
黄色Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
二、topic通配符模式需求
编写生产者、消费者代码并测试了解Topics通配符模式的特点
2.1 步骤:
- 生产者:发送包含有item.insert、item.update,item.delete的3中路由key消息
- 消费者1:监听的队列绑定到交换机的路由key为:item.update,item.delete
- 消费者2:监听的队列绑定到交换机的路由key为:item.*
2.2 代码示例
生产者代码Producer:
package com.study.rabbitmq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.study.rabbitmq.util.ConnectionUtil;
//topic路由模式 发送消息
public class Producer {
//交换机名称
static final String TOPIC_EXCHANGE = "topic_exchange";
//队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
// 1. 创建连接
Connection connection = ConnectionUtil.getConnection();
// 2. 创建频道;
Channel channel = connection.createChannel();
// 3.声明交换机,参数1:交换机名称,参数2:交换机类型(FANOUT广播类型、direct 定向路由,topic 通配符)
channel.exchangeDeclare(TOPIC_EXCHANGE , BuiltinExchangeType.TOPIC);
//4. 发送消息;
String message = "新增商品(通配符模式)---routingkey 为 insert";
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(TOPIC_EXCHANGE,"item.insert",null,message.getBytes());
System.out.println("已发送消息:" + message);
message = "更新商品(通配符模式)---routingkey 为 update";
channel.basicPublish(TOPIC_EXCHANGE,"item.update",null,message.getBytes());
System.out.println("已发送消息:" + message);
message = "删除商品(通配符模式)---routingkey 为 delete";
channel.basicPublish(TOPIC_EXCHANGE,"item.delete",null,message.getBytes());
System.out.println("已发送消息:" + message);
//5. 关闭资源
channel.close();
connection.close();
}
}
消费者1Consumer1-代码:
package com.study.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
/**
* topic路由模式;消费者接收消息
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
// 1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
// 2. 创建频道;
Channel channel = connection.createChannel();
//3 声明交换机
channel.exchangeDeclare(com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
// 4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, true, false, false, null);
//5 队列绑定到交换机上
channel.queueBind(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,"item.update");
channel.queueBind(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,"item.delete");
// 6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者1---接收到的消息为:" + new String(body, "utf-8"));
}
};
// 6. 监听队列 (需要持续监听队列消息,所以不要关闭资源)
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.TOPIC_QUEUE_1,true,defaultConsumer);
//不关闭资源,应该一直监听消息
// channel.close();
// connection.close();
}
}
消费者2-Consumer2 代码:
package com.study.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
/**
* topic路由模式;消费者接收消息
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
// 1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
// 2. 创建频道;
Channel channel = connection.createChannel();
//3 声明交换机
channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
// 4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
//5 队列绑定到交换机上
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"item.*");
// 6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者2---接收到的消息为:" + new String(body, "utf-8"));
}
};
// 6. 监听队列 (需要持续监听队列消息,所以不要关闭资源)
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.TOPIC_QUEUE_2,true,defaultConsumer);
//不关闭资源,应该一直监听消息
// channel.close();
// connection.close();
}
}
先运行消费者1 、2 再运行生产者,
idea结果: 消费者1队列 接收:路由key为:item.update,item.delete;消费者2队列 接收:路由key为:item.update,item.delete,item.insert
rabbitmq后台管理界面结果
三、topic通配符总结:
Topics通配符模式:可以根据路由key将消息传递到对应路由key的队列;
队列绑定到交换机的路由key可以有多个;
通配符模式中路由key可以使用 *
和 #
;使用了通配符模式之后对于路由Key的配置更加灵活。