目录
1、什么是AMQP
2、什么是SpringAMQP
SpringAMQP 的特点:
3、Basic Queue简单队列模型案例
3.1、消息发送
3.2、消息接收
4、WordQueue
4.1、消息发送
4.2、消息接收
4.3、测试
4.4、能者多劳
4.5、总结
5、发布/订阅
6、Fanout
6.1、声明队列和交换机
编辑
6.2、消息发送
6.3、消息接收
6.4、总结
7、Direct
7.1、基于注解声明队列和交换机
编辑
7.2、消息发送
7.3、总结
8、Topic
8.1、说明
8.2、消息发送
8.3、消息接收
编辑
8.4、总结
9、消息转换器
9.1、测试默认转换器
9.2、配置JSON转换器
1、什么是AMQP
AMQP(Advanced Message Queuing Protocol)是一种消息传递协议,它用于不同应用程序和
系统之间传递消息。想象一下,AMQP 就像是不同系统之间的“邮递员”或者“快递服务”,帮助它们
可靠地发送和接收消息。
为什么需要 AMQP?在一个现代应用中,可能有多个系统或服务需要相互通信。例如,一个电商
网站的支付系统、库存系统和订单系统可能需要交换信息。AMQP 通过提供一个标准化的消息传
递方式,帮助这些系统之间安全、可靠地交换信息。
AMQP 协议的特点包括:
- 可靠性:即使网络发生故障,AMQP 会确保消息不丢失,保证消息按顺序送达。
- 异步通信:发送方可以在消息发送后继续做其他事,接收方可以在方便时处理消息。
- 灵活的消息队列:消息可以排队等待处理,支持点对点和发布/订阅等多种消息模型。
2、什么是SpringAMQP
SpringAMQP 是 Spring 框架提供的一个模块,用于简化基于 AMQP 协议的消息通信。它是
Spring 系列的一部分,旨在帮助 Java 开发者更轻松地在应用中实现 AMQP 消息传递的功能。通
过 SpringAMQP,开发者无需深入理解 AMQP 协议的细节,就能快速搭建一个消息队列系统,发
送和接收消息。
通俗点说,SpringAMQP 就是一个让你在 Java 应用中“玩转” AMQP 协议的工具箱。它封装了与消
息队列的连接、消息的发送和接收等繁琐操作,开发者只需要关注业务逻辑,其他的交给
SpringAMQP。
SpringAMQP 的特点:
-
简化配置和开发:SpringAMQP 提供了很多方便的工具和配置选项,让开发者可以快速创建和管理消息队列、发送消息、接收消息,而无需手动管理复杂的连接和消息协议。
-
与 RabbitMQ 集成:RabbitMQ 是最常用的 AMQP 实现,SpringAMQP 可以很方便地与 RabbitMQ 集成,帮助开发者轻松地进行消息传递。
-
消息监听机制:SpringAMQP 提供了消息监听器,开发者只需要定义处理逻辑,Spring 会自动帮你处理从队列中接收消息的工作。
3、Basic Queue简单队列模型案例
3.1、消息发送
创建一个SpringBoot项目 创建两个子模块一个生产者 一个消费者
父工程中引入spring-boot-start-amqp依赖
在生产者applicaiton.yml配置连接mq的配置
向simple.queue队列发送消息
在mq控制台可以看到有一条消息 我们可以查看发送的消息
总结:
1、引入spring-boot-start-amqp依赖
2、配置文件中配置rabbitmq依赖
3、向mq发送消息
3.2、消息接收
同样的我们需要在消费者这边引依赖、配置mq配置
生产者使用RabbitTemplate来发送消息到队列
消费者那么就监听这个队列 如果有消息就接收处理
编写监听类 使用@RabbitListener类来监听simple.queue队列
消息需要spring来给它 所以我们启动项目
项目启动 监听到队列的消息 消费消息
总结:
1、消费者监听队列 使用@RabbitListener
2、项目需要启动 Spring把消息给监听类
4、WordQueue
Word Queue也称为任务模型 就是让多个消费者绑定到同一个队列 共同消费队列中的消息
当生产者发送消息的速度>消费者消费消息的速度 消息就会出现堆积 这时候我们就可以使用多个
消费者共同处理消息 避免消息堆积
4.1、消息发送
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
4.2、消息接收
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的
方法:
注意到这个消费者sleep了1000秒,模拟任务耗时。
4.3、测试
启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。
可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
4.4、能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文
件,添加配置:
只有消费完了消息才能继续消费
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
4.5、总结
Word模型的作用:
1、多个消费者绑定到同一个队列 同一条消息只会被一个消费者消费
2、通过设置prefetch来控制消费者预取的消息数量
5、发布/订阅
发布订阅的模型如图:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
-
Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
-
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
-
Fanout:广播,将消息交给所有绑定到交换机的队列
-
Direct:定向,把消息交给符合指定routing key 的队列
-
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
-
-
Consumer:消费者,与以前一样,订阅队列,没有变化
-
Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与
Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
6、Fanout
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
-
1) 可以有多个队列
-
2) 每个队列都要绑定到Exchange(交换机)
-
3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
-
4) 交换机把消息发送给绑定过的所有队列
-
5) 订阅队列的消费者都能拿到消息
我们的计划是这样的:
-
创建一个交换机 itcast.fanout,类型是Fanout
-
创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout
6.1、声明队列和交换机
Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
在consumer中创建一个类,声明队列和交换机:
Spring会自动创建交换机和队列并且绑定
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第一个队列
* @return
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列交换机
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingExchangeQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange);
}
/**
* 第二个队列
* @return
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列交换机
* @param fanoutQueue2
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingExchangeQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange);
}
}
6.2、消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testFanoutExchange(){
//交换机
String exchangeName = "itcast.fanout";
//消息
String message = "hello,everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
6.3、消息接收
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg){
System.err.println("消费者1接收到的消息:"+msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg){
System.err.println("消费者2接收到的消息:"+msg);
}
6.4、总结
交换机的作用是什么?
-
接收publisher发送的消息
-
将消息按照规则路由到与之绑定的队列
-
不能缓存消息,路由失败,消息丢失
-
FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
-
Queue
-
FanoutExchange
-
Binding
7、Direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的
消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) -
消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 -
Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
案例需求如下:
-
利用@RabbitListener声明Exchange、Queue、RoutingKey
-
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
-
在publisher中编写测试方法,向itcast. direct发送消息
7.1、基于注解声明队列和交换机
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到的消息:"+msg);
}
7.2、消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testDirectExchange(){
//交换机
String exchangeName = "direct.exchange";
//消息
String message = "hello,red!";
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
给red发送消息
给blue发送消息
给yellow发送消息
7.3、总结
描述下Direct交换机与Fanout交换机的差异?
-
Fanout交换机将消息路由给每一个与之绑定的队列
-
Direct交换机根据RoutingKey判断路由给哪个队列
-
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
-
@Queue
-
@Exchange
8、Topic
8.1、说明
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过
Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
图示:
解释:1
-
Queue1:绑定的是
china.#
,因此凡是以china.
开头的routing key
都会被匹配到。包括china.news和china.weather -
Queue2:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配。包括china.news和japan.news
案例需求:
实现思路如下:
-
并利用@RabbitListener声明Exchange、Queue、RoutingKey
-
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
-
在publisher中编写测试方法,向itcast. topic发送消息
8.2、消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testTopicExchange(){
//交换机
String exchangeName = "itcast.topic";
//消息
String message = "陶然带你学Java";
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
8.3、消息接收
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("消费者2接收到的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到的消息:"+msg);
}
8.4、总结
描述下Direct交换机与Topic交换机的差异?
-
Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割 -
Topic交换机与队列绑定时的bindingKey可以指定通配符
-
#
:代表0个或多个词 -
*
:代表1个词
9、消息转换器
之前说过,Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序
列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
-
数据体积过大
-
有安全漏洞
-
可读性差
我们来测试一下。
9.1、测试默认转换器
我们修改消息发送的代码,发送一个Map对象:
@Test
public void testQueue(){
String queue = "jack.queue";
//消息
Map<String, String> message = new HashMap<>();
message.put("name", "陶然同学");
message.put("age", "22");
rabbitTemplate.convertAndSend(queue, message);
}
停止consumer服务
发送消息后查看控制台:
9.2、配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON
方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
配置消息转换器。
在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}