RabbitMQ

RabbitMQ

概念

RabbitMQ(message queue)从字面意思既可以看出属于一个消息队列,既然属于一个队列即拥有队列的特点先进先出,只不过在队列中存放的属于消息,其主要用于跨进程通信,用于在上下游之间进行通信,在常见的架构中RabbitMQ属于一种常见的逻辑解藕+物理解藕消息通信服务,在消息的上游只需要负责产生消息即可,由RabbitMQ进行存储转发消息,无需依赖其它服务。

MQ优点

流量削峰

在正常的订单系统中假设我们服务器正常情况最多一秒钟可以处理一万笔订单,正常情况下我们下单基本立马产生响应的结果,但是假如此时我们处于优惠活动中,此时一秒钟拥有两万笔订单,我们的服务器此时无法处理这么多的请求会面临宕机的危险,如果此时我们使用缓冲队列,把一秒钟的订单存入RabbitMQ中进行分散,分成一段时间来进行处理,此时用户虽然在十几秒之后才可以收到响应结果,但是我们成功的完成订单支付,服务器同时正常运行,保证了业务的可靠性。

RabbitMQ

应用解藕

在电商系统中,包含物流系统,库存系统,支付系统等,在耦合调用物流系统,库存系统,支付系统等,假如任意一个系统出现问题,都会导致用户无法下单进而导致下单失败,如果使用RabbitMQ消息队列下单的方式,当某个系统出现故障,需要通过几分钟的时间进行系统修复,此时物流请求被存放在RabbitMQ队列之中,用户可以完成下单操作,当系统恢复正常,从队列之中读取消息进一步进行处理即可,在整个下单流程之中,用户是无法感知系统出现故障,可以保证业务的可靠性。

RabbitMQ

异步调用

在某些服务调用中是进行异步处理,例如A调用B服务,但是B耗时较长,同时A又需要获取B的执行结果,此时A可以主动调用B的查询API进行主动查询,或者A提供一个回调API由B执行完毕之后调用回调API进行结果通知,但是上述两种方式增加了代码复杂度,如果使用RabbitMQ可以很好的解决上述执行结果问题,当B执行完毕的时候将执行结果发送给RabbitMQ进行存储,染红RabbitMQ将消息转发给A服务,此时A可以很轻松的获取B的执行结果。

RabbitMQ

核心组件

生产者

产生数据发送消息的程序属于生产者

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者

交换机

接收生产者产生的消息同时将消息转发到队列中,交换机必须准确的知如何处理其接收的消息,是推送到特定的队列还是推送到多个队列或者将数据丢弃,其有交换机的类型决定

队列

队列是RabbitMQ的一种数据结构,数据虽然由应用程序与RabbitMQ之间进行交互,但是数据的存储则是存储在RabbitMQ队列之中,队列受主机硬盘与内存大小限制,本质属于一个消息缓冲区,生产者可以将数据发送到队列,消费者可以从队列进行数据读取。

工作原理

图解

RabbitMQ

名词解释

Broker

接收消息和分发消息的应用,一个RabbitMQ服务就是Broker在Broker中可以包含多个交换机与队列。

Vhost

出于安全因素考虑把AMQP基本组件划分到虚拟分组中类似于网络中的名称空间,当多个用户连接同一个RabbitMQ服务时候,可以划分多个Vhost,每个Vhost拥有自己独立的交换机,队列等。

Connection

生产者与消费者与Broker之间的TCP连接

Channel

如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销。

Exchange

消息到达Broker的第一站,根据分发规则,匹配查找routing key转发到相对应的队列中

Queue

存放生产者产生的数据,等待消费者读取转发数据

Binding

exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

工作模式

hello world

简介

最简单的工作队列,其中一个消息生产者,一个消息处理者,一个工作队列,其中一个消费者对应一个生产者属于点对点模式,RabbitMQ就是消息代理,它接受并推动消息流动。你可以把它想象成一个邮局:当你把一封信塞进邮箱,你需要确保它能送到收信人的手里。而RabbitMQ就是一个邮箱,邮局,邮递员。不同于真实的邮局(处理信件),RabbitMQ处理接受、存储、推动消息。

此处我们使用Python编写生产者以及消费者

RabbitMQ

抽象公共代码

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# @File    :base.py
# @Author  :SR
# @Date    :2021/8/20 下午10:44
import pika


class Base:
    # 抽象公共代码 连接mq 创建信道

    def __init__(self, username=‘guest‘, password=‘guest‘, host=‘localhost‘, port=5672, virtual_host=‘/‘, ):
        ‘‘‘

        :param username: rabbitmq连接用户
        :param password: rabbitmq连接密码
        :param host: rabbitmq连接地址
        :param port: rabbitmq连接端口
        :param virtual_host: 虚拟用户
        ‘‘‘
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.virtual_host = virtual_host

    def connection(self):
        credential = pika.PlainCredentials(self.username, self.password)

        parameters = pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host,
                                               credentials=credential)

        connection = pika.BlockingConnection(parameters=parameters)

        return connection

    def channel(self):
        connection = self.connection()

        channel = connection.channel()
        
        return channel


生产者

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# @File    :produce1.py
# @Author  :SR
# @Date    :2021/8/18 下午5:30

import pika

from .base import Base


class Produce(Base):

    def producer(self):
        # 生成一个通信信道
        channel = self.channel()

        # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
        channel.queue_declare("hello", durable=True)
        
        # 获取mq链接
        connection = self.connection()


        # 创建消息
        for i in range(10):
            message = "hello world_%s" % i

            # 产生数据发送给mq
            ‘‘‘
            exchange:指定交换机为空
            routing_key:指定队列 需要与上述生成的队列一致
            body:需要发送的数据
            properties:设置消息持久化 因为消息存放在内存中 当mq重启时候数据会丢失 delivery_mode = 2表示消息持久化
            ‘‘‘
            
            properties=pika.BasicProperties(delivery_mode=2)
            
            channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=message.encode(),properties=properties)
         
         # 关闭链接
		 connection.close()


if __name__ == ‘__main__‘:
    produce = Produce()

    produce.producer()

消费者

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# @File    :consume1.py
# @Author  :SR
# @Date    :2021/8/18 下午5:30

from .base import Base


class Consumer(Base):

    def consumer(self):
        channel = self.channel()

        # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
        channel.queue_declare(‘hello‘, durable=True)
        
        # 使用auto_ack = False 当消息处理失败的时候会将消息返回给mq队列 防止消息丢失
        channel.basic_consume(‘hello‘, self.callback, auto_ack=False)

        channel.start_consuming()

    def callback(self, ch, method, properties, body):
        print(body.decode())


if __name__ == ‘__main__‘:
    consume = Consumer()

    consume.consumer()

RabbitMQ

work queue

简介

在简单队列模式之中一个生产者对应一个消费者进行消息处理,如果消费者处理比较耗时的任务,需要大量的时间来进行处理完毕,显然此种应用效率低下,无法满足高并发需求。

在工作队列模式之中,如果处于资源密集型任务,有了队列我们将任务安排在队列之后执行,我们将任务封装成消息发送到队列之中,一个工作进程就可以从队列之中读取消息进行处理,如果启动了多个进程,多个进程轮询从队列之中读取消息进行处理。

RabbitMQ

生产者

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# @File    :Produce.py
# @Author  :SR
# @Date    :2021/8/20 下午11:43
import pika

from .base import Base


class Produce(Base):

    def producer(self):
        # 生成一个通信信道
        channel = self.channel()
        
        connection = self.connection()


        # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
        channel.queue_declare("hello", durable=True)
        
        properties = pika.BasicProperties(delivery_mode=2)

        for i in range(1, 11):
            message = "hello_world_%s" % i

            channel.basic_publish(‘‘, routing_key=‘hello‘, body=message, properties=properties)
            
        connection.close()  
                                                                                                         
if __name__ == ‘__main__‘:
    produce = Produce()

    produce.producer()

消费者

消费者开启两个worker用来进行轮询测试

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# @File    :Consumer.py
# @Author  :SR
# @Date    :2021/8/20 下午11:43
import time

from .base import Base


class Consumer(Base):

    def consumer(self):
        channel = self.channel()

        # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
        channel.queue_declare(‘hello‘, durable=True)

        # 只有工作完成之后才会继续接受任务
        channel.basic_qos(prefetch_count=1)

        channel.basic_consume("hello", on_message_callback=self.callback)

        channel.start_consuming()

    def callback(self, ch, method, properties, body):

        print("当前消息为:" + body.decode())
		

if __name__ == ‘__main__‘:
    consume = Consumer()

    print("worker2开始等待接收消息。。。。。")

    consume.consumer()

RabbitMQ

消息应答

概念

消费者处理生产者产生的消息需要一定的时间,当消费者在处理生产者产生的消息时候只完成部分功能的时候由于某些原因导致消费者突然挂掉导致消息没有被完全处理成功,当RabbitMQ消息被处理的时候会立即将消息从队列中删除,但是此时我们的消息没有被完全处理同时又被删除,导致消息丢失处理失败。

为了解决上述消息丢失的问题,在RabbitMQ中引入了消息应答概念,当消费者成功的处理生产者产生的消息请求的时候,会给RabbitMQ产生一个应答消息,告知MQ此时消费者的消息已经成功被处理,可以将消息从队列中删除。

消息应答分类

自动应答

在自动应答方面当RabbitMQ将消息发送完毕之后会立即认为消息成功处理将该消息从队列之中删除,这种模式需要在大量数据与安全性之间做权衡,当有大量数据被发送的时候,此时消息还未发送给消费者由于某种原因导致信道断开此时消息无法传达给消费者但是MQ已经将消息给删除了一样会导致消息丢失,另一种情况当消息量过大且发送给消费者的时候,但是消费者的数量有限处理能力有限,导致大量的消息在内从之中堆积导致内存耗尽,最终导致消费者被操作系统杀死,此时生产者产生的消息依旧没有被处理成功,但是该消息已经被删除了导致消息丢失。

手动应答

不同于上述自动应答消息发送之后立马被删除的概念,在手动应答方面当我们成功的处理了消费者的消息,手动给RabbitMQ发送一个应答消息,告知RabbitMQ该消息已经成功处理,可以将该消息从队列之中删除。

在手动应答时候又分为批量应答以及非批量应答,当选择批量应答的时候会将信道之内的所有消息都自动应答,显然此种方式不安全,因为不能确保信道内的所有消息都成功被处理,在非批量应答的时候只会应答当前被处理的消息。

RabbitMQ

消息重入队

当消费者成功接收消息并且进行处理的时候,由于某些原因(通道已关闭,TCP连接丢失),导致消费者未能发送ACK确认消息给RabbitMQ,此时RabbitMQ会将消息重新加入队列,如果此时有其余的消费者可以进行消费,则RabbitMQ会将该消息转发给其余的消费者,确保我们即使偶尔某个消费者挂掉消息也不会丢失。

如图所示,c1/c2两个消费者分别处理消息1与消息2,此时由于某些原因导致c1与队列连接断开未发送ACK消息,此时消息1重新加入了队列,但是由于c2是正常工作的,因此c2将可以处理消息1,这样保证消息1既不会丢失同时也被处理。

RabbitMQ

生产者

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# @File    :Produce.py
# @Author  :SR
# @Date    :2021/8/20 下午11:43
import pika

from mq_one.base import Base


class Produce(Base):
    def producer(self):
        connection = self.connection()

        channel = self.channel()

        channel.queue_declare("hello", durable=True)

        properties = pika.BasicProperties(delivery_mode=2)

        for i in range(1, 11):
            message = "hello_world_%s" % i

            channel.basic_publish(‘‘, routing_key=‘hello‘, body=message, properties=properties)

        connection.close()


if __name__ == ‘__main__‘:
    produce = Produce()

    produce.producer()

RabbitMQ

消费者

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# @File    :Consumer.py
# @Author  :SR
# @Date    :2021/8/20 下午11:43
import time

from mq_one.base import Base


class Consumer(Base):
    def consumer(self):
        channel = self.channel()

        channel.queue_declare(‘hello‘, durable=True)

        # 只有工作完成之后才会继续接受任务
        channel.basic_qos(prefetch_count=1)

        channel.basic_consume(‘‘, on_message_callback=self.callback, auto_ack=False)

        channel.start_consuming()

    def callback(self, channel, method, properties, body):
        time.sleep(5)

        print("当前消息为:%s" % body.decode())

        # 手动应答
        channel.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == ‘__main__‘:
    consume = Consumer()

    print("worker2开始等待接收消息。。。。。")

    consume.consumer()

当worker1运行一段时间主动关闭该程序

RabbitMQ

查看worker2是否能接受worker1中消息5并且处理

RabbitMQ

RabbitMQ

上一篇:BZOJ 1984: 月下“毛景树” [树链剖分 边权]


下一篇:PIL.Image.save() 保存图片压缩问题