路由模式(Routing):在订阅模式的基础上,通过路由键(routingKey)制定一些特定的发送规则
创建路由模式的生产者:
注意与之前订阅模式的区别:交换机类型换成了 DIRECT,并且绑定队列和发送消息时都需要指定路由键(routingKey)
package cn.dzz.routineQueueInProducer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class RoutineInProducer { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.121"); connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672 connectionFactory.setVirtualHost("/dzz"); // 虚拟主机? 默认值 / connectionFactory.setUsername("test"); // guest connectionFactory.setPassword("123456"); // guest Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); /** * 多了一个创建交换机的过程 * public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException { * return this.exchangeDeclare(exchange, type.getType(), durable, autoDelete, internal, arguments); * } * String exchange 交换机名称 * String type 交换机类型,这里换成枚举类型,方便查找 com.rabbitmq.client.BuiltinExchangeType * DIRECT("direct"), 定向 简单模式 和 工作模式 * FANOUT("fanout"), 扇形 广播(通知给所有和这个交换机绑定的队列) * TOPIC("topic"), 通配符 ? 
* HEADERS("headers"); 参数匹配, 视频暂不讲解 * boolean durable 持久化 * boolean autoDelete 自动删除 * boolean internal 内部使用 一般false * Map<String, Object> arguments */ String exchangeName = "test_routine"; channel.exchangeDeclare( exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null ); String queueName1 = "routine - exchange - 1"; String queueName2 = "routine - exchange - 2"; channel.queueDeclare(queueName1, true, false, false, null); channel.queueDeclare(queueName2, true, false, false, null); /** * 将交换机和队列绑定 * public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException { * return this.queueBind(queue, exchange, routingKey, (Map)null); * } * String queue 队列名称 * String exchange 交换机名称 * String routingKey 路由键,绑定规则 * 如果是fanout模式, 设置""即可,默认就是给所有队列绑定 * */ channel.queueBind(queueName1, exchangeName, "error"); channel.queueBind(queueName2, exchangeName, "info"); channel.queueBind(queueName2, exchangeName, "warning"); channel.queueBind(queueName2, exchangeName, "error"); // 发送消息 这里制定一些不同的消息 以做出区分 String level = "info"; for (int i = 0; i < 10; i++) { if (i == 4) level = "warning"; else if (i == 7) level = "error"; String body = "sending routine msg " + i + level; channel.basicPublish(exchangeName, level, null, body.getBytes(StandardCharsets.UTF_8)); } // 释放资源 channel.close(); connection.close(); } }
发送到队列之后可以查看队列面板
可以看到队列1只有3条消息,队列2有10条消息:队列1只绑定了 error 路由键,因此只收到第 7~9 条消息;队列2同时绑定了 info、warning、error,所以10条消息全部收到
我们再创建对应的消费者(1 和 2)打印查看
消费者的写法和之前的模式一致,只是需要找到对应的队列名称来进行消费(下面以消费者2为例,消费者1只需把 basicConsume 的队列名称换成队列1的名称)
package cn.dzz.routineQueue; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; public class RoutineQueueInConsumer2 { /** * 工作队列 消费者 * @param args */ public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.121"); connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672 connectionFactory.setVirtualHost("/dzz"); // 虚拟主机? 默认值 / connectionFactory.setUsername("test"); // guest connectionFactory.setPassword("123456"); // guest Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 声明将不在需要 // channel.queueDeclare("work_queue", true, false, false, null); // 从生产者复制过来需要的队列名称 String queueName1 = "routine - exchange - 1"; String queueName2 = "routine - exchange - 2"; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body(message) " + new String(body, StandardCharsets.UTF_8)); System.out.println("- - - - - over - - - - -"); } }; channel.basicConsume(queueName2, true, consumer); } }
消费者1打印
"C:\Program Files (x86)\Java\jdk1.8.0_291\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\lib\idea_rt.jar=60928:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\rt.jar;C:\Users\Administrator\IdeaProjects\RabbitMQ\ConsumerService\target\classes;C:\Users\Administrator\.m2\repository\com\rabbitmq\amqp-client\5.6.0\amqp-client-5.6.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar" cn.dzz.routineQueue.RoutineQueueInConsumer1 SLF4J: 
Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. body(message) sending routine msg 7error - - - - - over - - - - - body(message) sending routine msg 8error - - - - - over - - - - - body(message) sending routine msg 9error - - - - - over - - - - -
消费者2打印
"C:\Program Files (x86)\Java\jdk1.8.0_291\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\lib\idea_rt.jar=60918:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\rt.jar;C:\Users\Administrator\IdeaProjects\RabbitMQ\ConsumerService\target\classes;C:\Users\Administrator\.m2\repository\com\rabbitmq\amqp-client\5.6.0\amqp-client-5.6.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar" cn.dzz.routineQueue.RoutineQueueInConsumer2 SLF4J: 
Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. body(message) sending routine msg 0info - - - - - over - - - - - body(message) sending routine msg 1info - - - - - over - - - - - body(message) sending routine msg 2info - - - - - over - - - - - body(message) sending routine msg 3info - - - - - over - - - - - body(message) sending routine msg 4warning - - - - - over - - - - - body(message) sending routine msg 5warning - - - - - over - - - - - body(message) sending routine msg 6warning - - - - - over - - - - - body(message) sending routine msg 7error - - - - - over - - - - - body(message) sending routine msg 8error - - - - - over - - - - - body(message) sending routine msg 9error - - - - - over - - - - -