消息队列使用

一、什么是消息队列

消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列。

消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

消息队列使用

Producer:消息生产者,负责产生和发送消息到 Broker;

Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;

Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;


二、为什么需要消息队列

1、屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。

2、异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。

3、解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。

4、复用:一次发送多次消费。

5、可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。

6、提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。


三、消息队列有什么优点和缺点

1、核心优点

   解耦、异步、削峰

2、缺点

   1)系统可用性降低:系统引入的外部依赖越多,越容易挂掉。

   2系统复杂度提高

   3一致性问题:消息传递给多个系统,部分执行成功,部分执行失败,容易导致数据不一致


四、消息队列对比

消息队列使用

各种对比之后,有如下建议:

    ActiveMQ,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐

     RabbitMQ,虽然erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是毕竟是开源的,比较稳定的支持,活跃度也高,推荐中小型公司使用推荐

    RocketMQ,阿里出品,Java语言编写,经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的严重。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景强烈推荐

    kafka,如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。


五、RabbitMQ介绍

1、RabbitMQ基础概念

通常我们谈到消息队列服务, 会有三个概念: 发消息者、消息队列、收消息者。

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。

RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和队列之间, 加入了交换器 (Exchange)。这样发消息者和消息队列就没有直接联系,转而变成发消息者把消息发给交换器,交换器根据调度策略再把消息转发给消息队列。

消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange。Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。

消息队列使用

1.Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。

2.Producer(消息的生产者):向消息队列发布消息的客户端应用程序。

3.Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。

4.Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。

5.Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。

6.Queue(消息队列):存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收。

7.Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。

8.Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。

6.Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。

7.Broker:RabbitMQ Server,服务器实体。

2、Exchange消息调度策略

调度策略是指Exchange在收到生产者发送的消息后依据什么规则把消息转发到一个或多个队列中保存。调度策略与三个因素相关:Exchange Type(Exchange的类型),Binding Key(Exchange和Queue的绑定关系),消息的标记信息(Routing Key和headers)。


Exchange根据消息的Routing Key和Exchange绑定Queue的Binding Key分配消息。生产者在将消息发送给Exchange的时候,一般会指定一个Routing Key,来指定这个消息的路由规则,而这个Routing Key需要与Exchange Type及Binding Key联合使用才能最终生效。


在Exchange Type与Binding Key固定的情况下(一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定Routing Key来决定消息流向哪里。


  1. Fanout (订阅模式|广播模式)


交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式

与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。


2、Direct(路由模式)


精确匹配:当消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式。

RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。


3、Topic (通配符模式)


按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。

Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ ”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。


4、Headers(键值对模式)


Headers不依赖于Routing Key与Binding Key的匹配规则来转发消息,交换器的路由规则是通过消息头的Headers属性来进行匹配转发的,类似HTTP请求的Headers。

在绑定Queue与Exchange时指定一组键值对,键值对的Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或all,代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)。

当消息发送到Exchange时,交换器会取到该消息的headers,对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。Headers交换机的优势是匹配的规则不被限定为字符串(String),而是Object类型。


如下图所示,RabbitMQdirect模式通过RoutingKey来精准匹配,RoutingKey为red的投递到Queue1,RoutingKey为black和white的投递到Queue2。

消息队列使用

我们可以假设一个场景,我们要做一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。

现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化到硬盘。如下示例

消息队列使用

RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。

RoutingKey为“red.critical.high”的日志会只投递到queue2。

RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。


3、RabbitMQ其他概念

1rpc

生产者根据消费者返回的值,针对不同的消息(有唯一标识 或 类型标识)执行相应的逻辑

2、消息应答

消费者拿到消息并处理完相应逻辑后,必须应答,否则队列中的消息会不断堆积

3、消息持久化

队列可以设置将消息持久化,防止服务器突然宕机,导致消息丢失,当然这样会损耗性能,占用磁盘空间

4、分发机制

①轮询分发:队列给每一个消费者发送数量一样的数据

②公平分发:消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。

5、事务

对信道(Channel)设置,保证了生产者确实将消息推送到了服务器中,但背离了消息队列解耦的本质

6、Confirm机制

对信道(Channel)设置,保证了生产者确实将消息推送到了服务器中,是异步的,性能比事务好很多

7、Alternate Exchange(代替交换器)

是Rabbitmq自己扩展的功能,不是AMQP协议定义的,可以将没有正确分发的消息交由该 交换机 再次分发,再次分发失败,则消息丢失

8、TTL(生存时间)

9、Queue Length Limit(队列长度限制)

10、Dead Letter Exchange(死信交换器)

11、priority queue(优先级队列)

12、延迟队列

①延迟消费。比如:

用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。

用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

②延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

通过设置TTL(生存时间) 和 Dead Letter Exchange(死信交换器),将消息重新分发。


六、RabbitMQ使用

1、添加maven依赖

<dependency>

   <groupId>org.springframework.boot</groupId>

  <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

2、添加rabbitmq java config 配置类

消息队列使用

消息队列使用

消息队列使用

3、发送消息

消息队列使用

4、消费消息

消息队列使用

5、禁用spring boot 自动配置(约定大于配置)

消息队列使用

以上就是基于spring boot maven项目 集成rabbitmq 的所有配置。

剩下的就是交换机的灵活运用了。

上一篇:iFolder Novell开源文件服务器


下一篇:KVM 介绍(7):使用 libvirt 做 QEMU/KVM 快照和 Nova 实例的快照 (Nova Instances Snapshot Libvirt)