1.MQ基础介绍
同步调用
OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才
能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用
异步调用
异步调用通常是基于消息通知的方式,包含三个角色:
消息发送者:投递消息的人,就是原来的调用者
消息接收者:接收和处理消息的人,就是原来的服务提供者
消息代理:管理、暂存、转发消息,就是原来的服务提供方
不同的MQ介绍
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
本文讲解RabbitMQ
2.RabbitMQ
1.安装部署
在docker安装rabbit,代码如下:
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net\
-d \
rabbitmq:3.8-management
安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。
2.收发消息
交换机
点开amq.fanout交换机,将交换机和队列做绑定
队列
指定队列的名字添加队列
3.数据隔离
问题:要实现不同虚拟主机之间不同的交换机之间的隔离,需要用到数据隔离的技术。
添加用户
添加自己的虚拟主机
3.SpringAMQP
RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。
而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且
还基于SpringBoot对其实现了自动装配,使用起来非常方便。
1.快速入门
导入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置信息:
spring:
rabbitmq:
host: 192.168.145.129 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
编写消息发送者和接收者的代码
发送者:写了一个测试类,注入了RabbitTemplate对象调用convertAndSend方法把队列名和消息发送出去。
@SpringBootTest
public class amqp {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testsimplequeue(){
//队列名
String queue="simple.queue";
//消息
String message="hello";
//发送消息
rabbitTemplate.convertAndSend(queue,message);
}
}
接收者:在方法上加上 @RabbitListener(queues = "simple.queue")即可实现对消息的接收。
@Component
public class listen {
//队列的名称
@RabbitListener(queues = "simple.queue")
//发送者发送的什么类型,接收者用什么类型接收
public void listen1(String message){
System.out.println("接收到消息"+message);
}
}
2.WorkQueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
模拟此环境再多加一个消费者即可。
AMQP默认是采用类似轮询的机制,性能慢的机器会拖慢速度,因此需做如下配置实现能者多劳:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
3.交换机
1.Fanout交换机
fanout交换机是将消息全部发送到绑定的队列中。
使用fanout交换机发送消息要使用如下函数:
@Test
public void testfanout(){
//交换机名称
String exchange="hmall.fanout";
//消息
String message="hello,everyone";
//fanout交换机发送消息 交换机 队列 消息
rabbitTemplate.convertAndSend(exchange,null,message);
}
2.Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey
(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key
进行判断,只有 队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息
绑定路由key实例:
代码示例:
rabbitTemplate.convertAndSend(exchange,"white",message);指定路由key,发送到指定的队列。
@Test
public void testdirect(){
//交换机名称
String exchange="hmall.direct";
//消息
String message="hello,everyone,direct";
//发送消息 指定路由key
rabbitTemplate.convertAndSend(exchange,"white",message);
}
3.Topic交换机
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如: item.insert
通配符规则:
-
#
:匹配一个或多个词 -
*
:匹配不多不少恰好1个词
举例:
-
item.#
:能够匹配item.spu.insert
或者item.spu
-
item.*
:只能匹配item.spu
绑定示例:
代码示例:
@Test
public void testdirect(){
//交换机名称
String exchange="hmall.topic";
//消息
String message="hello,everyone,topic";
//发送消息 topic
rabbitTemplate.convertAndSend(exchange,"#.news",message);
}
4.基于bean声明队列交换机
在java代码中注册交换机和队列
fanout交换机代码如下:
@Configuration
public class fanout {
//注册交换机
@Bean
public FanoutExchange fanoutExchange(){
//return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
return new FanoutExchange("hmall.fanout");
}
//注册队列
@Bean
public Queue queue1(){
//return QueueBuilder.durable("fanout.queue1").build();
return new Queue("fanout.queue1");
}
//绑定
@Bean
public Binding fanoutbindingqueue1(Queue queue1,FanoutExchange fanout){
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Queue queue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutbindingqueue2(Queue queue2,FanoutExchange fanout){
return BindingBuilder.bind(queue2).to(fanout);
}
}
direct交换机代码如下:
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
5.基于注解声明队列交换机
基于bean声明太过复杂,尤其是direct交换机绑定key时,因此要用到注解声明。
在注解下同时创建队列交换机并完成绑定,代码如下:
@RabbitListener(bindings = @QueueBinding(
//注册队列 名称 持久化
value = @Queue(name = "direct.queue",durable = "ture"),
//注册交换机 名称 类型为direct
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
//交换机绑定队列属性有red和white
key = {"red","white"}
))
public void listen(String message){
System.out.println("接收到消息"+message);
}
4.消息转化器
spring在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:(比如用rabbitmq发送map集合,数据被序列化后字节太多而且可读性差)
-
数据体积过大
-
有安全漏洞
-
可读性差
因此我们使用JSON方式来做序列化和反序列化。
导入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
RabbitMQ高级
未完待续