[源码分析] 消息队列 Kombu 之 Hub

[源码分析] 消息队列 Kombu 之 Hub

0x00 摘要

本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Hub 概念。

0x01 示例代码

下面使用如下代码来进行说明。

本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。

def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key') def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent') def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub) def p_message():
print(' kombu ') with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever() if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))

0x02 来由

前文中,Consumer部分有一句代码没有分析:

hub.run_forever()

此时,hub与Connection已经联系起来,具体如下:

具体如下图:

+----------------------+               +-------------------+
| Consumer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +--------------------> | | +-----------------------------------------------------------+
| | | pool |
| | +---------> | | <------------------------------------------------------------+
| queues | | | | |
| | | +----> | connection +---------------+ |
| | | | | | | | |
+----------------------+ | | +-------------------+ | |
| | | v |
| | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | Connection | | redis.Transport | | MultiChannelPoller | |
| | | | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| | | | | | | +-------+
| | | | | | _poller+---------> | poll |
v | | +------------------+ | | +-------+
| | +-------------+
+-------------------+ | +----------------+
| Queue | | | | Exchange |
| _chann+l | +----+ | |
| | | |
| exchange +----------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+

手机如下:

[源码分析] 消息队列 Kombu 之 Hub

现在我们知道:

  • Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。
  • Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
  • Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
  • Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接,是操作的抽象;

但是,我们只是大致知道 poll 是用来做什么的,但是不知道consumer,poll 究竟如何与Hub交互。我们本文就接着分析。

0x03 Poll一般步骤

在linux系统中,使用Poll的一般步骤如下:

  1. Create an epoll object——创建1个epoll对象;
  2. Tell the epoll object to monitor specific events on specific sockets——告诉epoll对象,在指定的socket上监听指定的事件;
  3. Ask the epoll object which sockets may have had the specified event since the last query——询问epoll对象,从上次查询以来,哪些socket发生了哪些指定的事件;
  4. Perform some action on those sockets——在这些socket上执行一些操作;
  5. Tell the epoll object to modify the list of sockets and/or events to monitor——告诉epoll对象,修改socket列表和(或)事件,并监控;
  6. Repeat steps 3 through 5 until finished——重复步骤3-5,直到完成;
  7. Destroy the epoll object——销毁epoll对象;

所以我们就需要在 Hub 代码中看看 kombu 如何使用 Poll。

0x04 建立 Hub

在建立 Hub 这里会建立 Hub 内部的 Poller。

_get_poller, eventio.py:312
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, hub_receive.py:23
<module>, hub_receive.py:46

具体代码是:

def _get_poller():
if detect_environment() != 'default':
# greenlet
return _select
elif epoll:
# Py2.6+ Linux
return _epoll
elif kqueue and 'netbsd' in sys.platform:
return _kqueue
elif xpoll:
return _poll
else:
return _select

这样,在 Hub内部就建立了 poller。

class Hub:
"""Event loop object. Arguments:
timer (kombu.asynchronous.Timer): Specify custom timer instance.
"""
def __init__(self, timer=None):
self.timer = timer if timer is not None else Timer() self.readers = {}
self.writers = {}
self.on_tick = set()
self.on_close = set()
self._ready = set() self._running = False
self._loop = None self._create_poller() @property
def poller(self):
if not self._poller:
self._create_poller()
return self._poller @poller.setter
def poller(self, value):
self._poller = value def _create_poller(self):
self._poller = poll()
self._register_fd = self._poller.register
self._unregister_fd = self._poller.unregister

这里需要注意的是:

在 MultiChannelPoller 之中,也会生成一个 poller,但是在注册时候,Transport 会使用 hub 的 poller,而非 MultiChannelPoller 内部的 poller

on_poll_init, redis.py:333
register_with_event_loop, redis.py:1061
register_with_event_loop, connection.py:266
main, hub_receive.py:38
<module>, hub_receive.py:46

在 kombu.transport.redis.Transport 代码如下:

def register_with_event_loop(self, connection, loop):
cycle = self.cycle
cycle.on_poll_init(loop.poller) # 这里赋值。
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable

继续深入,看到进一步赋值:

def on_poll_init(self, poller):
self.poller = poller # 这里赋值
for channel in self._channels:
return channel.qos.restore_visible(
num=channel.unacked_restore_limit,
)

0x05 Forever in Hub

hub.run_forever() 主要作用是:

  • 建立loop
  • 因为Hub里面有Channel,有poll,所以现在就把Channel与poll联系起来,包括socket,socket的file等待。
  • 进行poll,有消息就相应处理;

比如维护如下变量:

self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)

具体 run_forever 如下:

def run_forever(self):
self._running = True
try:
while 1:
try:
self.run_once()
except Stop:
break
finally:
self._running = False

于是又有调用如下,这里就进入了loop:

def run_once(self):
try:
next(self.loop)
except StopIteration:
self._loop = None

5.1 建立loop

next(self.loop) 继续调用,建立loop。这就是Hub的作用

调用stack如下:

create_loop, hub.py:279
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

简化版代码如下:

def create_loop(self, ...):

    while 1:
todo = self._ready
self._ready = set() for tick_callback in on_tick:
tick_callback() # 这里回调用户方法 for item in todo:
if item:
item() poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 if readers or writers:
to_consolidate = []
events = poll(poll_timeout) # 等待消息 for fd, event in events or ():
if fd in consolidate and \
writers.get(fd) is None:
to_consolidate.append(fd)
continue
cb = cbargs = None if event & READ:
cb, cbargs = readers[fd] # 读取redis
elif event & WRITE:
cb, cbargs = writers[fd] # 处理redis if isinstance(cb, generator):
try:
next(cb)
else:
cb(*cbargs) # 调用用户代码
if to_consolidate:
consolidate_callback(to_consolidate)
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield

下面我们逐步分析。

0x06 启动Poll

循环最开始将启动 Poll。 tick_callback 的作用就是启动 Poll。就是建立一个机制,当 redis 有消息时候,得到通知

while 1:
todo = self._ready
self._ready = set() for tick_callback in on_tick:
tick_callback()

此时:tick_callback的数值为:<function Transport.register_with_event_loop.<locals>.on_poll_start >,所以 tick_callback就调用到 Transport.register_with_event_loop.<locals>.on_poll_start

6.1 回顾如何注册回调

Transport方法如何注册,我们需要回顾,在前面代码这里会注册回调方法。

conn.register_with_event_loop(hub)

具体注册如下:

def register_with_event_loop(self, connection, loop):

    cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable def _on_disconnect(connection):
if connection._sock:
loop.remove(connection._sock)
cycle._on_connection_disconnect = _on_disconnect def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start)

on_poll_start就是在这里注册的,就是把 on_poll_start 注册到 hub 的 on_tick 回调之中

loop.on_tick.add(on_poll_start)

所以前面的如下代码就调用到了 on_poll_start。

for tick_callback in on_tick:
tick_callback()

6.2 Transport启动

所以,我们回到on_poll_start。

def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]

可以看到,有两部分代码:

  • poll_start : 这部分是 把 Channel 对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作;
  • add_reader :这部分是 把 poll 对应的 fd 添加到 MultiChannelPoller 这里,这样 MultiChannelPoller 就可以 打通 redis queue ----> Channel ---> socket ---> poll ---> fd ---> 读取 redis 这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取;

让我们逐一看看。

6.3 poll_start in MultiChannelPoller

这里就是把Channel对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作

此时代码进入到MultiChannelPoller,数据如下:

self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f84e7928940>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 0} {}
poller = {_poll} <kombu.utils.eventio._poll object at 0x7f84e75f4d68>

可以看出来,此处就是针对channel来进行注册,把所有的channel注册到 poll上。

def on_poll_start(self):
for channel in self._channels:
if channel.active_queues: # BRPOP mode?
if channel.qos.can_consume():
self._register_BRPOP(channel)
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)

对于 redis 的使用,有两种方法:BRPOP mode 和 LISTEN mode。分别对应 list 和 subscribe。

6.3.1 _register_BRPOP

我们先来看看 _register_BRPOP,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么就放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了,但是,这个 connection 还没有对 epoll 其效果,所以发送一个 _brpop_start

def _register_BRPOP(self, channel):
"""Enable BRPOP mode for channel."""
ident = channel, channel.client, 'BRPOP'
if not self._client_registered(channel, channel.client, 'BRPOP'):
channel._in_poll = False
self._register(*ident)
if not channel._in_poll: # send BRPOP
channel._brpop_start()
6.3.1.1 注册到MultiChannelPoller

一个 Connection 对应一个 Hub它们之间的枢纽是 MultiChannelPoller,它负责找出哪个 Channel 是可用的,这些 Channel 都是来自同一个 Connection。具体注册代码如下:

def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None: # not connected yet.
client.connection.connect() sock = client.connection._sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)

这里的client是Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

注意到这里client.connection._sock的数值是socket。

client.connection._sock = {socket} <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 52353), raddr=('127.0.0.1', 6379)>
family = {AddressFamily} AddressFamily.AF_INET
proto = {int} 6
timeout = {NoneType} None
type = {SocketKind} SocketKind.SOCK_STREAM

经过此阶段之后。

_fd_to_chan有意义,具体fd是 chanel 对应的 redis socket的fd

def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None: # not connected yet.
client.connection.connect()
sock = client.connection._sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)

这里就是把channel与自己对应的socket联系起来,也把channel与socket的file联系起来

变量如下:

self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 1}
8 = {tuple: 2} (<kombu.transport.redis.Channel object at 0x7f9056a57278>, 'BRPOP')
__len__ = {int} 1
poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>

这样,从 socket fd 可以找到 对应的 channel,也能从 channel 找到 对应的 socket fd 。

如下图:

+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| +---------------------------------------+ |
| |
| |
| |
+----------------------------------------------------------------------------------+
6.3.1.2 注册到Poll

继续处理register,就是把socket注册到poll

class _poll:

    def __init__(self):
self._poller = xpoll()
self._quick_poll = self._poller.poll
self._quick_register = self._poller.register
self._quick_unregister = self._poller.unregister def register(self, fd, events):
fd = fileno(fd)
poll_flags = 0
if events & ERR:
poll_flags |= POLLERR
if events & WRITE:
poll_flags |= POLLOUT
if events & READ:
poll_flags |= POLLIN
self._quick_register(fd, poll_flags)
return fd

此时如下,我们仅仅以 fd 3 为例:

下面就是 Channel ---> socket ---> poll ---> fd 这条通路。

+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| | + | |
| | | | |
| +---------------------------------------+ |
| | |
+----------------------------------------------------------------------------------+
|
|
v poll with OS
6.3.1.3 _brpop_start

若这个 connection 还没有对 epoll 其效果,就发送一个 _brpop_start作用为选择下一次读取的queue

_brpop_start如下:

def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)

此时stack如下:

_register, redis.py:296
_register_BRPOP, redis.py:312
on_poll_start, redis.py:328
on_poll_start, redis.py:1072
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

此时如下,现在我们有两条通路:

  • Channel ---> socket ---> poll ---> fd 这条通路;
  • MultiChannelPoller ---> 读取 redis 这条通路;
  • 因为这个时候 下一次 读取的 queue 已经确定了,所以已经 打通 Redis queue ----> Channel ---> socket ---> poll ---> fd 这条通路了。
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| connection | + | |
| + | | | |
| | +---------------------------------------+ |
| | | |
+----------------------------------------------------------------------------------+
| |
| |
v v Redis Queue +-----------------> poll with OS

6.3.2 _register_LISTEN

本文没有相关部分,如果有topic 相关则会调用这里。Celery event 就利用了这种方法

def _register_LISTEN(self, channel):
"""Enable LISTEN mode for channel."""
if not self._client_registered(channel, channel.subclient, 'LISTEN'):
channel._in_listen = False
self._register(channel, channel.subclient, 'LISTEN')
if not channel._in_listen:
channel._subscribe() # send SUBSCRIBE

注册如下:

_subscribe, redis.py:656
_register_LISTEN, redis.py:322
on_poll_start, redis.py:330
on_poll_start, redis.py:1072
create_loop, hub.py:294
asynloop, loops.py:81
start, consumer.py:592
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327

此时变量如下:

c = {PubSub} <redis.client.PubSub object at 0x7fb09e750400>
keys = {list: 1}
0 = {str} '/0.celery.pidbox' self = {Channel} <kombu.transport.redis.Channel object at 0x7fb09e6c8c88>

6.4 注册 reader in MultiChannelPoller

上面可以看到,把所有的 channel 注册到 poll上,对所有的 queue 都发起了监听请求,也就是说任一个队列有消息过来,那么都会被响应到,那么响应给谁呢?需要看看 add_reader 这个函数做了啥:

就是说,前面那些注册到 poll,其实没有注册响应方法,现在需要注册

复习下,add_reader 在 on_poll_start 这里。

def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]

cycle.fds 具体是得到了所有fd。

@property
def fds(self):
return self._fd_to_chan

具体添加是在 Hub 类中。

  • 这里会再次尝试添加。
  • 然后会把 fd 与 callback 联系起来。
class Hub:
def add_reader(self, fds, callback, *args):
return self.add(fds, callback, READ | ERR, args) def add(self, fd, callback, flags, args=(), consolidate=False):
fd = fileno(fd)
try:
self.poller.register(fd, flags)
except ValueError:
self._remove_from_loop(fd)
raise
else:
dest = self.readers if flags & READ else self.writers
if consolidate:
self.consolidate.add(fd)
dest[fd] = None
else:
dest[fd] = callback, args

注意,这里设置的是:hub 的成员变量,self.readers ,其在后续 poll 消息产生的就用到了,就调用这些callback,就是 Transport.on_readable。

readers = {dict: 1}
8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>, (8,))
0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>
1 = {tuple: 1} 8

stack为:

register, eventio.py:187
add, hub.py:164
add_reader, hub.py:213
<listcomp>, redis.py:1073
on_poll_start, redis.py:1073
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

所以此时为如下,依然不知道响应给谁

+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| connection | + | |
| + | | | |
| | +---------------------------------------+ |
| | | |
+----------------------------------------------------------------------------------+
| |
| |
v v Redis Queue +------------------> poll with OS +---------------------------------------------------------------------------------+
| |
| Hub |
| +--------------------------------------+ |
| |fd 3 : [ Transport.on_readable, fd 3] | |
| | | |
| readers +------------------> | ...... | |
| | | |
| |fd 1 : [ Transport.on_readable, fd 1] | |
| +--------------------------------------+ |
| |
+---------------------------------------------------------------------------------+

因为这个流程十分复杂,为了简化,我们这里提前剧透,在 消费函数时候,Transport 会设置 自己的 _callbacks[queue] 为一个回调函数,所以 MultiChannelPoller 读取 queue 这部分也可以联系起来

    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue) def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message) self.connection._callbacks[queue] = _callback # 这里设置 self._consumers.add(consumer_tag) self._reset_cycle()

6.5 启动timer

然后是启动poll的timer,定期做业务操作。

poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
# print('[[[HUB]]]: %s' % (self.repr_active(),))
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
# print('[EVENTS]: %s' % (self.repr_events(events),))
except ValueError: # Issue celery/#882
return

6.6 poll

然后是进行poll,若对应的file有消息,就处理(读取redis中的内容),然后进行下一次poll。

对于我们例子,下面简略版代码就是进行Poll:

poll_timeout = fire_timers(propagate=propagate) if scheduled else 1

if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
except ValueError: # Issue celery/#882
return for fd, event in events or ():
cb = cbargs = None if event & READ:
try:
cb, cbargs = readers[fd]
elif event & WRITE:
try:
cb, cbargs = writers[fd] if isinstance(cb, generator):
next(cb) else:
try:
cb(*cbargs)
except Empty:
pass
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield

6.6.1 poll方法

具体的poll方法如下,就是调用系统的方法来进行poll:

def poll(self, timeout, round=math.ceil,
POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3)
event_list = self._quick_poll(timeout) ready = []
for fd, event in event_list:
events = 0
if event & POLLIN:
events |= READ
if event & POLLOUT:
events |= WRITE
if event & POLLERR or event & POLLNVAL or event & POLLHUP:
events |= ERR
assert events
if not isinstance(fd, Integral):
fd = fd.fileno()
ready.append((fd, events))
return ready

6.6.2 callback

在 create_loop 代码中可以看到

def create_loop(self,
generator=generator, sleep=sleep, min=min, next=next,
Empty=Empty, StopIteration=StopIteration,
KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
readers, writers = self.readers, self.writers cb, cbargs = readers[fd]
cb(*cbargs)

这就是说,poll回调的时候,会调用reader中对应fd的回调函数来处理。

readers就是在之前 6.4 那节 设定的。

其内容是,就是 8 这个fd 对应的回调函数是Transport.on_readable:

readers = {dict: 1}
8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>, (8,))
0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>
1 = {tuple: 1} 8
__len__ = {int} 2

因此回调到<kombu.transport.redis.Transport object at 0x7ffe7482ddd8>。

def on_readable(self, fileno):
"""Handle AIO event for one of our file descriptors."""
self.cycle.on_readable(fileno)

进而调用到

<kombu.transport.redis.MultiChannelPoller object at 0x7faee4166d68>

def on_readable(self, fileno):
chan, type = self._fd_to_chan[fileno]
if chan.qos.can_consume():
chan.handlers[type]()

从 socket fd 可以找到 对应的 channel,也能从 channel 找到 对应的 socket fd 。从 channel 找到 channel 的 callback。

对应 self._fd_to_chan[fileno],取出 socket fd 对应 callback,进行处理。这里的callback如下:

handlers = {dict: 2}
'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>
'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>

于是调用 Channel._brpop_read 或者 Channel._receive 从redis 中 读取消息。

具体调用堆栈如下:

_brpop_read, redis.py:734
on_readable, redis.py:358
on_readable, redis.py:1087
create_loop, hub.py:361
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

逻辑如下:

+--------------+    socket
| redis | <------------> port +--> fd +--->+ +---> channel +--> handlers 'BRPOP' = Channel._brpop_read
| | | | 'LISTEN' = Channel._receive
| | socket | |
| | <------------> port +--> fd +--->---> _fd_to_chan +-------> channel +--> handlers 'BRPOP' = Channel._brpop_read
| port=6379 | | | 'LISTEN' = Channel._receive
| | socket | |
| | <------------> port +--> fd +--->+ +---> channel +--> handlers 'BRPOP' = Channel._brpop_read
+--------------+ 'LISTEN' = Channel._receive

此时手机为:

[源码分析] 消息队列 Kombu 之 Hub

如果加入poll,则如下:

            +---------------------------------------------------------------------------------------------------------------------------------------+
| +--------------+ 6 parse_response |
| +--> | Linux Kernel | +---+ |
| | +--------------+ | |
| | | |
| | | event |
| | 1 | |
| | | 2 |
| | | |
+-------+---+ socket + | |
| redis | <------------> port +--> fd +--->+ v |
| | | +------+--------+ |
| | socket | | Hub | |
| | <------------> port +--> fd +--->----------> | | |
| port=6379 | | | | |
| | socket | | readers +-----> Transport.on_readable |
| | <------------> port +--> fd +--->+ | | + |
+-----------+ +---------------+ | |
| |
3 | |
+----------------------------------------------------------------------------------------+ |
| v
| _receive_callback
| 5 +-------------+ +-----------+
+------------+------+ +-------------------------+ 'BRPOP' = Channel._brpop_read +-----> | Channel | +------------------> | Consumer |
| Transport | | MultiChannelPoller | +---> channel +--> handlers 'LISTEN' = Channel._receive +-------------+ +---+-------+
| | | | | |
| | on_readable(fileno) | | | ^ |
| cycle +---------------------> | _fd_to_chan +-------------> channel +--> handlers 'BRPOP' = Channel._brpop_read | |
| | 4 | | | 'LISTEN' = Channel._receive | |
| _callbacks[queue]| | | | | on_m |
| + | +-------------------------+ +---> channel +--> handlers 'BRPOP' = Channel._brpop_read | |
+-------------------+ 'LISTEN' = Channel._receive | |
| | v
| 7 _callback |
+-----------------------------------------------------------------------------------------------------------------------------------------+ User Function

此时手机为:

[源码分析] 消息队列 Kombu 之 Hub

0x07 接收消息

现在消息已经被放置于redis 队列中,那么消息又被如何使用呢?

从上节得知,当poll提示有消息时候,会通过 Channel._brpop_read 或者 Channel._receive 从 redis 中 读取消息。

具体堆栈如下:

_brpop_read, redis.py:734
on_readable, redis.py:358
on_readable, redis.py:1087
create_loop, hub.py:361
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

即:在 hub 的 loop中,通过 redis 驱动代码 从 redis 队列中取出消息,然后调用Transport传递过来的_deliver方法,最后调用userfunction

def _brpop_read(self, **options):
try:
try:
dest__item = self.client.parse_response(self.client.connection,
'BRPOP',
**options)
except self.connection_errors:
# if there's a ConnectionError, disconnect so the next
# iteration will reconnect automatically.
self.client.connection.disconnect()
raise
if dest__item:
dest, item = dest__item
dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
self._queue_cycle.rotate(dest)
self.connection._deliver(loads(bytes_to_str(item)), dest) #调用用户function
return True
else:
raise Empty()
finally:
self._in_poll = None

7.1 从驱动读取

7.1.1 从redis读取

这里会从redis驱动读取,文件是 redis/connection.py,具体就是通过 SocketBuffer 类从 redis 对应的 socket 读取。代码为:

def readline(self):
buf = self._buffer
buf.seek(self.bytes_read)
data = buf.readline()
while not data.endswith(SYM_CRLF):
# there's more data in the socket that we need
self._read_from_socket()
buf.seek(self.bytes_read)
data = buf.readline() self.bytes_read += len(data) # purge the buffer when we've consumed it all so it doesn't
# grow forever
if self.bytes_read == self.bytes_written:
self.purge() return data[:-2]

当读到 response 之后,调用 Redis驱动中对应命令的 回调方法来处理。此处命令为BRPOP。回调方法为:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)

代码为:

def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
raise
if command_name in self.response_callbacks:
return self.response_callbacks[command_name](response, **options)
return response

此时上下文相关变量为:

command_name = {str} 'BRPOP'
connection = {Connection} Connection<host=localhost,port=6379,db=0>
options = {dict: 0} {}
self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
connection = {Connection} Connection<host=localhost,port=6379,db=0>
connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
response_callbacks = {CaseInsensitiveDict: 179} {.
'AUTH' = {type} <class 'bool'>
'EXPIRE' = {type} <class 'bool'>
.....
'LLEN' = {type} <class 'int'>
'LPUSHX' = {type} <class 'int'>
'PFADD' = {type} <class 'int'>
'PFCOUNT' = {type} <class 'int'>
......
'SWAPDB' = {function} <function bool_ok at 0x7fbad4276620>
'WATCH' = {function} <function bool_ok at 0x7fbad4276620>
'UNWATCH' = {function} <function bool_ok at 0x7fbad4276620>
'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
....

这些代码堆栈如下:

readline, connection.py:251
read_response, connection.py:324
read_response, connection.py:739
parse_response, client.py:915
_brpop_read, redis.py:738
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, testUb.py:50
<module>, testUb.py:53

7.2 分发消息

loop从驱动得到消息之后,进行 deliver 分发。

self.connection._deliver(loads(bytes_to_str(item)), dest)

所做的事情是根据队列取出注册到此队列的回调函数列表,然后对消息执行列表中的所有回调函数

def _deliver(self, message, queue):
try:
callback = self._callbacks[queue]
except KeyError:
logger.warning(W_NO_CONSUMERS, queue)
self._reject_inbound_message(message)
else:
callback(message)

7.2.1 找到callback

此时 self是

<kombu.transport.redis.Transport object at 0x7faee4128f98>

callback如下:

self._callbacks = {dict: 1}
'asynt_queue' = {function} <function Channel.basic_consume.<locals>._callback at 0x7faee244a2f0>

这里意味着 asynt_queue 这个 queue 对应的 callback 是 Channel.basic_consume。

7.2.2 何时设定callback

调用的 callback 是 Channel 这里定义的。basic_consume就是把传入的参数 callback 数值,实际这个传入的参数 callback就是 Consumer. _receive_callback。

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue) def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message) self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag) self._reset_cycle()

设置是在上面函数里面这句,

    self.connection._callbacks[queue] = _callback

stack如下:

basic_consume, base.py:632
basic_consume, redis.py:598
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
__enter__, messaging.py:430
main, testUb.py:46
<module>, testUb.py:55

7.2.3 调用到用户方法

Consumer的函数定义如下:

def _receive_callback(self, message):
accept = self.accept
on_m, channel, decoded = self.on_message, self.channel, None
try:
m2p = getattr(channel, 'message_to_python', None)
if m2p:
message = m2p(message)
if accept is not None:
message.accept = accept
if message.errors:
return message._reraise_error(self.on_decode_error)
decoded = None if on_m else message.decode()
except Exception as exc:
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)
else:
return on_m(message) if on_m else self.receive(decoded, message)

self.on_message就是用户方法,所以最终调用到用户方法

on_message, testUb.py:36
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_brpop_read, redis.py:748
on_readable, redis.py:358
on_readable, redis.py:1087
create_loop, hub.py:361
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

此时如下:

+----------------------+               +-------------------+
| Producer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +------------------> | | +-----------------------------------------------------------+
| | | pool |
| exchange | +---------> | | <------------------------------------------------------------+
| | | | | |
| connection | | +----> | connection +---------------+ |
| + | | | | | | |
| | | | | +-------------------+ +----------------------------------------------------------------+
+--+-------------------+ | | | v | |
| | | | +-------------------+ | +---+-----------------+ +--------------------+ | |
| | | | | Connection | | | redis.Transport | | MultiChannelPoller | | |
| +----------------------> | | | | | | | | |
| | | | _sock <----------+ | | | _channels +--------+ |
| | | | | | cycle +------------> | _fd_to_chan | |
| | | | transport +---------> | | | _chan_to_sock+-----------+
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| publish | | | | | | +-------+
+--------------------------------+ | | | _poller+---------> | poll |
| | | +------------------+ | | +-------+
| | | +-------------+
+-------------------+ | +-----> +----------------+
| Queue | | | | Exchange |
| _channel | +---------+ | |
| | | |
| exchange +--------------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+

手机如下:

[源码分析] 消息队列 Kombu 之 Hub

动态逻辑如下:

 +-----+           +-----------+     +--------------------+ +---------+ +-------+ +--------+
| Hub | | Transport | | MultiChannelPoller | | _poll | |Channel| |Consumer|
+--+--+ +----+------+ +------------+-------+ +----+----+ +---+---+ +------+-+
| | | | | |
v | | | | |
create_loop | | | | |
+ | | | | |
| on_poll_start | | | | |
| | | | | |
| +---------------> | on_poll_start | | | |
| | | | | |
| | +--------------------> | | | |
| | | | | |
| | _register_BRPOP | | |
| | | | | |
| add_reader | register | | |
| + | +----------> | | |
| | register | | | |
fire_timers | +------------------------------------> | | |
| | | | | |
| poll | | | | |
| +--------------------------------------------------------> | | |
| | | | | |
| | | | | |
+ | | | | |
for fd, event in events | | | | |
| | | | | |
| | | | | |
cb, cbargs = readers[fd]| | | | |
+ | | | | |
| | | | | |
| | | | | |
cb(*cbargs | | | | |
+ | | | | |
| on_readable | | | | |
| | | | | |
| +--------------> | on_readable | | | |
| | | | | |
| +-----------------------> | | | |
| | | | | |
| | | | | |
| | chan, type = _fd_to_chan[fileno]| | |
| | | | | |
| | | _brpop_read | | |
| | | | | |
| | | +---------------------> | |
| | _deliver | | | |
| | | | | |
| | <----------------------------------------------+ | |
| | | | | |
| | | | | |
| | _callback | | | |
| | | | | |
| | +----------------------------------------------> | |
| | | | + +
| | | | _receive_callback
| | | | | +
| | | | | +--------->+
| | | | | |
v v v v v v

手机如下:

[源码分析] 消息队列 Kombu 之 Hub

0xFF 参考

celery 7 优秀开源项目kombu源码分析之registry和entrypoint

(二)放弃pika,选择kombu

kombu消息框架<二>

AMQP中的概念

AMQP的基本概念

深入理解AMQP协议

kombu和消息队列总结

关于epoll版服务器的理解(Python实现)

celery源码解读

Kombu源码分析(一)概述

上一篇:开启Qt Lite Project


下一篇:QT5.9 新特性与版本回顾