总结
- 消息消费者只需要明确从哪个消息队列获取消息
- exchange创建后 不能再创建相同名字+不同模式的
- 多个模式可结合使用 可以直接发到指定队列,也可以发到交换机由对应策略转发到对应队列
RabbitMQ Java原生api使用
1.HelloWorld 简单模式
添加Virtual Hosts
简单模式消息生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 2.设置RabbitMQ服务主机地址,默认localhost
factory.setHost("localhost");
// 3.设置RabbitMQ服务端口,默认5672
factory.setPort(5672);
// 4.设置虚拟主机名字,默认/
factory.setVirtualHost("/demo1");
// 5.设置用户连接名,默认guest
// factory.setUsername("guest");
// 6.设置链接密码,默认guest
// factory.setPassword("guest");
// 7.创建一个新链接
Connection connection = factory.newConnection();
// 8.创建消息通道
Channel channel = connection.createChannel();
// 9.创建队列
channel.queueDeclare("simple_queue",true,false,false,null);
// 10.创建消息
String msg="simple queue demo";
// 11.消息发送
channel.basicPublish("","simple_queue",null,msg.getBytes());
// 12.关闭资源
channel.close();
connection.close();
}
}
简单模式消息消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 2.设置RabbitMQ服务主机地址,默认localhost
factory.setHost("localhost");
// 3.设置RabbitMQ服务端口,默认5672
factory.setPort(5672);
// 4.设置虚拟主机名字,默认/
factory.setVirtualHost("/demo1");
// 5.设置用户连接名,默认guest
// 6.设置链接密码,默认guest
// 7.创建一个新链接
Connection connection = factory.newConnection();
// 8.创建消息通道
Channel channel = connection.createChannel();
// 9.创建队列
channel.queueDeclare("simple_queue",true,false,false,null);
// 10.创建消费者,并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由
String routingKey = envelope.getRoutingKey();
// 交换机
String exchange = envelope.getExchange();
// 消息id
long deliveryTag = envelope.getDeliveryTag();
// 消息体
String message = new String(body, "UTF-8");
System.out.println("路由:" + routingKey + ",交换机:" + exchange + ",消息id:" + deliveryTag + ",消息体:" + message);
super.handleDelivery(consumerTag, envelope, properties, body);
}
};
// 11.消息监听
channel.basicConsume("simple_queue", true, consumer);
// 12.关闭资源(不建议关闭,建议一直监听消息)
}
}
2.Work queues 工作队列模式
// 与上面一样,不过就是创建多个消息消费者去监听同一个消息队列,消息分配为轮询式
3.Publish/subscribe 发布订阅模式
此模式下呢多出一个概念exchange(交换机)可以在交换机上绑定多个消息队列,而现在消息产生者将消息发送到交换机,由交换机调度给他下面的消息队列,此模式下交换机将会将消息发给所有与他绑定的队列
工作队列模式消息生产者
// 9.创建队列
channel.queueDeclare("simple_queue1",true,false,false,null);
channel.queueDeclare("simple_queue2",true,false,false,null);
// 创建交换机:arg0,交换机名称 arg1,交换机类型(广播)
channel.exchangeDeclare("demo3Exchange", BuiltinExchangeType.FANOUT);
// 将队列绑定到交换机
channel.queueBind("simple_queue1","demo3Exchange","");
channel.queueBind("simple_queue2","demo3Exchange","");
// 10.创建消息 ....
工作队列模式消息消费者
class Consumer1
...
// 11.监听 消息队列simple_queue1
channel.basicConsume("simple_queue1", true, consumer);
...
class Consumer2
...
// 11.监听 消息队列simple_queue2
channel.basicConsume("simple_queue2", true, consumer);
...
4.Routing 路由模式
此模式下多出概念路由,基于上一模式,上一模式交换机会将接收到的消息发给所有绑定了的队列,此模式下接收到的消息会多一个参数Routing,会将消息转发到对应Routing的队列
路由模式消息生产者
// 9.创建队列
channel.queueDeclare("simple_queue1",true,false,false,null);
channel.queueDeclare("simple_queue2",true,false,false,null);
// 创建交换机:arg0,交换机名称 arg1,交换机类型(路由对应)
channel.exchangeDeclare("demo4Exchange", BuiltinExchangeType.DIRECT);
// 将队列绑定到交换机
// simple_queue1只会接收到routingKey 为error的消息
channel.queueBind("simple_queue1","demo4Exchange","error");
// simple_queue2会接收到routingKey 为info,warning,error的消息
channel.queueBind("simple_queue2","demo4Exchange","info");
channel.queueBind("simple_queue2","demo4Exchange","warning");
channel.queueBind("simple_queue2","demo4Exchange","error");
// 10.创建消息
// 11.消息发送
for (int i = 0; i < 100; i++) {
// 创建消息
String message = "routing_key:" + i;
String routingKey = "";
if (i%2 == 0){ // routing_key_queue1、routing_key_queue2 0、2、4、6、8
routingKey = "error";
}else if (i%5 == 0){ // routing_key_queue2:5
routingKey = "info";
}else { // 0、1、5
routingKey = "warning";
}
message += "--->" + routingKey;
// 消息发送
channel.basicPublish("demo4Exchange", routingKey, null, message.getBytes());
}
路由模式消息消费者
// 同上 消息消费者只需要监听对应的消息队列即可
5.Topic 通配符模式
此模式下在上面的基础上将routingKey改为可以使用通配符的模式
通配符规则:
多个单词之间以”.”分割
‘#’:匹配一个或多个词
:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.:只能匹配item.insert
通配符模式消息生产者
// 9.创建队列
channel.queueDeclare("simple_queue1",true,false,false,null);
channel.queueDeclare("simple_queue2",true,false,false,null);
// 创建交换机:arg0,交换机名称 arg1,交换机类型(主题)
channel.exchangeDeclare("demo5Exchange", BuiltinExchangeType.TOPIC);
// 将队列绑定到交换机
channel.queueBind("simple_queue1","demo5Exchange","#");
channel.queueBind("simple_queue2","demo5Exchange","www.#");
channel.queueBind("simple_queue2","demo5Exchange","*.com");
// 10.创建消息
// 11.消息发送
for (int i = 0; i < 100; i++) {
// 创建消息
String message = "routing_key:" + i;
String routingKey = "";
if (i%2 == 0){ // routing_key_queue1、routing_key_queue2 0、2、4、6、8
routingKey = "www.baidu.com";
}else if (i%5 == 0){ // routing_key_queue2:5
routingKey = "jd.com";
}else { // 0、1、5
routingKey = "dqdwwfwevweevwe21e13r23dr2gerfdqw.dqefw122e.23f.2f.23.f.2.f2.f.24.ff2wf2qef.2";
}
message += "--->" + routingKey;
// 消息发送
channel.basicPublish("demo5Exchange", routingKey, null, message.getBytes());
}
通配符模式消息消费者
// 同上 消息消费者只需要监听对应的消息队列即可