关键字
-
Exchange :交换机
-
routing key:路由键
四种Exchange 模式
-
direct :需要生产者和消费者绑定相同的Exchange和routing key。
-
fanout:广播模式,需要生产者和消费者绑定相同的Exchange(routing key 会被忽略)。
-
topic:支持模糊匹配的广播模式,路由键以点(.)分隔,`*` 表示一个单词,`#` 表示任意数量(零个或多个)单词。
-
headers:根据生产者和消费者 header 中的信息进行匹配,性能较差;x-match 取 all(匹配所有)或 any(匹配任意一个)。
1:首先定义一个 ConnectionUtil类
public class ConnectionUtil {
    /**
     * Opens a new connection to the RabbitMQ broker using the fixed settings below.
     *
     * @return a newly created connection; this method does not close it
     * @throws Exception if the factory fails to create the connection
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Broker address and port.
        connectionFactory.setHost("192.168.26.134");
        connectionFactory.setPort(5672);
        // Login credentials.
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        // Every call asks the factory for a new connection.
        return connectionFactory.newConnection();
    }
}
2:定义一个公共的Produce 类和Consumer 类
import com.mq.rabbit.Test.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* Created by Administrator on 2019/7/7.
*/
public class Produce {
    /**
     * Publishes a single message to an exchange, then closes the channel and the connection.
     *
     * <p>The channel and connection are released even when declaring the exchange or
     * publishing fails.
     *
     * @param exchanageType exchange type (e.g. direct, fanout, topic, headers)
     * @param exchanageName exchange name
     * @param direct        routing key
     * @param properties    AMQP.BasicProperties for the message (the tests in this file pass
     *                      {@code null} when no properties are needed)
     * @param message       message text to send
     * @throws Exception if connecting, declaring the exchange, publishing or closing fails
     */
    public static void senInfo(String exchanageType, String exchanageName, String direct, AMQP.BasicProperties properties, String message) throws Exception {
        System.out.println("生产者:" + message);
        Connection connection = ConnectionUtil.getConnection();
        try {
            // Open a channel on the connection.
            Channel channel = connection.createChannel();
            try {
                // Declare the exchange the message is published to.
                channel.exchangeDeclare(exchanageName, exchanageType);
                // Encode explicitly as UTF-8 so the payload does not depend on the platform
                // default charset; the consumer side decodes the body as UTF-8.
                channel.basicPublish(exchanageName, direct, properties,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            } finally {
                channel.close();
            }
        } finally {
            // Always release the connection, even if publishing or closing the channel failed.
            connection.close();
        }
    }
}
import com.mq.rabbit.Test.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
/**
* Created by Administrator on 2019/6/30.
*/
public class Consumer {
/**
*
* @param exchanageType 交换器类型
* @param exchanageName 交换器名称
* @param direct 路由键
* @param headers 头信息
* @throws Exception
*/
public static void readInfo(String exchanageType,String exchanageName,String direct,Map<String,Object> headers ) throws Exception{
// 创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个交换器
channel.exchangeDeclare(exchanageName, exchanageType);
// 创建一个非持久的、唯一的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// queue:对列名称 exchange :交换器名 routingKey :路由键名
channel.queueBind(queueName, exchanageName, direct,headers);
// 创建队列消费者
final com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者收到消息: " + message + "");
}
};
// 自动响应ack
channel.basicConsume(queueName, true, consumer);
}
}
3:direct fanout topic header 实例
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import org.junit.jupiter.api.Test;
import java.util.Hashtable;
import java.util.Map;
/**
* Created by Administrator on 2019/7/7.
*/
/**
 * Demonstrates the direct, fanout, topic and headers exchange types.
 *
 * <p>Each test registers one or more consumers through {@code Consumer.readInfo} and then
 * publishes through {@code Produce.senInfo}. The tests make no assertions; received messages
 * are printed by the consumer callback.
 * NOTE(review): delivery to the consumers is asynchronous, so a test method may return before
 * every message has been printed — confirm whether a wait is wanted here.
 */
public class TestExchange {
    /** Direct exchange: consumer and producer use the same exchange and routing key. */
    @Test
    public void direct() throws Exception {
        String exchanageType = BuiltinExchangeType.DIRECT.getType(); // direct exchange
        String exchanageName = "direct_logs1"; // exchange name
        String direct = "direct_01"; // routing key
        String message = "direct 类型消息"; // message body
        Map<String, Object> headers = null; // no binding arguments
        AMQP.BasicProperties properties = null; // no message properties
        Consumer.readInfo(exchanageType, exchanageName, direct, headers);
        for (int i = 0; i < 5; i++) {
            Produce.senInfo(exchanageType, exchanageName, direct, properties, message);
        }
    }

    /** Fanout exchange: two consumers are bound to the same exchange before one message is sent. */
    @Test
    public void fanout() throws Exception {
        String exchanageType = BuiltinExchangeType.FANOUT.getType(); // fanout exchange
        String exchanageName = "fanout_logs1"; // exchange name
        String direct = "fanout_01"; // routing key
        String message = "fanout 类型消息"; // message body
        Map<String, Object> headers = null; // no binding arguments
        AMQP.BasicProperties properties = null; // no message properties
        // Fanout is a broadcast, so start more than one consumer.
        Consumer.readInfo(exchanageType, exchanageName, direct, headers);
        Consumer.readInfo(exchanageType, exchanageName, direct, headers);
        Produce.senInfo(exchanageType, exchanageName, direct, properties, message);
    }

    /** Topic exchange: consumers bind with wildcard patterns, producers use concrete keys. */
    @Test
    public void topic() throws Exception {
        String exchanageType = BuiltinExchangeType.TOPIC.getType(); // topic exchange
        String exchanageName = "topic_logs1"; // exchange name
        Map<String, Object> headers = null; // no binding arguments
        AMQP.BasicProperties properties = null; // no message properties
        // Topic patterns are dot-separated words: * matches exactly one word,
        // # matches zero or more words (e.g. com.*.test, com.#).
        Consumer.readInfo(exchanageType, exchanageName, "*.my", headers);
        Consumer.readInfo(exchanageType, exchanageName, "#.com", headers);
        Consumer.readInfo(exchanageType, exchanageName, "#", headers);
        Produce.senInfo(exchanageType, exchanageName, "a.my", properties, "消息a.my");
        Produce.senInfo(exchanageType, exchanageName, "www.baidu.com", properties, "消息www.baidu.com");
        // The message text now names the routing key it is actually sent with.
        Produce.senInfo(exchanageType, exchanageName, "taobao.com", properties, "消息taobao.com");
    }

    /** Headers exchange: the binding carries x-match plus header values, the message carries headers. */
    @Test
    public void header() throws Exception {
        // Must be a headers exchange; with a direct exchange the x-match binding arguments
        // below would be ignored and routing would fall back to the routing key.
        // NOTE(review): if "header_logs1" already exists on the broker with another type,
        // this declaration will be rejected — delete the old exchange first.
        String exchanageType = BuiltinExchangeType.HEADERS.getType(); // headers exchange
        String exchanageName = "header_logs1"; // exchange name
        String direct = "header_01"; // routing key
        String message = "header 类型消息"; // message body
        // Consumer side: binding arguments.
        Map<String, Object> consumerHeaders = new Hashtable<>();
        consumerHeaders.put("x-match", "any"); // all: every header must match; any: one is enough
        consumerHeaders.put("type", "json");
        consumerHeaders.put("name", "baidu");
        Consumer.readInfo(exchanageType, exchanageName, direct, consumerHeaders);
        // Producer side: message headers.
        Map<String, Object> producerHeaders = new Hashtable<>();
        producerHeaders.put("type", "json");
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .headers(producerHeaders)
                .build();
        Produce.senInfo(exchanageType, exchanageName, direct, props, message);
    }
}