前言
上篇介绍了AMQP的基本概念,组成及其与rabbitmq的关系。了解了这些东西后,下面我们开始学习rabbitmq(消息队列)的作用以及用java代码和rabbitmq通讯进行消息发布和接收。因为消息的的接收以及路由都是通过交换机实现的,所以接下来我们要学习如何利用不同的交换机进行消息的发布。最后会再学习如何利用rabbitmq进行rpc的调用。
一、rabbitmq(消息队列)的作用
1.异步处理消息
假设用户在网站注册成功后,需要向用户发送邮件和信息提示其注册成功。正常的做法是,后台将注册信息写入数据库,然后再给用户发邮件发短信。
该流程的问题在于,我们其实只需要将注册信息写入数据库之后就可以告知用户注册成功,并不需要等待发送邮件和发送短信成功后再去告知。这样会延长请求处理的时间。利用消息队列我们可以异步的解决这个问题。
我们可以在将注册信息写入数据库之后,把要发送注册邮件和发送短信的消息写入消息队列,然后就告知用户注册成功。发送邮件和短信将由订阅了消息的应用异步的去执行。这样将节省请求处理的时间。
2.系统解耦
购物网站通常会将订单系统和库存系统分成两个不同的应用。正常情况下用户下单后订单系统会调用库存系统,然后返回给用户信息。
这里有两个问题:1.如果库存系统挂了,那么下单就会失败。
2.订单系统和库存系统耦合度太高
为了解决这个问题就可以引入消息队列
订单系统在处理完业务逻辑后,将订单消息写入消息队列,库存系统订阅订单消息,消息队列就会将订单消息推送给库存系统。库存系统再去处理。这样就解决了上述两个问题。即使库存系统挂了了,消息队列也会将订单消息持久化,保证库存系统正常后,可以正确的处理库存。
3.流量削峰
流量削峰在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前段加入消息队列。此时起到两个作用:
a.可以控制活动人数,超过一定阈值的订单直接丢弃
b.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
用户发来请求,服务器收到之后,先写入消息队列,加入消息超过队列的最大长度,则直接丢掉用户请求,或跳转到错误页面。秒杀业务再根据队列中的消息,做后续处理。
二、准备
在写代码之前,还需要我们先安装rabbitmq,因为rabbitmq是用erlang开发的,所以还需要我们下载erlang。具体下载的教程,这里不再讲解,网站有很多教程,可以学习一下。这里说一下比较关键的几个点。
1.rabbitmq有个管理后台,访问地址为localhost:15672,默认的用户名:guest,默认的密码:guest
2.client端通信口5672
3.管理口15672
4.server间内部通信口25672
5.erlang发现口:4369
三、直连交换机代码实现
1.生产者
/**
* 消息发送者
*/
public class LogProducer {
//交换机名字
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info" ,"warning", "error"}; public static void main(String[] args) {
//创建连接工厂并设置连接信息,这些连接信息都是默认的,所以这里注释掉了,打开也是可以的。
ConnectionFactory connectionFactory = new ConnectionFactory();
// connectionFactory.setHost("localhost");
// connectionFactory.setUsername("guest");
// connectionFactory.setPassword("guest");
// connectionFactory.setPort(5672);
Connection connection = null;
Channel channel = null;
try {
//获取连接
connection = connectionFactory.newConnection();
//连接中打开通道
channel = connection.createChannel();
//声明交换机
//参数1:交换机名字,参数2:交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); for (String routingKey : routingKeys){
//将消息发送给交换机,我们这里发送的消息就是routingKey
//参数1:交换机名字,参数2:消息路由键,参数3:消息属性,参数4:消息体
channel.basicPublish(EXCHANGE_NAME, routingKey,null, routingKey.getBytes());
System.out.println("RoutingSendDirect -> routingkey: " + routingKey + ", send message " + routingKey);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} }
}
}
2.消费者
消费者1
/**
* 消息消费者1
*/
public class Consumer1 {
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info", "warning"}; public static void main(String[] args) {
//创建连接工厂并设置连接信息,这些连接信息都是默认的,所以这里注释掉了,打开也是可以的。
ConnectionFactory connectionFactory = new ConnectionFactory();
// connectionFactory.setHost("localhost");
// connectionFactory.setUsername("guest");
// connectionFactory.setPassword("guest");
// connectionFactory.setPort(5672);
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
//声明队列,这里队列的名字由代理自动生成
String queueName = channel.queueDeclare().getQueue();
//声明交换机
//参数1:交换机名字,参数2:交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); for (String routingKey : routingKeys) {
//将交换机和队列用routing key绑定起来
//参数1:队列名,参数2:交换机名,参数3:队列和交换机之间绑定的路由键
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("ReceveLogDirect1 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
} //声明消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定从哪个消费者从哪个通道获取消息,并指明自动确认的机制
//参数1:队列名,参数2:确认机制,true表示自动确认,false代表手动确认,参数3:消费者
channel.basicConsume(queueName, true, consumer);
System.out.println("ReceveLogDirect1 waitting for message"); while (true){
//获取消息,这一步会一直阻塞,直到收到消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//获取消息
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("ReceveLogDirect1 receive message " + message);
} } catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者2
/**
* 消息消费者2
*/
public class Consumer2 {
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"error"}; public static void main(String[] args) {
//创建连接工厂并设置连接信息,这些连接信息都是默认的,所以这里注释掉了,打开也是可以的。
ConnectionFactory connectionFactory = new ConnectionFactory();
// connectionFactory.setHost("localhost");
// connectionFactory.setUsername("guest");
// connectionFactory.setPassword("guest");
// connectionFactory.setPort(5672);
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
//声明队列,这里队列的名字由代理自动生成
String queueName = channel.queueDeclare().getQueue();
//声明交换机
//参数1:交换机名字,参数2:交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); for (String routingKey : routingKeys) {
//将交换机和队列用routing key绑定起来
//参数1:队列名,参数2:交换机名,参数3:队列和交换机之间绑定的路由键
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("ReceveLogDirect2 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
} //声明消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定从哪个消费者从哪个通道获取消息,并指明自动确认的机制
//参数1:队列名,参数2:确认机制,true表示自动确认,false代表手动确认,参数3:消费者
channel.basicConsume(queueName, true, consumer);
System.out.println("ReceveLogDirect2 waitting for message"); while (true){
//获取消息,这一步会一直阻塞,直到收到消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//获取消息
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("ReceveLogDirect2 receive message " + message);
} } catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
先运行Consumer1,Consumer2。再运行LogProducer。运行结果如下:
LogProducer:
Consumer1:
Consumer2:
从运行结果中可以看出,消息是根据路由键被交换机路由到对应的队列上的。
代码gitbu地址:https://github.com/wutianqi/rabbitmq-learn.git
参考资料:https://www.cnblogs.com/LipeiNet/p/5978276.html