RabbitMQ工作原理
四大核心概念
- 生产者:产生数据发送消息的程序是生产者
- 交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
- 队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
- 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
Hello World
在本节中将学习使用RabbitMQ创建一个最简单的例子
引入依赖
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
编写生产者代码
public class Producer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("admin");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列中的消息是否支持持久化,false:消息存在内存中,true:消息存在硬盘中
* 3.该队列是否只供一个消费者消费,false:只提供一个消费者消费
* 4.最后一个消费者断开连接之后该队列是否自动删除,true为自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String message = "hello world";
/**
* 发送一个消息
* 1.发送消息到哪个交换机,简单模式所以不需要交换机,填空字符串即可
* 2.路由的key是哪个
* 3.其他参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
当消息发送完毕后,我们可以查看客户端,发现队列中多了一个名为hello的队列,接下来我们继续编写消费者代码
编写消费者代码
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("admin");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.队列名称
* 2.消费成功后是否自动应答,true为自动应答,false不自动
* 3.消费者成功消费的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);
}
}
当我们执行消费者代码时,会输出hello world,并且客户端中的消息以及被消费掉了
Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。我们通过一张图,来理解一下。
轮流分发消息
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程 是如何工作的。
编写工具类
从helloworld案例中我们发现了创建工厂、获取信道一系列的工作都是重复的,所以将他整合为一个工具类,方便日后的使用
public class RabbitMqUtils {
public static Channel getChannel(){
Channel channel = null;
try {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("admin");
//创建连接
Connection connection = factory.newConnection();
//获取信道
channel = connection.createChannel();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return channel;
}
}
编写工作线程
public class Work01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("接收到的消息为:" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag) ->{
System.out.println(consumerTag + ":接收消息中断");
};
//接收消息
System.out.println("工作线程B正在等待消息...");
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
idea是支持同一个main方法多次运行的,需要进行一下设置,可以参考这位博主的博客 点击跳转
完成之后我们启动两次,一次为工作线程A,另一次为工作线程B
编写消费者
因为我们需要发送多次消息,可以通过控制台的输入来发送消息
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
//生成队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("请求发送成功:" + message);
}
}
}
测试
我们输入AA,BB,CC,DD来看一下工作线程获取的结果,如果是依次获取,就证明了我们一开始的说法
消息应答
作用
举例说明:还是有两个工作线程来处理消息,比如工作线程A处理消息的时间较长,而在处理消息的过程中宕机了,如果不做任何处理,就会出现消息丢失的情况。所以我们需要一个消息应答,告诉RabbitMQ消息已经处理完毕之后,MQ才可以将消息删除,否则回到队列中。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。
消息应答的方法
-
channel.basicAsk()
用于肯定应答,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
-
channel.basicNack()
用于否定应答
-
Channel.basicReject()
用于否定应答,与channel.basicNack()相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了
编写生产者代码
public class Task02 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
//生成队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("请求发送成功:" + message);
}
}
}
编写两个工作线程代码
这里我们让一个工作线程完成速度为1秒,另外一个为30秒,模拟工作任务时长的效果
public class Work02 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("工作线程A正在等待消息,处理较快...");
DeliverCallback deliverCallback = (consumerTag,message) ->{
SleepUtils.sleep(1);
System.out.println("接收到的消息为:" + new String(message.getBody()));
/**
* 手动应答
* 1.消息标记 tag消息标记 tag
* 2.是否批量应答消息,建议不批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag) ->{
System.out.println(consumerTag + ":接收消息中断");
};
//接收消息
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
public class Work03 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("工作线程B正在等待消息,处理较慢...");
DeliverCallback deliverCallback = (consumerTag,message) ->{
SleepUtils.sleep(30);
System.out.println("接收到的消息为:" + new String(message.getBody()));
/**
* 手动应答
* 1.消息标记 tag消息标记 tag
* 2.是否批量应答消息,建议不批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag) ->{
System.out.println(consumerTag + ":接收消息中断");
};
//接收消息
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
测试
启动三个方法,正常的我们输入aa,bb,cc分别交由线程A和线程B处理,当我们再次输入dd时,是交给线程B处理的,这时候手动关闭进程,模拟宕机状况,我们发现线程A接替他处理了dd消息,这样就做到了消息不丢失的效果。
RabbitMQ持久化操作
队列持久化
之前我们创建的队列都是非持久化的,rabbitmq如果重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable参数设置为持久化
public class Task02 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
//生成队列
boolean durable = true; //设置队列持久化
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
//发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("请求发送成功:" + message);
}
}
}
更改完成重启之后我们发现,该队列的状态被D标志,即持久化
消息持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性。
public class Task02 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
//生成队列
boolean durable = true; //设置队列持久化
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
//发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// MessageProperties.PERSISTENT_TEXT_PLAIN 设置消息持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println("请求发送成功:" + message);
}
}
}
不公平分发
在之前的课程中我们说过了,MQ在分发消息时是绝对公平的,从上述例子中我们也可以发现,无论B执行的多慢,所有消息都是A一条,B一条,这个其实照我来说确实不公平的。应该是能者多劳的形式,A做完了应该可以继续做。我们需要修改工作线程代码,如下
public class Work03 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("工作线程B正在等待消息,处理较慢...");
DeliverCallback deliverCallback = (consumerTag,message) ->{
SleepUtils.sleep(30);
System.out.println("接收到的消息为:" + new String(message.getBody()));
/**
* 手动应答
* 1.消息标记 tag消息标记 tag
* 2.是否批量应答消息,建议不批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag) ->{
System.out.println(consumerTag + ":接收消息中断");
};
//接收消息
boolean autoAck = false;
//设置不公平分发,1为不公平分发
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
通过测试可以发现,因为B要工作要沉睡30秒,A完成之后可以获取接下来的工作,而不是继续分发给B
预取值
预取值的概念相当于我作为领导,我已经分配好了,一共7个任务,我给B分配5个任务,给A分配2个任务,通过代码如下设置
public class Work02 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("工作线程A正在等待消息,处理较快...");
DeliverCallback deliverCallback = (consumerTag,message) ->{
SleepUtils.sleep(1);
System.out.println("接收到的消息为:" + new String(message.getBody()));
/**
* 手动应答
* 1.消息标记 tag消息标记 tag
* 2.是否批量应答消息,建议不批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag) ->{
System.out.println(consumerTag + ":接收消息中断");
};
//接收消息
boolean autoAck = false;
//设置不公平分发,1为不公平分发,大于1代表预取值
channel.basicQos(2);
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
这里我们需要通过一张图片说一下这个预取值的概念