1、RabbitMQ的作用
异步处理
应用解耦
流量控制、削峰
2、概述
1)消息中间件有两个重要的概念:消息代理和目的地
当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地
2)消息队列主要有两种形式的目的地
队列(queue)点对点的消息通信(point to point)
主题(topic) 发布(publish) / 订阅(subscribe)消息通信
3)点对点模式
消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列获取消息,消息读取后被移出队列
消息只有唯一的发送者和接收者,并不是说只能有一个接收者
4)发布订阅模式
发送者发送消息到主题,多个接收者监听这个主题,那么会在消息到达时同时收到消息
5)JMS(Java Message Service)
基于JVM消息代理的规范。ActiveMQ是JMS的实现
6)AMQP(Advanced Message Queuing Protocal)
高级消息队列协议,也是一个消息代理的规范,兼容JMS
RabbitMQ是AMQP的实现
JMS和AMQP的区别:
7)Spring支持
spring-jms提供了对JMS的支持
spring-rabbit提供了对AMQP的支持
需要ConnectionFactory的实现来连接消息代理
提供JmsTemplate、RabbitTemplate来发送消息
@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
@EnableJms、@EnableRabbit开启支持
8)Spring Boot自动配置
JmsAutoConfiguration RabbitAutoConfiguration
9)市面的MQ产品
ActiveMQ、RabbitMQ、RocketMQ、Kafka
3、RabbitMQ的概念
1)Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
2)Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序
3)Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别
4)Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走
5)Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。
6)Connection
网络连接,比如一个TCP连接
7)Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接
8)Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
9)Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
10)Broker
4、Docker安装RabbitMQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369, 25672 (Erlang发现&集群端口) 5672, 5671 (AMQP端口)
15672 (web管理后台端口) 61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
5、RabbitMQ中的Exchange类型
1)Direct.Exchange
交换机会根据消息的路由键将消息发送到某个队列上,是完全匹配单播的模式
2)Fanout.Exchange
3)Topic Exchange
6、SpringBoot整合RabbitMQ
1)pom文件中引入spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2)yml文件中配置RabbitMQ的地址和端口
spring: #配置数据源 rabbitmq: host: 121.40.182.123 port: 5672 #RabbitMQ客户端需要连接的端口
3)创建交换机Exchange,队列Queue,绑定 Binding
①、在RabbitMQ管理端进行手动创建
②、通过amqpAdmin在代码中进行创建
/** * 创建交换机 */ @Test public void createExchange(){ DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false); amqpAdmin.declareExchange(directExchange); log.info("Exchange[{}]创建成功","hello-java-exchange"); } /** * 创建队列 */ @Test public void createQueue(){ Queue queue = new Queue("hello-java-queue",false,false,false); amqpAdmin.declareQueue(queue); log.info("Queue[{}]创建成功","hello-java-queue"); } /** * @Description: 将交换机和队列进行绑定 * @param * @author houChen * @date 2022/1/24 20:51 */ @Test public void createBinding(){ // public Binding(String destination【目的地】, // Binding.DestinationType destinationType【目的地的类型:Queue Exchange】, String exchange【交互机】, String routingKey【路由键】, Map<String, Object> arguments【其他参数】) // 整体就是: 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键 Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null); amqpAdmin.declareBinding(binding); log.info("Binding[{}]创建成功","hello-java-binding");
4)发送消息
@Test public void sendMessage(){ OrderEntity orderEntity = new OrderEntity(); orderEntity.setId(1L); orderEntity.setCreateTime(new Date()); orderEntity.setTotalAmount(new BigDecimal(39)); rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity); log.info("消息发送成功: {}","hello world"); }
5)接收消息
/* 接收消息 参数可以写一下类型 1、Message message : 原生消息详细信息 消息头 + 消息体 2、T<发送的消息的类型> t 3、Channel channel : 当前传输数据的通道 【注意】 Queue: 一个Queue可以很多个客户端来监听,只要有一个客户端收到消息,Queue就会删除消息,有且仅有一个客户端会接收到消息 场景: 1、当订单服务启动多个时,发出一个消息时,有且仅有一个客户端会收到消息 2、一个客户端,只有当一个消息处理完,才会接受另一个消息 */ @RabbitListener(queues = {"hello-java-queue"}) public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel){ System.out.println("接收到消息:" + content); //channel内按顺序递增的 long deliveryTag = message.getMessageProperties().getDeliveryTag(); // deliveryTag: 消息投递标签 multi: 是否批量确认 try { if(deliveryTag%2==0){ System.out.println("手动签收"); //参数: long deliveryTag,boolean multiple channel.basicAck(deliveryTag,false); }else{ System.out.println("拒绝签收"); //参数 : long deliveryTag,boolean multiple,boolean require //require = false : 直接将该消息丢弃 require = true: 将该消息发回服务器,服务器重新入队 channel.basicNack(deliveryTag,false,false); //channel.basicReject(); } } catch (IOException e) { //网络中断 e.printStackTrace(); } }
7、RabbitMQ消息确认机制 - 可靠到达
1)broker收到消息进行回调
①、yml文件中添加配置:
spring.rabbitmq.publisher-confirms= true # 开启发送端抵达broker的确认
②、rabbitTemplate中设置confirmCallback()
@Slf4j @Configuration public class RabbitConfg { @Autowired RabbitTemplate rabbitTemplate; /** * 消息发送的序列化机制 */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate */ @PostConstruct public void initRabbitTemplate(){ //设置确认回调 RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override /** * @param correlationData 当前消息的唯一关联数据(消息的唯一ID) * @param ack 消息是否成功收到 (生产者的消息是否正确投递到Exchange) * @param cause 消息投递失败的原因 */ public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("confirm...correlationData[{}] ack[{}] cause[{}]",correlationData,ack,cause); } }; rabbitTemplate.setConfirmCallback(confirmCallback); } }
2)消息正确抵达队列进行回调
①、yml文件进行配置
spring.rabbitmq.publisher-returns= true # 开启发送端抵达队列的确认
spring.rabbitmq.template.mandatory= true # 只要抵达队列,以异步发送优先回调我们的这个return
②、设置消息抵达队列的回调
//设置消息抵达队列的确认回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override /** * 只要消息没有投递给指定的队列,就会触发这个失败回调 * @param message 投递失败消息的详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发送给哪个交换机 * @param routingKey 当时这个消息使用的是哪个路由键 */ public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("Fail message[{}] replyCode[{}] replyText[{}] exchange[{}] routingKey[{}]",message,replyCode,replyText,exchange,routingKey); } });
3)消费端消息确认
yml文件中进行配置
spring.rabbitmq.listener.simple.acknowledge-mode: manual
消费端代码对消息进行处理后,进行手动确认
@RabbitListener(queues = {"hello-java-queue"}) public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel){ System.out.println("接收到消息:" + content); //channel内按顺序递增的 long deliveryTag = message.getMessageProperties().getDeliveryTag(); // deliveryTag: 消息投递标签 multi: 是否批量确认 try { if(deliveryTag%2==0){ System.out.println("手动签收"); //参数: long deliveryTag,boolean multiple channel.basicAck(deliveryTag,false); }else{ System.out.println("拒绝签收"); //参数 : long deliveryTag,boolean multiple,boolean require //require = false : 直接将该消息丢弃 require = true: 将该消息发回服务器,服务器重新入队 channel.basicNack(deliveryTag,false,false); //channel.basicReject(); } } catch (IOException e) { //网络中断 e.printStackTrace(); } }