Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,是一个把消息传递封装成统一接口的库。其特点是支持多种的符合APMQ协议的消息队列系统。通过本系列,大家可以了解 Kombu 是如何实现 AMQP。本文先介绍相关概念和整体逻辑架构。
[源码解析] 消息队列 Kombu 之 基本架构
目录
- [源码解析] 消息队列 Kombu 之 基本架构
0x00 摘要
从本文开始,我们通过一个系列来介绍消息队列 Kombu(为后续Celery分析打基础)。
Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,是一个把消息传递封装成统一接口的库。其特点是支持多种的符合APMQ协议的消息队列系统。不仅支持原生的AMQP消息队列如RabbitMQ、Qpid,还支持虚拟的消息队列如redis、mongodb、beantalk、couchdb、in-memory等。
通过本系列,大家可以了解 Kombu 是如何实现 AMQP。本文先介绍相关概念和整体逻辑架构。
0x01 AMQP
介绍 AMQP 是因为 Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
1.1 基本概念
AMQP的基本概念如下:
- 生产者和消费者:生产者创建消息,然后发布到代理服务器的队列中,代理服务器会把消息发送给感兴趣的接受方。消费者连接到代理服务器,并订阅到队列上,从而接收消息。
- 通道 channel:信道是 “真实的” TCP连接内的虚拟连接,AMQP的命令都是通过通道发送的。在一条TCP连接上可以创建多条信道。
- 有些应用需要与 AMQP 代理建立多个连接。同时开启多个 TCP 连接不合适,因为会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。
- 在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。
- 一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
- 队列:存放消息的地方,队列通过路由键绑定到交换机,生产者通过交换机将消息发送到队列中。我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。
- Exchange 和 绑定:生产者发布消息时,先将消息发送到Exchange,通过Exchange与队列的绑定规则将消息发送到队列。
- 交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
- 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
- 常见的Exchange有topic、fanout、direct:
- direct Exchange:direct交换机是包含空白字符串的默认交换机,当声明队列时会主动绑定到默认交换机,并且以队列名称为路由键;
- fanout Exchange:这种交换机会将收到的消息广播到绑定的队列;
- topic Exchange:topic交换机可以通过路由键的正则表达式将消息发送到多个队列;
1.2 工作过程
工作过程是:
- 发布者(Publisher)发布消息(Message),经由交换机(Exchange)。消息从来不直接发送给队列,甚至 Producers 都可能不知道队列的存在。 消息是发送给交换机,给交换机发送消息时,需要指定消息的 routing_key 属性;
- 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。交换机收到消息后,根据 交换机的类型,或直接发送给队列 (fanout), 或匹配消息的 routing_key 和 队列与交换机之间的 banding_key。 如果匹配,则递交消息给队列;
- 最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。Consumers 从队列取得消息;
基本如下图:
+----------------------------------------------+ | AMQP Entity | | | | | | | +-----------+ | +------------+ binding +---------+ | +------------+ | | | | | | | | | | | Publisher | +------> | Exchange | +---------> | Queue | +--------> | Consumer | | | | | | | | | | | +-----------+ | +------------+ +---------+ | +------------+ | | | | +----------------------------------------------+
0x02 Poll系列模型
Kombu 利用了 Poll 模型,所以我们有必要介绍下。这就是IO多路复用。
IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用比如当客户处理多个描述字时(一般是交互式输入和网络套接口)。
与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。
2.1 select
select 通过一个select()系统调用来监视多个文件描述符的数组(在linux中一切事物皆文件,块设备,socket连接等)。
当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位(变成ready),使得进程可以获得这些文件描述符从而进行后续的读写操作(select会不断监视网络接口的某个目录下有多少文件描述符变成ready状态【在网络接口中,过来一个连接就会建立一个'文件'】,变成ready状态后,select就可以操作这个文件描述符了)。
2.2 poll
poll 和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
2.3 epoll
epoll由内核直接支持,可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
2.4 通俗理解
2.4.1 阻塞I/O模式
阻塞I/O模式下,内核对于I/O事件的处理是阻塞或者唤醒,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。
2.4.2 非阻塞模式
非阻塞忙轮询的I/O方式可以同时处理多个流。我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。
2.4.2.1 代理模式
非阻塞模式下可以把I/O事件交给其他对象(select以及epoll)处理甚至直接忽略。
为了避免CPU空转,可以引进一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把“忙”字去掉了)。代码长这样:
while true { select(streams[]) for i in streams[] { if i has data read until unavailable } }
于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。
2.4.2.2 epoll
epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll只会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的(复杂度降低到了O(1))。
epoll版服务器实现原理类似于select版服务器,都是通过某种方式对套接字进行检验其是否能收发数据等。但是epoll版的效率要更高,同时没有上限。
在select、poll中的检验,是一种被动的轮询检验,而epoll中的检验是一种主动地事件通知检测,即:当有套接字符合检验的要求,便会主动通知,从而进行操作。这样的机制自然效率会高一点。
同时在epoll中要用到文件描述符,所谓文件描述符实质上是数字。
epoll的主要用处在于:
epoll_list = epoll.epoll()
如果进程在处理while循环中的代码时,一些套接字对应的客户端如果发来了数据,那么操作系统底层会自动的把这些套接字对应的文件描述符写入该列表中,当进程再次执行到epoll时,就会得到了这个列表,此时这个列表中的信息就表示着哪些套接字可以进行收发了。因为epoll没有去依次的查看,而是直接拿走已经可以收发的fd,所以效率高!
0x03 Kombu 基本概念
Kombu的最初的实现叫做carrot,后来经过重构才成了Kombu。
3.1 用途
Kombu 主要用途如下:
- Celery是Python中最流行的异步消息队列框架,支持RabbitMQ、Redis、ZoopKeeper等作为Broker,而对这些消息队列的抽象,都是通过Kombu实现的:
- Celery一开始先支持的RabbitMQ,也就是使用 AMQP 协议。由于要支持越来越多的消息代理,但是这些消息代理是不支持 AMQP 协议的,需要一个东西把所有的消息代理的处理方式统一起来,甚至可以理解为把它们「伪装成支持AMQ协议」。
- Kombu实现了对AMQP transport和non-AMQP transports(Redis、Amazon SQS、ZoopKeeper等)的兼容。
- OpenStack 默认 是使用kombu连接rabbitmq服务器。OpenStack使用kombu作为消息队列使用的client库而没有用广泛使用的pika库有两个原因:
- kombu除了支持纯AMQP的实现还支持虚拟AMQP的实现作为消息队列系统,如redis、mongodb、beantalk等。
- kombu可以通过配置设置AMQP连接的底层库,比如librabbitmq或者pyamqp。前者是一个python嫁接C库的实现,后者是一个纯python的实现。如果用纯python实现的AMQP库,就可以应用eventlet的框架将设计网络IO的部分变为协程,提高整体的网络IO性能。如openstack内部使用的就是eventlet的框架。
3.2 术语
在 Kombu 中,存在多个概念(部分和AMQP类似),他们分别是:
Message:消息,发送和消费的主体,生产消费的基本单位,其实就是我们所谓的一条条消息;
Connection:对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接;Connection 是 AMQP 对 连接的封装;
Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接;Channel 是 AMQP 对 MQ 的操作的封装;
-
Transport:kombu 支持将不同的消息中间件以插件的方式进行灵活配置,使用transport这个术语来表示一个具体的消息中间件,可以认为是对broker的抽象:
- 对 MQ 的操作必然离不开连接,但是,Kombu 并不直接让 Channel 使用 Connection 来发送/接受请求,而是引入了一个新的抽象 Transport,Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行。引入transport这个抽象概念可以使得后续添加对non-AMQP的transport非常简单;
- Transport是真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例;
- 当前Kombu中build-in支持有Redis、Beanstalk、Amazon SQS、CouchDB,、MongoDB,、ZeroMQ,、ZooKeeper、SoftLayer MQ和Pyro;
Producers: 发送消息的抽象类;
Consumers:接受消息的抽象类。consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息;
-
Exchange:MQ 路由,这个和 RabbitMQ 差不多,支持 5 类型。消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。用于路由消息(消息发给exchange,exchange发给对应的queue)。路由就是比较routing-key(这个message提供)和binding-key(这个queue注册到exchange的时候提供)。使用时,需要指定exchange的名称和类型(direct,topic和fanout)。
交换机通过匹配消息的 routing_key 和 binding_key来转发消息,binding_key 是consumer 声明队列时与交换机的绑定关系。
Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息。
-
Routing keys: 每个消息在发送时都会声明一个routing_key。routing_key的含义依赖于exchange的类型。一般说来,在AMQP标准里定义了四种默认的exchange类型,此外,vendor还可以自定义exchange的类型。最常用的三类exchange为:
- Direct exchange: 如果message的routing_key和某个consumer中的routing_key相同,就会把消息发送给这个consumer监听的queue中。
- Fan-out exchange: 广播模式。exchange将收到的message发送到所有与之绑定的queue中。
- Topic exchange: 该类型exchange会将message发送到与之routing_key类型相匹配的queue中。routing_key由一系列“.”隔开的word组成,“*”代表匹配任何word,“#”代表匹配0个或多个word,类似于正则表达式。
0x04 概念具体说明
4.1 概述
以 redis 为 broker,我们简要说明:
- 发送消息的对象,称为生产者Producer。
- connections建立 redis 连接,channel 是一次连接会话。
- Exchange 负责交换消息,消息通过channel发送到Exchange,由于Exchange绑定Queue和routing_key。消息会被转发到 redis 中匹配routing_key的Queue中。
- 在Queue另一侧的消费者Consumer 一直对Queue进行监听,一旦Queue中存在数据,则调用callback方法处理消息。
4.2 Connection
Connection是对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接。现在就是对 'redis://localhost:6379'
连接进行抽象。
conn = Connection('redis://localhost:6379')
由之前论述可知,Connection是到broker的连接。从具体代码可以看出,Connection更接近是一个逻辑概念,具体功能都委托给别人完成。
Connection主要成员变量是:
- _connection:kombu.transport.redis.Transport 类型,就是真正用来负责具体的 MQ 的操作,也就是说对 Channel 的操作都会落到 Transport 上执行。
- _transport:就是上面提到的对 broker 的抽象。
- cycle:与broker交互的调度策略。
- failover_strategy:在连接失效时,选取其他hosts的策略。
- heartbeat:用来实施心跳。
精简版定义如下:
class Connection: """A connection to the broker""" port = None _connection = None _default_channel = None _transport = None #: Iterator returning the next broker URL to try in the event #: of connection failure (initialized by :attr:`failover_strategy`). cycle = None #: Additional transport specific options, #: passed on to the transport instance. transport_options = None #: Strategy used to select new hosts when reconnecting after connection #: failure. One of "round-robin", "shuffle" or any custom iterator #: constantly yielding new URLs to try. failover_strategy = 'round-robin' #: Heartbeat value, currently only supported by the py-amqp transport. heartbeat = None failover_strategies = failover_strategies
4.3 Channel
Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接。就是真正的连接。
- Connection 是 AMQP 对 连接的封装;
- Channel 是 AMQP 对 MQ 的操作的封装;
Channel 可以认为是 redis 操作和连接的封装。每个 Channel 都可以与 redis 建立一个连接,在此连接之上对 redis 进行操作,每个连接都有一个 socket,每个 socket 都有一个 file,从这个 file 可以进行 poll。
4.3.1 定义
简化版定义如下:
class Channel(virtual.Channel): """Redis Channel.""" QoS = QoS _client = None _subclient = None keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX) keyprefix_fanout = '/{db}.' sep = '\x06\x16' _fanout_queues = {} unacked_key = '{p}unacked'.format(p=KEY_PREFIX) unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX) unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX) unacked_mutex_expire = 300 # 5 minutes unacked_restore_limit = None visibility_timeout = 3600 # 1 hour max_connections = 10 queue_order_strategy = 'round_robin' _async_pool = None _pool = None from_transport_options = ( virtual.Channel.from_transport_options + ('sep', 'ack_emulation', 'unacked_key', ...... 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps') # <-- do not add comma here! ) connection_class = redis.Connection if redis else None self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
4.3.2 redis消息回调函数
关于上面成员变量,这里需要说明的是
handlers = {dict: 2} { 'BRPOP':<bound method Channel._brpop_read of >, 'LISTEN':<bound method Channel._receive of > }
这是redis有消息时的回调函数,即:
- BPROP 有消息时候,调用 Channel._brpop_read;
- LISTEN 有消息时候,调用 Channel._receive;
大约如下:
+---------------------------------------------------------------------------------------------------------------------------------------+ | +--------------+ 6 parse_response | | +--> | Linux Kernel | +---+ | | | +--------------+ | | | | | | | | | event | | | 1 | | | | | 2 | | | | | +-------+---+ socket + | | | redis | port +--> fd +--->+ v | | | | +------+--------+ | | | socket | | Hub | | | | port +--> fd +--->----------> | | | | port=6379 | | | | | | | socket | | readers +-----> Transport.on_readable | | | port +--> fd +--->+ | | + | +-----------+ +---------------+ | | | | 3 | | +----------------------------------------------------------------------------------------+ | | v | _receive_callback | 5 +-------------+ +-----------+ +------------+------+ +-------------------------+ 'BRPOP' = Channel._brpop_read +-----> | Channel | +------------------> | Consumer | | Transport | | MultiChannelPoller | +------> channel . handlers 'LISTEN' = Channel._receive +-------------+ +---+-------+ | | | | | 8 | | | on_readable(fileno) | | | ^ | | cycle +---------------------> | _fd_to_chan +----------------> channel . handlers 'BRPOP' = Channel._brpop_read | | | | 4 | | | 'LISTEN' = Channel._receive | | | _callbacks[queue]| | | | | on_m | 9 | + | +-------------------------+ +------> channel . handlers 'BRPOP' = Channel._brpop_read | | +-------------------+ 'LISTEN' = Channel._receive | | | | v | 7 _callback | +-----------------------------------------------------------------------------------------------------------------------------------------+ User Function
手机如图:
4.4 Transport
Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例。就是存储和发送消息的实体,用来区分底层消息队列是用amqp、Redis还是其它实现的。
我们顺着上文理一下:
- Connection 是 AMQP 对 连接的封装;
- Channel 是 AMQP 对 MQ 的操作的封装;
- 那么两者的关系就是对 MQ 的操作必然离不开连接,但是 Kombu 并不直接让 Channel 使用 Connection 来发送/接受请求,而是引入了一个新的抽象 Transport,Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行;
在Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。
Transport负责具体操作,但是 很多操作移交给 loop 与 MultiChannelPoller 进行。
其主要成员变量为:
- 本transport的驱动类型,名字;
- 对应的 Channel;
- cycle:MultiChannelPoller,具体下文会提到;
其中重点是MultiChannelPoller。一个Connection有一个Transport, 一个Transport有一个MultiChannelPoller,对poll操作都是由MultiChannelPoller完成,redis操作由channel完成。
定义如下:
class Transport(virtual.Transport): """Redis Transport.""" Channel = Channel polling_interval = None # disable sleep between unsuccessful polls. default_port = DEFAULT_PORT driver_type = 'redis' driver_name = 'redis' implements = virtual.Transport.implements.extend( asynchronous=True, exchange_type=frozenset(['direct', 'topic', 'fanout']) ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # All channels share the same poller. self.cycle = MultiChannelPoller()
4.5 MultiChannelPoller
MultiChannelPoller 定义如下,可以理解为 执行 engine,主要作用是:
- 收集 channel;
- 建立 socks fd 到 channel 的映射;
- 建立 channel 到 socks fd 的映射;
- 使用 poll;
或者从逻辑上这么理解,MultiChannelPoller 就是:
- 把 Channel 对应的 socket 同 poll 联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作;
- 把 poll 对应的 fd 添加到 MultiChannelPoller 这里,这样 MultiChannelPoller 就可以 打通
Channel ---> socket ---> poll ---> fd ---> 读取 redis
这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取;
具体定义如下:
class MultiChannelPoller: """Async I/O poller for Redis transport.""" eventflags = READ | ERR def __init__(self): # active channels self._channels = set() # file descriptor -> channel map. self._fd_to_chan = {} # channel -> socket map self._chan_to_sock = {} # poll implementation (epoll/kqueue/select) self.poller = poll() # one-shot callbacks called after reading from socket. self.after_read = set()
4.6 Consumer
Consumer 是消息接收者。Consumer & 相关组件 的作用主要如下:
- Exchange:MQ 路由,消息发送者将消息发至 Exchange,Exchange 负责将消息分发至队列。
- Queue:对应的队列抽象,存储着即将被应用消费掉的消息,Exchange 负责将消息分发 Queue,消费者从Queue 接收消息;
- Consumers 是接受消息的抽象类,consumer 需要声明一个 queue,并将 queue 与指定的 exchange 绑定,然后从 queue 里面接收消息。就是说,从用户角度,知道了一个 exchange 就可以从中读取消息,而具体这个消息就是从 queue 中读取的。
在具体 Consumer 的实现中,它把 queue 与 channel 联系起来。queue 里面有一个 channel,用来访问redis,也有 Exchange,知道访问具体 redis 哪个key(就是queue对应的那个key)。
Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。
所以服务端的逻辑大致为:
- 建立连接;
- 创建Exchange ;
- 创建Queue,并将Exchange与Queue绑定,Queue的名称为routing_key ;
- 创建Consumer对Queue监听;
Consumer 定义如下:
class Consumer: """Message consumer. Arguments: channel (kombu.Connection, ChannelT): see :attr:`channel`. queues (Sequence[kombu.Queue]): see :attr:`queues`. no_ack (bool): see :attr:`no_ack`. auto_declare (bool): see :attr:`auto_declare` callbacks (Sequence[Callable]): see :attr:`callbacks`. on_message (Callable): See :attr:`on_message` on_decode_error (Callable): see :attr:`on_decode_error`. prefetch_count (int): see :attr:`prefetch_count`. """ ContentDisallowed = ContentDisallowed #: The connection/channel to use for this consumer. channel = None #: A single :class:`~kombu.Queue`, or a list of queues to #: consume from. queues = None #: Flag for automatic message acknowledgment. no_ack = None #: By default all entities will be declared at instantiation, if you #: want to handle this manually you can set this to :const:`False`. auto_declare = True #: List of callbacks called in order when a message is received. callbacks = None #: Optional function called whenever a message is received. on_message = None #: Callback called when a message can't be decoded. on_decode_error = None #: List of accepted content-types. accept = None #: Initial prefetch count prefetch_count = None #: Mapping of queues we consume from. _queues = None _tags = count(1) # global
此时总体逻辑如下图:
+----------------------+ +-------------------+ | Consumer | | Channel | | | | | +-----------------------------------------------------------+ | | | client +-------------> | Redis<ConnectionPool<Connection| | channel +--------------------> | | +-----------------------------------------------------------+ | | | pool | | | +---------> | | | connection +---------------+ | | | | | | | | | | +----------------------+ | | +-------------------+ | | | | | v | | | | +-------------------+ +---+-----------------+ +--------------------+ | | | | | Connection | | redis.Transport | | MultiChannelPoller | | | | | | | | | | | | | | | | | | | | _channels +--------+ | | | | | | cycle +------------> | _fd_to_chan | | | | | transport +---------> | | | _chan_to_sock | | +-------->+ | | | | | +------+ poller | | | | +-------------------+ +---------------------+ | | after_read | | | | | | | | | | | +--------------------+ | | | +------------------+ +---------------+ | | | | Hub | | | | | | | v | | | | | +------+------+ | | | | poller +---------------> | _poll | | | | | | | | +-------+ | | | | | | _poller+---------> | poll | v | | +------------------+ | | +-------+ | | +-------------+ +-------------------+ | +----------------+ | Queue | | | | Exchange | | _chann+l | +----+ | | | | | | | exchange +----------------> | channel | | | | | | | | | +-------------------+ +----------------+
手机如下:
现在我们知道:
4.7 Producer
Producer 是消息发送者。Producer中,主要变量是:
- _channel :就是channel;
- exchange :exchange;
class Producer: """Message Producer. Arguments: channel (kombu.Connection, ChannelT): Connection or channel. exchange (kombu.entity.Exchange, str): Optional default exchange. routing_key (str): Optional default routing key. """ #: Default exchange exchange = None #: Default routing key. routing_key = '' #: Default serializer to use. Default is JSON. serializer = None #: Default compression method. Disabled by default. compression = None #: By default, if a defualt exchange is set, #: that exchange will be declare when publishing a message. auto_declare = True #: Basic return callback. on_return = None #: Set if channel argument was a Connection instance (using #: default_channel). __connection__ = None
逻辑如图:
+----------------------+ +-------------------+ | Producer | | Channel | | | | | +-----------------------------------------------------------+ | | | client +-------------> | Redis<ConnectionPool<Connection| | channel +------------------> | | +-----------------------------------------------------------+ | | | pool | | exchange | +---------> | | | connection +---------------+ | | + | | | | | | | +--+-------------------+ | | +-------------------+ | | | | | | v | | | | | +-------------------+ +---+-----------------+ +--------------------+ | | | | | | Connection | | redis.Transport | | MultiChannelPoller | | | +----------------------> | | | | | | | | | | | | | | | _channels +--------+ | | | | | | cycle +------------> | _fd_to_chan | | | | | transport +---------> | | | _chan_to_sock | | +-------->+ | | | | | +------+ poller | | | | +-------------------+ +---------------------+ | | after_read | | | | | | | | | | | +--------------------+ | | | +------------------+ +---------------+ | | | | Hub | | | | | | | v | | | | | +------+------+ | | | | poller +---------------> | _poll | | publish | | | | | | +-------+ +--------------------------------+ | | | _poller+---------> | poll | | | | +------------------+ | | +-------+ | | | +-------------+ +-------------------+ | +-----> +----------------+ | Queue | | | | Exchange | | _channel | +---------+ | | | | | | | exchange +--------------------> | channel | | | | | | | | | +-------------------+ +----------------+
手机如图:
4.8 Hub
用户可以通过同步方式自行读取消息,如果不想自行读取,也可以通过Hub(本身构建了一个异步消息引擎)读取。
4.8.1 自己的poller
Hub 是一个eventloop,拥有自己的 poller。
前面在 MultiChannelPoller 中间提到了,MultiChannelPoller 会建立了自己内部的 poller。但是实际上在注册时候,Transport 会使用 hub 的 poller,而非 MultiChannelPoller 内部的 poller。
4.8.2 Connection
Connection注册到Hub,一个Connection对应一个Hub。
hub = Hub() conn = Connection('redis://localhost:6379') conn.register_with_event_loop(hub)
4.8.3 联系
在注册过程中,Hub 把自己内部的 poller 配置在 Transport 之中。这样就通过 transport 内部的 MultiChannelPoller 可以把 Hub . poller 和 Channel 对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作;
因而,如前面所述,这样 MultiChannelPoller 就可以 打通 Channel ---> socket ---> poll ---> fd ---> 读取 redis
这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取。
def register_with_event_loop(self, loop): self.transport.register_with_event_loop(self.connection, loop)
4.8.4 定义
Hub定义如下:
class Hub: """Event loop object. """ def __init__(self, timer=None): self.timer = timer if timer is not None else Timer() self.readers = {} self.writers = {} self.on_tick = set() self.on_close = set() self._ready = set() self._create_poller() @property def poller(self): if not self._poller: self._create_poller() return self._poller def _create_poller(self): self._poller = poll() self._register_fd = self._poller.register self._unregister_fd = self._poller.unregister def add(self, fd, callback, flags, args=(), consolidate=False): fd = fileno(fd) try: self.poller.register(fd, flags) except ValueError: self._remove_from_loop(fd) raise else: dest = self.readers if flags & READ else self.writers if consolidate: self.consolidate.add(fd) dest[fd] = None else: dest[fd] = callback, args def run_forever(self): self._running = True try: while 1: try: self.run_once() except Stop: break finally: self._running = False def run_once(self): try: next(self.loop) except StopIteration: self._loop = None def create_loop(self, ...): readers, writers = self.readers, self.writers poll = self.poller.poll while 1: for fd, event in events or (): cb, cbargs = readers[fd] if isinstance(cb, generator): next(cb) cb(*cbargs) else: # no sockets yet, startup is probably not done. sleep(min(poll_timeout, 0.1)) yield
0x05 总结
我们通过文字和图例来总结下本文。
5.1 逻辑
Message:消息,发送和消费的主体,其实就是我们所谓的一条条消息;
Connection是AMQP对消息队列连接的封装抽象,那么两者的关系就是:对MQ的操作必然离不开连接。
-
Channel是AMQP对MQ的操作的封装,可以理解成共享一个Connection的多个轻量化连接。
-
Channel
将Consumer
标签,Consumer
要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。 - 还通过
Transport
将队列与回调函数列表的映射关系记录下来。 - Kombu对所有需要监听的队列
_active_queues
都查询一遍,直到查询完毕或者遇到一个可以使用的Queue,然后就获取消息,回调此队列对应的callback。 - Channel初始化的过程就是连接的过程。
-
Kombu并不直接让Channel使用Connection来发送/接受请求,而是引入了一个新的抽象Transport,Transport负责具体的MQ的操作,也就是说Channel的操作都会落到Transport上执行。是以Transport为中心,把Channel代表的真实redis与Hub其中的poll联系起来。
Queue:消息队列,消息内容的载体,存储着即将被应用消费掉的消息。Exchange 负责将消息分发 Queue,消费者从 Queue 接收消息;
-
Exchange:交换机,消息发送者将消息发至 Exchange,Exchange 负责将消息分发至 Queue;
- 消息发送是交给 Exchange 来做的,但Exchange只是将发送的
routing_key
转化为queue
的名字,这样发送就知道应该发给哪个queue;实际发送还是得 channel 来干活, - 即从 exchange 得到 routing_key ---> queue 的规则,然后再依据 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依据哪个 queue 交换消息。
- 每个不同的 Transport 都有对应的 Channel;生产者将消息发送到Exchange,Exchange通过匹配BindingKey和消息中的RouteKey来将消息路由到队列,最后队列将消息投递给消费者。
- 消息发送是交给 Exchange 来做的,但Exchange只是将发送的
Producers: 发送消息的抽象类,Producer 包含了很多东西,有 Exchange、routing_key 和 channel 等等;
-
Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息;
- Consumer绑定了消息的处理函数,每一个Consumer初始化的时候都是和Channel绑定的,也就是说我们Consumer包含了Queue也就和Connection关联起来了。
- Consumer消费消息是通过Queue来消费,然后Queue又转嫁给Channel,再转给connection。
用户可以通过同步方式自行读取消息,如果不想自行读取,也可以通过Hub(本身构建了一个异步消息引擎)读取。
Hub是一个eventloop,Connection注册到Hub,一个Connection对应一个Hub。Hub 把自己内部的 poller 配置在 Transport 之中。这样就通过 transport 内部的 MultiChannelPoller 可以把 Hub . poller 和 Channel 对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作;
MultiChannelPoller
是Connection 和 Hub的枢纽,它负责找出哪个 Channel 是可用的,但是这些 Channel 都是来自同一个 Connection。
5.2 示例图
具体如图,可以看到,
- 目前是以Transport为中心,把Channel代表的真实redis与Hub其中的poll联系起来,但是具体如何使用则尚未得知。
- 用户是通过Connection来作为API入口,connection可以得到Transport。
+-------------------+ | Channel | | | +-----------------------------------------------------------+ | client +-------------> | Redis<ConnectionPool<Connection| | | +-----------------------------------------------------------+ | | | | +---------------------------------------------------+-+ | pool +--------------> |ConnectionPool<Connection| | | +---------------------------------------------------+-+ | | | | | _fd_to_chan | | transport +---------> | | | _chan_to_sock | | | | | + | _poll | | | | | +-------+ | | | _poller+---------> | poll | +------------------+ | | +-------+ +-------------+ +----------------+ +-------------------+ | Exchange | | Queue | | | | | | | | | | channel | <------------+ exchange | | | | | | | | | +----------------+ +-------------------+
我们下文用实例来介绍Kombu的启动过程。
因为本文是一个综述,所以大家会发现,一些概念讲解文字会同时出现在后续文章和综述之中。
0xFF 参考
celery 7 优秀开源项目kombu源码分析之registry和entrypoint