苦心人,天不负,卧薪尝胆,三千越甲可吞吴
Success
自述
RabbitMQ的第五个场景------主题(topic),基于路由之上,该文章可以相对简洁,因为它是基于路由之上,实际上就是将交换机的类型进行改变。
Exchange类型描述
扇出(fanout):发布/订阅场景中使用,将消息无条件推送给已绑定的队列。
直接(direct):路由场景中使用,将消息推送给满足条件已绑定的队列,用于精准匹配。
主题(topic):路由场景中使用,将消息推送给满足条件已绑定的队列,用于模糊匹配。
注意:交换机类型在某个场景下都可以任意使用,不存在固定场景固定类型。
主题场景模型解析
在发布/订阅场景下的时候,交换机设置的类型是扇出(fanout),但并没有一点灵活性,只是一味的无脑传递消息,在路由场景下的时候,交换机设置的类型是直接(direct),虽然做到了对消息的限制,但是还是存在局限性-----不能基于多个标准进行路由,所以在路由的基础之上,我们再次改变交换机的类型,实现主题场景。
主题场景
*:星号代表一个词或一个匹配项。
#:井号代表n个匹配项。
①当为“#”符号的时候,就类似于交换机的扇出(fanout)类型。
②当为“*”符号的时候,就类似于交换机的直接(direct)类型。
③当一个路由键中满足多个类型的时候,是无法与多个类型匹配的,只会匹配第一个类型。
④路由键可以为任意字,最大长度为255个字节,当路由键不在跟符号“*”或“#”符号的话,那么它的匹配方式就会和路由完全一致。
ExSend.java
/**
* //第五个场景:主题
* 生产者
*/
public class ExSend {
//exchangename
private static final String EX_CHANGENAME = "ex_name1";
public static void main(String[] args) {
try {
//1.connection
Connection connection = ConnectionUtil.getConnection();
//2.channel
Channel channel = connection.createChannel();
//3.exchange
//paramone:exchangename
//paramtwo:exchangetype ---->topic主题
channel.exchangeDeclare(EX_CHANGENAME, "topic");
//sendtype
String[] sendtype = new String[]{"frog.consumer1", "Elephant.consumer1", "frog", "lion", "lion.consumer1", "lion.frog.Elephant.frog"};
for (int i = 0; i < sendtype.length; i++) {
channel.basicPublish(EX_CHANGENAME, sendtype[i], null, (sendtype[i] + "=====msg send suc").getBytes());
}
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
}
}
}
ExRecv.java
/**
* topic
* 青蛙系列消费者
*/
public class ExRecv {
//exchangename
private static final String EX_CHANGENAME = "ex_name1";
public static void main(String[] args) {
try {
//1.connection
Connection connection = ConnectionUtil.getConnection();
//2.channel
Channel channel = connection.createChannel();
//3.exchange
//paramone:exchangename
//paramtwo:exchangetype ---->topic主题
channel.exchangeDeclare(EX_CHANGENAME, "topic");
//4.queue
String queue = channel.queueDeclare().getQueue();
//5.binding
channel.queueBind(queue, EX_CHANGENAME, "frog.#");
//6.get msg
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(new String(delivery.getBody()));
}
};
//7.consumer
channel.basicConsume(queue, true, deliverCallback, cancel -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
ExRecv2.java
/**
* topic
* 大象系列消费者
*/
public class ExRecv2 {
//exchangename
private static final String EX_CHANGENAME = "ex_name1";
public static void main(String[] args) {
try {
//1.connection
Connection connection = ConnectionUtil.getConnection();
//2.channel
Channel channel = connection.createChannel();
//3.exchange
//paramone:exchangename
//paramtwo:exchangetype ---->topic主题
channel.exchangeDeclare(EX_CHANGENAME, "topic");
//4.queue
String queue = channel.queueDeclare().getQueue();
//5.binding
channel.queueBind(queue, EX_CHANGENAME, "Elephant.#");
//6.get msg
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(new String(delivery.getBody()));
}
};
//7.consumer
channel.basicConsume(queue, true, deliverCallback, cancel -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
ExRecv3.java
/**
* topic
* 狮子系列消费者
*/
public class ExRecv3 {
//exchangename
private static final String EX_CHANGENAME = "ex_name1";
public static void main(String[] args) {
try {
//1.connection
Connection connection = ConnectionUtil.getConnection();
//2.channel
Channel channel = connection.createChannel();
//3.exchange
//paramone:exchangename
//paramtwo:exchangetype ---->topic主题
channel.exchangeDeclare(EX_CHANGENAME, "topic");
//4.queue
String queue = channel.queueDeclare().getQueue();
//5.binding
channel.queueBind(queue, EX_CHANGENAME, "lion.#");
//6.get msg
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(new String(delivery.getBody()));
}
};
//7.consumer
channel.basicConsume(queue, true, deliverCallback, cancel -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
Ending
加油,每天进步一点点!