1 概念
RabbitMQ是个消息代理人message broker。它接受,存储,转发消息。x消息队列最早产生在金融领域,是为解决金融业务的IT系统中产生的一些问题而应运而生的。随着互联网和电子商务的发展,消息队列在不同行业、不同场景下得到了广泛运用。消息队列主要能解决三个问题:
1.1 异步解藕
如果在分布式系统中,不同应用直接相互调用,势必会引起发起调用的应用阻塞等待,被调用应用处理完成。这样浪费时间。故而需要引用一个中间人,就像一个邮筒,来收发信件,而不必发件人和收件人直接交换信息。消息队列就这样应运而生了,消息队列利用发布-订阅模式工作,消息发送者发布消息,一个或者多个消息接受者订阅消息。消息发送者是消息源,在对消息进行处理后将消息发送至分布式消息队列,消息接受者从非分布式消息队列获取该信息后继续进行处理。可以看到,消息发送者和消息接受者之间没有直接的耦合关系。
1.2 流量削峰填谷
无论在互联网还是传统行业中,IT系统的访问量、业务请求与时间的关系都不是均匀分布的。比如超市促销时,客流激增,商家会组织顾客排队购买、排队支付。其实消息队列,也可以看做这种排队的思想的引入。通过将大量请求放入消息队列,进行排队,让消费者挨个处理请求,而不超出消费者程序的承载上限,较好地削平了流量高峰,在请求量低的时候又把之前积压的请求继续处理。它就像一根可以稳定压力的、让水流均匀流动的水管,从而保证整个系统中的运转效率保持均匀稳定。
1.3 RPC(Remote Procedure Call)
RPC即所谓远程过程调用,前面将消息队列来实现异步解藕的时候已经说明,它可以做不同应用程序相互调用的“中间人”。但作RPC时就还必须考虑到,消费方接收到消息,并且处理完该消息包含的请求后,还必须有个应答的过程。所以,消息生产者(请求方)在消息头里必须指定这条消息的编号(correlation_id)和消费者(接受请求方)应答消息应该写入那个队列(如图,reply_to信息),消息消费者在处理完之后,它又会转换成一个消息生产者的角色,向消息队列指定的队列中写入一条带correlation_id的营地消息。当RPC请求方接收到这条消息时,就这知道编号correlation_id消息包含的调用已完成。
1.4 基本元素
- Producer 生产者,生产发送消息。
- Consumer 消费者,接受消息。
- broker 消息中间件,将消息由生产者传递给消费者的中介软件,就是我们常说的消息中间件(MQ)。包括知名的RabbitMQ、Kafka、Redis(NOSQL,也用作消息中间件)、RacketMQ、ZeroMQ。
- Virtual host 虚拟机,考虑到多租户情景下的安全因素,对broker进行虚拟的资源划分,类似于VLAN。同一个消息中间件,可以划分出多个vhost,供不同用户在其中创建使用exchange和Queue,以保持数据的隔离性和安全性。
- Channel 管道,应用程序对broker的访问,必须先建立起连接,但是应用程序需要发送/接受大量消息,如果每次发生/接受消息的操作都去建立一次连接,那么对部署broker的机器资源的开销是巨大的。所以必须做到多次访问对一次连接的复用,这就需要应用程序用多线程的方式访问broker。同时还得对每一次消息收发,能保证数据的隔离性,那就必须由broker对消息传递的渠道做一定的分隔。channel就是为了实现这个目的而产生的,相当于在broker内部对一个connection做了切分,实现了多个轻量级的connection,以极大地节约系统开销。
- Exchange 路由器,负责分发消息的组件,相当于网路中的路由器。message进入broker首先由Exchange处理,根据分发规则,查询表中routing key,来讲消息分发到对应的Queue中去。常用的分发规则有direct(point to point),topic(publish-subscrible),fanout(mutilcast)。
- Queue 队列,只受系统内存和硬盘大小限制。存储消息,生产者往队列里面发送,消费者监听读取。一个message可以拷贝到多个Queue上去。
- Binding 路由规则,exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
- Message 消息,分为消息头和消息体,消息头主要记录供broker处理分发的信息,消息体才是真正被消费者需要的信息。
2 生产数据
#!/usr/bin/env python
import pika
auth = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
channel.basic_publish(exchange='',
routing_key='TEST01',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Note:最近项目还用不到AMQP,这部分写的比较简陋,只是做一些简单了解。
参考1:消息队列RabbitMQ原理及其Python客户端pika的使用
参考2:新建远程连接
参考3:RabbitMQ 入门系列(9)— Python 的 pika 库常用函数及参数说明
参考4:python使用pika操作rabbitmq总结(一)
参考5:RabbitMQ使用教程(超详细)
参考6:RabbitMQ CLI 管理工具 rabbitmqadmin(管理和监控)
参考7:RabbitMQ中 exchange、route、queue的关系