[源码分析] 消息队列 Kombu 之 Hub
0x00 摘要
本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Hub 概念。
0x01 示例代码
下面使用如下代码来进行说明。
本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。
def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print(' kombu ')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
0x02 来由
前文中,Consumer部分有一句代码没有分析:
hub.run_forever()
此时,hub与Connection已经联系起来,具体如下:
具体如下图:
+----------------------+ +-------------------+
| Consumer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +--------------------> | | +-----------------------------------------------------------+
| | | pool |
| | +---------> | | <------------------------------------------------------------+
| queues | | | | |
| | | +----> | connection +---------------+ |
| | | | | | | | |
+----------------------+ | | +-------------------+ | |
| | | v |
| | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | Connection | | redis.Transport | | MultiChannelPoller | |
| | | | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| | | | | | | +-------+
| | | | | | _poller+---------> | poll |
v | | +------------------+ | | +-------+
| | +-------------+
+-------------------+ | +----------------+
| Queue | | | | Exchange |
| _chann+l | +----+ | |
| | | |
| exchange +----------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手机如下:
现在我们知道:
- Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。
- Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
- Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
- Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接,是操作的抽象;
但是,我们只是大致知道 poll 是用来做什么的,但是不知道consumer,poll 究竟如何与Hub交互。我们本文就接着分析。
0x03 Poll一般步骤
在linux系统中,使用Poll的一般步骤如下:
- Create an epoll object——创建1个epoll对象;
- Tell the epoll object to monitor specific events on specific sockets——告诉epoll对象,在指定的socket上监听指定的事件;
- Ask the epoll object which sockets may have had the specified event since the last query——询问epoll对象,从上次查询以来,哪些socket发生了哪些指定的事件;
- Perform some action on those sockets——在这些socket上执行一些操作;
- Tell the epoll object to modify the list of sockets and/or events to monitor——告诉epoll对象,修改socket列表和(或)事件,并监控;
- Repeat steps 3 through 5 until finished——重复步骤3-5,直到完成;
- Destroy the epoll object——销毁epoll对象;
所以我们就需要在 Hub 代码中看看 kombu 如何使用 Poll。
0x04 建立 Hub
在建立 Hub 这里会建立 Hub 内部的 Poller。
_get_poller, eventio.py:312
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, hub_receive.py:23
<module>, hub_receive.py:46
具体代码是:
def _get_poller():
if detect_environment() != 'default':
# greenlet
return _select
elif epoll:
# Py2.6+ Linux
return _epoll
elif kqueue and 'netbsd' in sys.platform:
return _kqueue
elif xpoll:
return _poll
else:
return _select
这样,在 Hub内部就建立了 poller。
class Hub:
"""Event loop object.
Arguments:
timer (kombu.asynchronous.Timer): Specify custom timer instance.
"""
def __init__(self, timer=None):
self.timer = timer if timer is not None else Timer()
self.readers = {}
self.writers = {}
self.on_tick = set()
self.on_close = set()
self._ready = set()
self._running = False
self._loop = None
self._create_poller()
@property
def poller(self):
if not self._poller:
self._create_poller()
return self._poller
@poller.setter
def poller(self, value):
self._poller = value
def _create_poller(self):
self._poller = poll()
self._register_fd = self._poller.register
self._unregister_fd = self._poller.unregister
这里需要注意的是:
在 MultiChannelPoller 之中,也会生成一个 poller,但是在注册时候,Transport 会使用 hub 的 poller,而非 MultiChannelPoller 内部的 poller。
on_poll_init, redis.py:333
register_with_event_loop, redis.py:1061
register_with_event_loop, connection.py:266
main, hub_receive.py:38
<module>, hub_receive.py:46
在 kombu.transport.redis.Transport 代码如下:
def register_with_event_loop(self, connection, loop):
cycle = self.cycle
cycle.on_poll_init(loop.poller) # 这里赋值。
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable
继续深入,看到进一步赋值:
def on_poll_init(self, poller):
self.poller = poller # 这里赋值
for channel in self._channels:
return channel.qos.restore_visible(
num=channel.unacked_restore_limit,
)
0x05 Forever in Hub
hub.run_forever() 主要作用是:
- 建立loop
- 因为Hub里面有Channel,有poll,所以现在就把Channel与poll联系起来,包括socket,socket的file等待。
- 进行poll,有消息就相应处理;
比如维护如下变量:
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)
具体 run_forever 如下:
def run_forever(self):
self._running = True
try:
while 1:
try:
self.run_once()
except Stop:
break
finally:
self._running = False
于是又有调用如下,这里就进入了loop:
def run_once(self):
try:
next(self.loop)
except StopIteration:
self._loop = None
5.1 建立loop
next(self.loop)
继续调用,建立loop。这就是Hub的作用。
调用stack如下:
create_loop, hub.py:279
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
简化版代码如下:
def create_loop(self, ...):
while 1:
todo = self._ready
self._ready = set()
for tick_callback in on_tick:
tick_callback() # 这里回调用户方法
for item in todo:
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
if readers or writers:
to_consolidate = []
events = poll(poll_timeout) # 等待消息
for fd, event in events or ():
if fd in consolidate and \
writers.get(fd) is None:
to_consolidate.append(fd)
continue
cb = cbargs = None
if event & READ:
cb, cbargs = readers[fd] # 读取redis
elif event & WRITE:
cb, cbargs = writers[fd] # 处理redis
if isinstance(cb, generator):
try:
next(cb)
else:
cb(*cbargs) # 调用用户代码
if to_consolidate:
consolidate_callback(to_consolidate)
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
下面我们逐步分析。
0x06 启动Poll
循环最开始将启动 Poll。 tick_callback 的作用就是启动 Poll。就是建立一个机制,当 redis 有消息时候,得到通知。
while 1:
todo = self._ready
self._ready = set()
for tick_callback in on_tick:
tick_callback()
此时:tick_callback的数值为:<function Transport.register_with_event_loop.<locals>.on_poll_start >
,所以 tick_callback就调用到 Transport.register_with_event_loop.<locals>.on_poll_start
。
6.1 回顾如何注册回调
Transport方法如何注册,我们需要回顾,在前面代码这里会注册回调方法。
conn.register_with_event_loop(hub)
具体注册如下:
def register_with_event_loop(self, connection, loop):
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable
def _on_disconnect(connection):
if connection._sock:
loop.remove(connection._sock)
cycle._on_connection_disconnect = _on_disconnect
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
on_poll_start就是在这里注册的,就是把 on_poll_start 注册到 hub 的 on_tick 回调之中。
loop.on_tick.add(on_poll_start)
所以前面的如下代码就调用到了 on_poll_start。
for tick_callback in on_tick:
tick_callback()
6.2 Transport启动
所以,我们回到on_poll_start。
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
可以看到,有两部分代码:
- poll_start : 这部分是 把 Channel 对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作;
- add_reader :这部分是 把 poll 对应的 fd 添加到 MultiChannelPoller 这里,这样 MultiChannelPoller 就可以 打通
redis queue ----> Channel ---> socket ---> poll ---> fd ---> 读取 redis
这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取;
让我们逐一看看。
6.3 poll_start in MultiChannelPoller
这里就是把Channel对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作。
此时代码进入到MultiChannelPoller,数据如下:
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f84e7928940>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 0} {}
poller = {_poll} <kombu.utils.eventio._poll object at 0x7f84e75f4d68>
可以看出来,此处就是针对channel来进行注册,把所有的channel注册到 poll上。
def on_poll_start(self):
for channel in self._channels:
if channel.active_queues: # BRPOP mode?
if channel.qos.can_consume():
self._register_BRPOP(channel)
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)
对于 redis 的使用,有两种方法:BRPOP mode 和 LISTEN mode。分别对应 list 和 subscribe。
6.3.1 _register_BRPOP
我们先来看看 _register_BRPOP
,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么就放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了,但是,这个 connection 还没有对 epoll 其效果,所以发送一个 _brpop_start
。
def _register_BRPOP(self, channel):
"""Enable BRPOP mode for channel."""
ident = channel, channel.client, 'BRPOP'
if not self._client_registered(channel, channel.client, 'BRPOP'):
channel._in_poll = False
self._register(*ident)
if not channel._in_poll: # send BRPOP
channel._brpop_start()
6.3.1.1 注册到MultiChannelPoller
一个 Connection 对应一个 Hub,它们之间的枢纽是 MultiChannelPoller
,它负责找出哪个 Channel 是可用的,这些 Channel 都是来自同一个 Connection。具体注册代码如下:
def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None: # not connected yet.
client.connection.connect()
sock = client.connection._sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)
这里的client是Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
。
注意到这里client.connection._sock的数值是socket。
client.connection._sock = {socket} <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 52353), raddr=('127.0.0.1', 6379)>
family = {AddressFamily} AddressFamily.AF_INET
proto = {int} 6
timeout = {NoneType} None
type = {SocketKind} SocketKind.SOCK_STREAM
经过此阶段之后。
_fd_to_chan有意义,具体fd是 chanel 对应的 redis socket的fd。
def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None: # not connected yet.
client.connection.connect()
sock = client.connection._sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)
这里就是把channel与自己对应的socket联系起来,也把channel与socket的file联系起来。
变量如下:
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 1}
8 = {tuple: 2} (<kombu.transport.redis.Channel object at 0x7f9056a57278>, 'BRPOP')
__len__ = {int} 1
poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
这样,从 socket fd 可以找到 对应的 channel,也能从 channel 找到 对应的 socket fd 。
如下图:
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| +---------------------------------------+ |
| |
| |
| |
+----------------------------------------------------------------------------------+
6.3.1.2 注册到Poll
继续处理register,就是把socket注册到poll
class _poll:
def __init__(self):
self._poller = xpoll()
self._quick_poll = self._poller.poll
self._quick_register = self._poller.register
self._quick_unregister = self._poller.unregister
def register(self, fd, events):
fd = fileno(fd)
poll_flags = 0
if events & ERR:
poll_flags |= POLLERR
if events & WRITE:
poll_flags |= POLLOUT
if events & READ:
poll_flags |= POLLIN
self._quick_register(fd, poll_flags)
return fd
此时如下,我们仅仅以 fd 3 为例:
下面就是 Channel ---> socket ---> poll ---> fd
这条通路。
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| | + | |
| | | | |
| +---------------------------------------+ |
| | |
+----------------------------------------------------------------------------------+
|
|
v
poll with OS
6.3.1.3 _brpop_start
若这个 connection 还没有对 epoll 其效果,就发送一个 _brpop_start
。作用为选择下一次读取的queue。
_brpop_start如下:
def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
此时stack如下:
_register, redis.py:296
_register_BRPOP, redis.py:312
on_poll_start, redis.py:328
on_poll_start, redis.py:1072
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
此时如下,现在我们有两条通路:
-
Channel ---> socket ---> poll ---> fd
这条通路; -
MultiChannelPoller ---> 读取 redis
这条通路; - 因为这个时候 下一次 读取的 queue 已经确定了,所以已经 打通
Redis queue ----> Channel ---> socket ---> poll ---> fd
这条通路了。
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| connection | + | |
| + | | | |
| | +---------------------------------------+ |
| | | |
+----------------------------------------------------------------------------------+
| |
| |
v v
Redis Queue +-----------------> poll with OS
6.3.2 _register_LISTEN
本文没有相关部分,如果有topic 相关则会调用这里。Celery event 就利用了这种方法。
def _register_LISTEN(self, channel):
"""Enable LISTEN mode for channel."""
if not self._client_registered(channel, channel.subclient, 'LISTEN'):
channel._in_listen = False
self._register(channel, channel.subclient, 'LISTEN')
if not channel._in_listen:
channel._subscribe() # send SUBSCRIBE
注册如下:
_subscribe, redis.py:656
_register_LISTEN, redis.py:322
on_poll_start, redis.py:330
on_poll_start, redis.py:1072
create_loop, hub.py:294
asynloop, loops.py:81
start, consumer.py:592
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
此时变量如下:
c = {PubSub} <redis.client.PubSub object at 0x7fb09e750400>
keys = {list: 1}
0 = {str} '/0.celery.pidbox'
self = {Channel} <kombu.transport.redis.Channel object at 0x7fb09e6c8c88>
6.4 注册 reader in MultiChannelPoller
上面可以看到,把所有的 channel 注册到 poll上,对所有的 queue 都发起了监听请求,也就是说任一个队列有消息过来,那么都会被响应到,那么响应给谁呢?需要看看 add_reader
这个函数做了啥:
就是说,前面那些注册到 poll,其实没有注册响应方法,现在需要注册。
复习下,add_reader 在 on_poll_start 这里。
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
cycle.fds 具体是得到了所有fd。
@property
def fds(self):
return self._fd_to_chan
具体添加是在 Hub 类中。
- 这里会再次尝试添加。
- 然后会把 fd 与 callback 联系起来。
class Hub:
def add_reader(self, fds, callback, *args):
return self.add(fds, callback, READ | ERR, args)
def add(self, fd, callback, flags, args=(), consolidate=False):
fd = fileno(fd)
try:
self.poller.register(fd, flags)
except ValueError:
self._remove_from_loop(fd)
raise
else:
dest = self.readers if flags & READ else self.writers
if consolidate:
self.consolidate.add(fd)
dest[fd] = None
else:
dest[fd] = callback, args
注意,这里设置的是:hub 的成员变量,self.readers ,其在后续 poll 消息产生的就用到了,就调用这些callback,就是 Transport.on_readable。
readers = {dict: 1}
8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>, (8,))
0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>
1 = {tuple: 1} 8
stack为:
register, eventio.py:187
add, hub.py:164
add_reader, hub.py:213
<listcomp>, redis.py:1073
on_poll_start, redis.py:1073
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
所以此时为如下,依然不知道响应给谁:
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| connection | + | |
| + | | | |
| | +---------------------------------------+ |
| | | |
+----------------------------------------------------------------------------------+
| |
| |
v v
Redis Queue +------------------> poll with OS
+---------------------------------------------------------------------------------+
| |
| Hub |
| +--------------------------------------+ |
| |fd 3 : [ Transport.on_readable, fd 3] | |
| | | |
| readers +------------------> | ...... | |
| | | |
| |fd 1 : [ Transport.on_readable, fd 1] | |
| +--------------------------------------+ |
| |
+---------------------------------------------------------------------------------+
因为这个流程十分复杂,为了简化,我们这里提前剧透,在 消费函数时候,Transport 会设置 自己的 _callbacks[queue] 为一个回调函数,所以 MultiChannelPoller 读取 queue 这部分也可以联系起来:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback # 这里设置
self._consumers.add(consumer_tag)
self._reset_cycle()
6.5 启动timer
然后是启动poll的timer,定期做业务操作。
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
# print('[[[HUB]]]: %s' % (self.repr_active(),))
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
# print('[EVENTS]: %s' % (self.repr_events(events),))
except ValueError: # Issue celery/#882
return
6.6 poll
然后是进行poll,若对应的file有消息,就处理(读取redis中的内容),然后进行下一次poll。
对于我们例子,下面简略版代码就是进行Poll:
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
except ValueError: # Issue celery/#882
return
for fd, event in events or ():
cb = cbargs = None
if event & READ:
try:
cb, cbargs = readers[fd]
elif event & WRITE:
try:
cb, cbargs = writers[fd]
if isinstance(cb, generator):
next(cb)
else:
try:
cb(*cbargs)
except Empty:
pass
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
6.6.1 poll方法
具体的poll方法如下,就是调用系统的方法来进行poll:
def poll(self, timeout, round=math.ceil,
POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3)
event_list = self._quick_poll(timeout)
ready = []
for fd, event in event_list:
events = 0
if event & POLLIN:
events |= READ
if event & POLLOUT:
events |= WRITE
if event & POLLERR or event & POLLNVAL or event & POLLHUP:
events |= ERR
assert events
if not isinstance(fd, Integral):
fd = fd.fileno()
ready.append((fd, events))
return ready
6.6.2 callback
在 create_loop 代码中可以看到
def create_loop(self,
generator=generator, sleep=sleep, min=min, next=next,
Empty=Empty, StopIteration=StopIteration,
KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
readers, writers = self.readers, self.writers
cb, cbargs = readers[fd]
cb(*cbargs)
这就是说,poll回调的时候,会调用reader中对应fd的回调函数来处理。
readers就是在之前 6.4 那节 设定的。
其内容是,就是 8 这个fd 对应的回调函数是Transport.on_readable:
readers = {dict: 1}
8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>, (8,))
0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>
1 = {tuple: 1} 8
__len__ = {int} 2
因此回调到<kombu.transport.redis.Transport object at 0x7ffe7482ddd8>。
def on_readable(self, fileno):
"""Handle AIO event for one of our file descriptors."""
self.cycle.on_readable(fileno)
进而调用到
<kombu.transport.redis.MultiChannelPoller object at 0x7faee4166d68>
def on_readable(self, fileno):
chan, type = self._fd_to_chan[fileno]
if chan.qos.can_consume():
chan.handlers[type]()
从 socket fd 可以找到 对应的 channel,也能从 channel 找到 对应的 socket fd 。从 channel 找到 channel 的 callback。
对应 self._fd_to_chan[fileno],取出 socket fd 对应 callback,进行处理。这里的callback如下:
handlers = {dict: 2}
'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>
'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>
于是调用 Channel._brpop_read 或者 Channel._receive 从redis 中 读取消息。
具体调用堆栈如下:
_brpop_read, redis.py:734
on_readable, redis.py:358
on_readable, redis.py:1087
create_loop, hub.py:361
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
逻辑如下:
+--------------+ socket
| redis | <------------> port +--> fd +--->+ +---> channel +--> handlers 'BRPOP' = Channel._brpop_read
| | | | 'LISTEN' = Channel._receive
| | socket | |
| | <------------> port +--> fd +--->---> _fd_to_chan +-------> channel +--> handlers 'BRPOP' = Channel._brpop_read
| port=6379 | | | 'LISTEN' = Channel._receive
| | socket | |
| | <------------> port +--> fd +--->+ +---> channel +--> handlers 'BRPOP' = Channel._brpop_read
+--------------+ 'LISTEN' = Channel._receive
此时手机为:
如果加入poll,则如下:
+---------------------------------------------------------------------------------------------------------------------------------------+
| +--------------+ 6 parse_response |
| +--> | Linux Kernel | +---+ |
| | +--------------+ | |
| | | |
| | | event |
| | 1 | |
| | | 2 |
| | | |
+-------+---+ socket + | |
| redis | <------------> port +--> fd +--->+ v |
| | | +------+--------+ |
| | socket | | Hub | |
| | <------------> port +--> fd +--->----------> | | |
| port=6379 | | | | |
| | socket | | readers +-----> Transport.on_readable |
| | <------------> port +--> fd +--->+ | | + |
+-----------+ +---------------+ | |
| |
3 | |
+----------------------------------------------------------------------------------------+ |
| v
| _receive_callback
| 5 +-------------+ +-----------+
+------------+------+ +-------------------------+ 'BRPOP' = Channel._brpop_read +-----> | Channel | +------------------> | Consumer |
| Transport | | MultiChannelPoller | +---> channel +--> handlers 'LISTEN' = Channel._receive +-------------+ +---+-------+
| | | | | |
| | on_readable(fileno) | | | ^ |
| cycle +---------------------> | _fd_to_chan +-------------> channel +--> handlers 'BRPOP' = Channel._brpop_read | |
| | 4 | | | 'LISTEN' = Channel._receive | |
| _callbacks[queue]| | | | | on_m |
| + | +-------------------------+ +---> channel +--> handlers 'BRPOP' = Channel._brpop_read | |
+-------------------+ 'LISTEN' = Channel._receive | |
| | v
| 7 _callback |
+-----------------------------------------------------------------------------------------------------------------------------------------+ User Function
此时手机为:
0x07 接收消息
现在消息已经被放置于redis 队列中,那么消息又被如何使用呢?
从上节得知,当poll提示有消息时候,会通过 Channel._brpop_read 或者 Channel._receive 从 redis 中 读取消息。
具体堆栈如下:
_brpop_read, redis.py:734
on_readable, redis.py:358
on_readable, redis.py:1087
create_loop, hub.py:361
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
即:在 hub 的 loop中,通过 redis 驱动代码 从 redis 队列中取出消息,然后调用Transport
传递过来的_deliver
方法,最后调用userfunction。
def _brpop_read(self, **options):
try:
try:
dest__item = self.client.parse_response(self.client.connection,
'BRPOP',
**options)
except self.connection_errors:
# if there's a ConnectionError, disconnect so the next
# iteration will reconnect automatically.
self.client.connection.disconnect()
raise
if dest__item:
dest, item = dest__item
dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
self._queue_cycle.rotate(dest)
self.connection._deliver(loads(bytes_to_str(item)), dest) #调用用户function
return True
else:
raise Empty()
finally:
self._in_poll = None
7.1 从驱动读取
7.1.1 从redis读取
这里会从redis驱动读取,文件是 redis/connection.py,具体就是通过 SocketBuffer 类从 redis 对应的 socket 读取。代码为:
def readline(self):
buf = self._buffer
buf.seek(self.bytes_read)
data = buf.readline()
while not data.endswith(SYM_CRLF):
# there's more data in the socket that we need
self._read_from_socket()
buf.seek(self.bytes_read)
data = buf.readline()
self.bytes_read += len(data)
# purge the buffer when we've consumed it all so it doesn't
# grow forever
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
当读到 response 之后,调用 Redis驱动中对应命令的 回调方法来处理。此处命令为BRPOP。回调方法为:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)
。
代码为:
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
raise
if command_name in self.response_callbacks:
return self.response_callbacks[command_name](response, **options)
return response
此时上下文相关变量为:
command_name = {str} 'BRPOP'
connection = {Connection} Connection<host=localhost,port=6379,db=0>
options = {dict: 0} {}
self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
connection = {Connection} Connection<host=localhost,port=6379,db=0>
connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
response_callbacks = {CaseInsensitiveDict: 179} {.
'AUTH' = {type} <class 'bool'>
'EXPIRE' = {type} <class 'bool'>
.....
'LLEN' = {type} <class 'int'>
'LPUSHX' = {type} <class 'int'>
'PFADD' = {type} <class 'int'>
'PFCOUNT' = {type} <class 'int'>
......
'SWAPDB' = {function} <function bool_ok at 0x7fbad4276620>
'WATCH' = {function} <function bool_ok at 0x7fbad4276620>
'UNWATCH' = {function} <function bool_ok at 0x7fbad4276620>
'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
....
这些代码堆栈如下:
readline, connection.py:251
read_response, connection.py:324
read_response, connection.py:739
parse_response, client.py:915
_brpop_read, redis.py:738
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, testUb.py:50
<module>, testUb.py:53
7.2 分发消息
loop从驱动得到消息之后,进行 deliver 分发。
self.connection._deliver(loads(bytes_to_str(item)), dest)
所做的事情是根据队列取出注册到此队列的回调函数列表,然后对消息执行列表中的所有回调函数。
def _deliver(self, message, queue):
try:
callback = self._callbacks[queue]
except KeyError:
logger.warning(W_NO_CONSUMERS, queue)
self._reject_inbound_message(message)
else:
callback(message)
7.2.1 找到callback
此时 self是
<kombu.transport.redis.Transport object at 0x7faee4128f98>
callback如下:
self._callbacks = {dict: 1}
'asynt_queue' = {function} <function Channel.basic_consume.<locals>._callback at 0x7faee244a2f0>
这里意味着 asynt_queue 这个 queue 对应的 callback 是 Channel.basic_consume。
7.2.2 何时设定callback
调用的 callback 是 Channel 这里定义的。basic_consume就是把传入的参数 callback 数值,实际这个传入的参数 callback就是 Consumer. _receive_callback。
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
self._consumers.add(consumer_tag)
self._reset_cycle()
设置是在上面函数里面这句,
self.connection._callbacks[queue] = _callback
stack如下:
basic_consume, base.py:632
basic_consume, redis.py:598
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
__enter__, messaging.py:430
main, testUb.py:46
<module>, testUb.py:55
7.2.3 调用到用户方法
Consumer的函数定义如下:
def _receive_callback(self, message):
accept = self.accept
on_m, channel, decoded = self.on_message, self.channel, None
try:
m2p = getattr(channel, 'message_to_python', None)
if m2p:
message = m2p(message)
if accept is not None:
message.accept = accept
if message.errors:
return message._reraise_error(self.on_decode_error)
decoded = None if on_m else message.decode()
except Exception as exc:
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)
else:
return on_m(message) if on_m else self.receive(decoded, message)
self.on_message就是用户方法,所以最终调用到用户方法。
on_message, testUb.py:36
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_brpop_read, redis.py:748
on_readable, redis.py:358
on_readable, redis.py:1087
create_loop, hub.py:361
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
此时如下:
+----------------------+ +-------------------+
| Producer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +------------------> | | +-----------------------------------------------------------+
| | | pool |
| exchange | +---------> | | <------------------------------------------------------------+
| | | | | |
| connection | | +----> | connection +---------------+ |
| + | | | | | | |
| | | | | +-------------------+ +----------------------------------------------------------------+
+--+-------------------+ | | | v | |
| | | | +-------------------+ | +---+-----------------+ +--------------------+ | |
| | | | | Connection | | | redis.Transport | | MultiChannelPoller | | |
| +----------------------> | | | | | | | | |
| | | | _sock <----------+ | | | _channels +--------+ |
| | | | | | cycle +------------> | _fd_to_chan | |
| | | | transport +---------> | | | _chan_to_sock+-----------+
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| publish | | | | | | +-------+
+--------------------------------+ | | | _poller+---------> | poll |
| | | +------------------+ | | +-------+
| | | +-------------+
+-------------------+ | +-----> +----------------+
| Queue | | | | Exchange |
| _channel | +---------+ | |
| | | |
| exchange +--------------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手机如下:
动态逻辑如下:
+-----+ +-----------+ +--------------------+ +---------+ +-------+ +--------+
| Hub | | Transport | | MultiChannelPoller | | _poll | |Channel| |Consumer|
+--+--+ +----+------+ +------------+-------+ +----+----+ +---+---+ +------+-+
| | | | | |
v | | | | |
create_loop | | | | |
+ | | | | |
| on_poll_start | | | | |
| | | | | |
| +---------------> | on_poll_start | | | |
| | | | | |
| | +--------------------> | | | |
| | | | | |
| | _register_BRPOP | | |
| | | | | |
| add_reader | register | | |
| + | +----------> | | |
| | register | | | |
fire_timers | +------------------------------------> | | |
| | | | | |
| poll | | | | |
| +--------------------------------------------------------> | | |
| | | | | |
| | | | | |
+ | | | | |
for fd, event in events | | | | |
| | | | | |
| | | | | |
cb, cbargs = readers[fd]| | | | |
+ | | | | |
| | | | | |
| | | | | |
cb(*cbargs | | | | |
+ | | | | |
| on_readable | | | | |
| | | | | |
| +--------------> | on_readable | | | |
| | | | | |
| +-----------------------> | | | |
| | | | | |
| | | | | |
| | chan, type = _fd_to_chan[fileno]| | |
| | | | | |
| | | _brpop_read | | |
| | | | | |
| | | +---------------------> | |
| | _deliver | | | |
| | | | | |
| | <----------------------------------------------+ | |
| | | | | |
| | | | | |
| | _callback | | | |
| | | | | |
| | +----------------------------------------------> | |
| | | | + +
| | | | _receive_callback
| | | | | +
| | | | | +--------->+
| | | | | |
v v v v v v
手机如下: