RabbitMQ简单应用の主题模式(topic)

Topic exchange(主题转发器)

发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如“socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节。

绑定键也必须以相同的格式。主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。如下图:

RabbitMQ简单应用の主题模式(topic)

MQ工厂连接类Connection

 package com.mmr.rabbitmq.util;

 import java.io.IOException;

 import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtils {
/**
* @desc 获取Mq 的链接
* @author zp
* @throws IOException
* @date 2018-7-19
*/
public static Connection getConnection() throws IOException {
// 1.定义一个链接工厂
ConnectionFactory factroy = new ConnectionFactory(); // 2.设置服务地址
factroy.setHost("127.0.0.1"); // 3.设置端口号
factroy.setPort(5672); // 4.vhost 设置数据库
factroy.setVirtualHost("vhtest"); // 5.设置用户名
factroy.setUsername("jerry"); // 6. 设置密码
factroy.setPassword("123456"); // 7.返回链接
return factroy.newConnection();
}
}

MQ消息生产者类Send

 package com.mmr.rabbitmq.topic;

 import java.io.IOException;

 import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Send {
private static final String EXCHANGE_TOPIC_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
// 创建连接
Connection connection = ConnectionUtils.getConnection(); // 获取通道
Channel channel = connection.createChannel(); // 设置路由键
channel.exchangeDeclare(EXCHANGE_TOPIC_NAME, "topic"); // 发送消息
String msg = "商品......."; // String routingKey = "goods.delete"; channel.basicPublish(EXCHANGE_TOPIC_NAME, "goods.add", null, msg.getBytes());
System.out.println("send ---"+msg);
channel.close();
connection.close(); }
}

MQ消息消费者类Recv1只处理good.add good.edit  ;Recv2处理good.#

 package com.mmr.rabbitmq.topic;

 import java.io.IOException;

 import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class Recv1 {
private static final String QUEUE_NAME="test_queue_topic1";
private static final String EXCHANGE_TOPIC_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
// 创建连接
Connection connection = ConnectionUtils.getConnection(); // 创建通道
final Channel channel = connection.createChannel(); // 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_TOPIC_NAME, "goods.add");
channel.queueBind(QUEUE_NAME, EXCHANGE_TOPIC_NAME, "goods.edit");
// 每次
channel.basicQos(1); // 消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
// 接收消息
String msg = new String(body,"utf-8");
System.out.println("【1】Recv1 msg"+msg);
try {
Thread.sleep(2000);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}finally{
System.out.print("【1】Recv1 done");
//回执单
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck,consumer );
}
}
 package com.mmr.rabbitmq.topic;

 import java.io.IOException;

 import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class Recv2 {
private static final String QUEUE_NAME="test_queue_topic2";
private static final String EXCHANGE_TOPIC_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
// 创建连接
Connection connection = ConnectionUtils.getConnection(); // 创建通道
final Channel channel = connection.createChannel(); // 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_TOPIC_NAME, "goods.#"); // 每次
channel.basicQos(1); // 消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
// 接收消息
String msg = new String(body,"utf-8");
System.out.println("【2】Recv2 msg"+msg);
try {
Thread.sleep(2000);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}finally{
System.out.println("【2】Recv2 done");
//回执单
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck,consumer );
}
}

运行上述代码,可以获得  当消息产生发送过来是good.add/good.edit 那么消息消费者1和消费者2都会接收到消息并且处理,但是当消息生产者发送过来的是good.delete或者good.select等的时候只有消费者2(Recv2)接收到消息处理通知。

上一篇:F#之旅6 - 简单AV推荐系统


下一篇:ACM——简单排序