一、引言
消息队列作用:解耦、异步、削峰
为什么使用消息队列?消息对列有什么好处? - 爱笑的Terry - 博客园https://www.cnblogs.com/terry-love/p/11492397.html
二、RabbitMQ介绍
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
三、RabbitMQ安装(例子为阿里云服务器部署)
首先查看一下docker中有没有在运行的容器,是否占用了端口号
接着创建文件夹,文件夹中创建 docker-compose.yaml文件
在ymal文件中配置如下
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./data:/var/lib/rabbitmq
启动(第一次启动建议前台启动)
docker-compose up
登陆 (注意Google浏览器有兼容问题,使用IE)
注意!我的阿里云的服务器ip为8.130.166.101,大家注意根据自己的ip地址进行更改
后面的15672为端口号
http://8.130.166.101:15672/http://8.130.166.101:15672/
用户名guest 密码guest
输入之后就可以进来了
四、RabbitMQ架构【重点
】
4.1 官方的简单架构图
Publisher - 生产者:发布消息到RabbitMQ中的Exchange
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
Exchange - 交换机:和生产者建立连接并接收生产者的消息
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
Routes - 路由:交换机以什么样的策略将消息发布到Queue
简单架构图 |
---|
4.2 RabbitMQ的完整架构图
完整架构图
完整架构图 |
---|
五、RabbitMQ的使用【重点
】
5.1 RabbitMQ的通讯方式
通讯方式 |
---|
5.2 Java连接RabbitMQ
5.2.1 创建maven项目
5.2.2 导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
5.2.3 创建工具类连接RabbitMQ
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMqUtils {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("8.130.166.101");
connectionFactory.setPort(5672);
//设置登录用户名和密码
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//存储在哪个位置
connectionFactory.setVirtualHost("/");
return connectionFactory.newConnection();
}
}
5.3 Hello-World
一个生产者,一个默认的交换机,一个队列,一个消费者
结构图 |
---|
创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
@Test
public void Publisher() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
// 向队列 发送消息 " hello-queue"
channel.basicPublish("","hello-queue",null,"hello-queue".getBytes());
channel.close();
}
@After
public void destroy() throws IOException {
connection.close();
}
创建消费者,创建一个channel,创建一个队列,并且去消费当前队列
/*
* 消费者*/
@Test
public void consumerTest() throws IOException {
//创建管道
Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
//channel和队列绑定
channel.queueDeclare("hello-queue",true,true,false,null);
//抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//从队列中获取信息
System.out.println("接受消息:"+new String(body,"utf-8"));
}
};
// channel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// 参数3 消费者
channel.basicConsume("hello-queue",true,consumer);
//让程序卡住
System.in.read();
}
运行两次生产者
可以在消费者这里获取到消息
手动ack机制
手动ack 机制:保证消息对应的业务 已经真正的处理了,而不是仅仅接收到该消息
/*
* 消费者*/
@Test
public void consumerTest() throws IOException {
//创建管道
Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
//channel和队列绑定
channel.queueDeclare("hello-queue",true,true,false,null);
//抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//从队列中获取信息
System.out.println("接受消息:"+new String(body,"utf-8"));
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// channel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// 参数3 消费者
channel.basicConsume("hello-queue",false,consumer);
//让程序卡住
System.in.read();
}
5.4 Work
一个生产者,一个默认的交换机,一个队列,两个消费者
结构图 |
---|
只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
消费者指定Qoa和手动ack
两个消费者消费同一个消息队列
public class WorkerQueueTest {
private Connection connection;
@Before
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
/*
* 消费者*/
@Test
public void consumer1Test() throws IOException {
//创建管道
Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
//channel和队列绑定
channel.queueDeclare("work-queue",true,false,false,null);
//抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//从队列中获取信息
System.out.println("接受消息:"+new String(body,"utf-8"));
}
};
// channel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// 参数3 消费者
channel.basicConsume("work-queue",true,consumer);
//让程序卡住
System.in.read();
}
@Test
public void consumer2Test() throws IOException {
//创建管道
Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
//channel和队列绑定
channel.queueDeclare("work-queue",true,false,false,null);
//抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//从队列中获取信息
System.out.println("接受消息:"+new String(body,"utf-8"));
}
};
// channel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// 参数3 消费者
channel.basicConsume("work-queue",true,consumer);
//让程序卡住
System.in.read();
}
@Test
public void Publisher() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送十条消息 " hello-queue"
for (int i = 0; i < 10; i++) {
channel.basicPublish("","work-queue",null,("work-queue"+i).getBytes());
}
channel.close();
}
@After
public void destroy() throws IOException {
connection.close();
}
}
首先运行两个消费者,然后运行一个生产者
可以看到结果如下
以下一定要先启动消费者在启动生产者
5.5 Publish/Subscribe
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
public class PublishSubTest {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
/**
* 测试时,一定要先启动 消费者,在启动生产者
*/
@Test // 进行单元测试
public void consumer1Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("pubsub-queue1",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("pubsub-queue1", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test // 进行单元测试
public void consumer2Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("pubsub-queue2",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("pubsub-queue2", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
/**
* 生产者 模式
*/
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
//FANOUT - pubsub 交换机 会将消息发送到 所有的队列中
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
//参数1 队列名
//参数2 交换机名
//参数3 路由规则
channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
channel.queueBind("pubsub-queue2", "pubsub-exchange", "");
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息
// 发送10条数据 每个消费者得到5条数据
for (int i = 0; i < 10; i++) {
// 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
// "pubsub-exchange" 交换机名称
// "" 路由规则
channel.basicPublish("pubsub-exchange", "",null,("pubsub--i:" +i).getBytes());
}
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
消费者还是正常的监听某一个队列即可。
5.6 Routing
以下一定要先启动消费者在启动生产者
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。
消费者没有变化
public class RoutingTest {
private Connection connection;
@Before
public void getConnection() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
@Test
public void consumer1Test() throws IOException {
final Channel channel = connection.createChannel();
channel.queueDeclare("routing-info-queue",true,false,false,null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("routing-info-queue接收到消息"+new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("routing-info-queue",false,consumer);
System.in.read();
}
@Test
public void consumer2Test() throws IOException {
final Channel channel = connection.createChannel();
channel.queueDeclare("routing-error-queue",true,false,false,null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("routing-error-queue接受消息:"+new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("routing-error-queue",false,consumer);
System.in.read();
}
@Test
public void publisherTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//将 chanel 和 自定义的交换机 绑定 "Routing-exchange"
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
// DIRECT - Routing 交换机 会将消息发送到 所有的队列中
channel.exchangeDeclare("Routing-exchange",BuiltinExchangeType.DIRECT);
//参数1 队列名
//参数2 交换机名
//参数3 路由规则
// 所有消息为 info 的消息都会 由Routing-exchange发送到routing-info-queue 队列中
channel.queueBind("routing-info-queue","Routing-exchange","info");
channel.queueBind("routing-error-queue","Routing-exchange","error");
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息
// 发送10条数据 每个消费者得到5条数据
for (int i = 0; i < 10; i++) {
if (i%2==1){//奇数
channel.basicPublish("Routing-exchange","info",null,("Routing--i="+i).getBytes());
}else {
channel.basicPublish("Routing-exchange","error",null,("Routing--i="+i).getBytes());
}
}
channel.close();
}
@After
public void destroy() throws IOException {
connection.close();
}
}
运行结果
5.7 Topic
以下一定要先启动消费者在启动生产者
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
举个栗子
//2. 创建exchange并指定绑定方式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
channel.queueBind("topic-queue-2","topic-exchange","fast.#");
channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
//3. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());
消费者只是监听队列,没变化。
public class TopicTest {
private Connection connection;
@Before // 在@Test 之前调用初始化数据
public void init() throws IOException, TimeoutException {
connection = RabbitMqUtils.getConnection();
}
/**
* 测试时,一定要先启动 消费者,在启动生产者
*/
@Test // 进行单元测试
public void consumer1Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("topic-queue-1",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者1 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("topic-queue-1", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
@Test // 进行单元测试
public void consumer2Test() throws IOException {
// channel 管道 连接 消费者和队列
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// chanel 和 队列绑定
channel.queueDeclare("topic-queue-2",true,false,false,null);
// 抱着每次消费者 消费一条数据
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 从队列中获取消息 并处理器
System.out.println("消费者2 接受到消息:"+new String(body,"utf-8") );
// 所有的业务都完成之后 可以手动的ack
// envelope.getDeliveryTag() // 消息标记 0 1 2
// false ack 之后 不删除
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// chanel 和 消费者绑定
// 参数1 队列名称
// 参数2 是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
// false 需要手动ack
// 参数3 消费者
channel.basicConsume("topic-queue-2", false,consumer);
// 让程序一致卡在这里 消费者 可以一致消费消息
System.in.read();// 等待客户端命令行 的输入
}
/**
* 生产者 模式
*/
@Test
public void publishTest() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
//将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
// DIRECT - Routing 交换机 会将消息发送到 所有的队列中
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//参数1 队列名
//参数2 交换机名
//参数3 路由规则
// 所有消息为 info 的消息都会 由Routing-exchange发送到topic-queue-1 队列中
channel.queueBind("topic-queue-1", "topic-exchange", "*.orange.*");
channel.queueBind("topic-queue-2", "topic-exchange", "big.*.*");
//参数1: 交换机名称 没有就是默认 ""
// 参数2 :队列名称 或者是 消息类型信息 真的会路由规则进行匹配
// 参数3: 指定传递的消息所携带的properties,使用null。
// 参数4:消息
// 向队列 发送消息
// 发送10条数据 每个消费者得到5条数据
for (int i = 0; i < 10; i++) {
// 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
//参数1: "pubsub-exchange" 交换机名称
//参数2: "" 路由规则
if (i%2==1){// 奇数 orange
channel.basicPublish("topic-exchange", "xxxasdasd.orange.xfsdf",null,("topic--i:" +i).getBytes());
}else{ // 偶数 error
channel.basicPublish("topic-exchange", "big.xxxx.uii",null,("routing--i:" +i).getBytes());
}
}
channel.close();
}
@After// 在@Test 之后进行 ,是数据销毁
public void destroy() throws IOException {
connection.close();
}
}
运行结果:
六、RabbitMQ整合SpringBoot【重点
】
6.1 SpringBoot整合RabbitMQ
6.1.1 创建SpringBoot工程
6.1.2 导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
6.1.3 application.properties中编写配置文件
spring.rabbitmq.host=8.130.166.101
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
6.1.4 声明exchange、queue
@Configuration
public class RabbitConfig {
@Bean
public TopicExchange topicExchange(){
TopicExchange topicExchange = new TopicExchange("springboot-topic-exchange", true, false);
return topicExchange;
}
@Bean
public Queue queue(){
Queue queue = new Queue("springboot-queue", true, false, false, null);
return queue;
}
@Bean
public Binding binding(TopicExchange topicExchange,Queue queue){
Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("*.java.*");
return binding;
}
}
6.1.5 发布消息到RabbitMQ,创建测试类,生产者
@SpringBootTest
@RunWith(SpringRunner.class)
public class Mytest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publisherTest(){
rabbitTemplate.convertAndSend("springboot-topic-exchange","xxxx.java.12dssad","是这样的吗?");
System.out.println("发送消息");
}
}
6.1.6 创建消费者监听消息
@Component
public class Consumer {
@RabbitListener(queues = "springboot-queue")
public void consumer1(String msg, Channel channel, Message message){
System.out.println("消费者得到:"+msg);
System.out.println("msg = "+message);
}
}
结果如下:
6.2 手动Ack
6.2.1 添加配置文件
spring.rabbitmq.listener.simple.acknowledge-mode=manual
6.2.2 手动ack
此时,多次运行Test 则会收到多条未消费的消息
@Component
public class Consumer {
@RabbitListener(queues = "springboot-queue")
public void consumer1(String msg, Channel channel, Message message) throws IOException {
System.out.println("消费者得到:"+msg);
System.out.println("msg = "+message+" "+message.getMessageProperties().getDeliveryTag());
int i = 1/0;
// 手动ack
// 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
// 多条消息是否一起ack false
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
这是运行了三次,在第三次的时候会直接出来多个未消费的对象
七、RabbitMQ的其他操作
7.1 消息的可靠性
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
7.2 SpringBoot实现
7.2.1 编写配置文件
#解决数据安全问题
spring.rabbitmq.publisher-confirm-type=simple
spring.rabbitmq.publisher-returns=true
7.2.2 开启Confirm和Return
@Component
public class ConfirmReturnCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// 在容器中 加入该bean 会调用 该方法( init 方法 , @PostConstruct 标记方法)
@PostConstruct//相当于bean生命周期的init
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("CorrelationData"+correlationData);
System.out.println("s="+s);
if (b){
System.out.println("消息到达交换机");
}
}
// return 机制 ,一般情况下 不会回调,只有在交换机的消息不能写入队列才会调用
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("message"+message);
}
}
7.3 避免消息重复消费()
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
重复消费 |
---|
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId
消费者,在消费消息时,根据具体业务逻辑去操作redis
7.4 SpringBoot如何实现消息不重复
7.4.1 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
7.4.2 编写配置文件
#配置redis
spring.redis.host=8.130.166.101
spring.redis.port=6379
7.4.3 修改生产者
@SpringBootTest
@RunWith(SpringRunner.class)
public class Mytest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publisherTest(){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("springboot-topic-exchange","xxxx.java.12dssad","是这样的吗?"+System.currentTimeMillis(),correlationData);
System.out.println("发送消息");
}
}
7.4.4 修改消费者
@Component
public class Consumer {
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = "springboot-queue")
public void consumer1(String msg, Channel channel, Message message) throws IOException {
System.out.println("消费者得到:"+msg);
System.out.println("msg = "+message+" "+message.getMessageProperties().getDeliveryTag());
// 得到处理消息的唯一id
String id = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
System.out.println("id="+id);
// 如果能设置成功说明 该消息没有被处理过
// 该 id 值为0 代表 正在处理 1代表处理完成
if (redisTemplate.opsForValue().setIfAbsent(id,"0",10, TimeUnit.SECONDS)){
System.out.println("消费者,处理该业务:"+msg);
//处理完之后,变为1
redisTemplate.opsForValue().set(id,"1",10,TimeUnit.SECONDS);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
// 如果不能设置key 说明 已经有消费者在处理
if (redisTemplate.opsForValue().get(id).equals("1")){
// 手动ack
// 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
// 多条消息是否一起ack false
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
如果运行第一遍出不来的话,就允许第二遍,如果第二遍还不出来就是写错了