本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 是如何启动,以及如何搭建一个基本的架子。
[源码分析] 消息队列 Kombu 之 启动过程
0x00 摘要
本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 是如何启动,以及如何搭建一个基本的架子。
因为之前有一个综述,所以大家会发现,一些概念讲解文字会同时出现在后续文章和综述之中。
0x01 示例
下面使用如下代码来进行说明。
本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。
def main(arguments): hub = Hub() exchange = Exchange('asynt_exchange') queue = Queue('asynt_queue', exchange, 'asynt_routing_key') def send_message(conn): producer = Producer(conn) producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key') print('message sent') def on_message(message): print('received: {0!r}'.format(message.body)) message.ack() # hub.stop() # <-- exit after one message conn = Connection('redis://localhost:6379') conn.register_with_event_loop(hub) def p_message(): print(' kombu ') with Consumer(conn, [queue], on_message=on_message): send_message(conn) hub.timer.call_repeatedly(3, p_message) hub.run_forever() if __name__ == '__main__': sys.exit(main(sys.argv[1:]))
0x02 启动
让我们顺着程序流程看看Kombu都做了些什么,也可以对 Kombu 内部有所了解。
本文关注的重点是:Connection,Channel 和 Hub 是如何联系在一起的。
2.1 Hub
在程序开始,我们建立了Hub。
Hub的作用是建立消息Loop,但是此时尚未建立,因此只是一个静态实例。
hub = Hub()
其定义如下:
class Hub: """Event loop object. Arguments: timer (kombu.asynchronous.Timer): Specify custom timer instance. """ def __init__(self, timer=None): self.timer = timer if timer is not None else Timer() self.readers = {} self.writers = {} self.on_tick = set() self.on_close = set() self._ready = set() self._running = False self._loop = None self.consolidate = set() self.consolidate_callback = None self.propagate_errors = () self._create_poller()
因为此时没有建立loop,所以目前重要的步骤是建立Poll,其Stack如下:
_get_poller, eventio.py:321 poll, eventio.py:328 _create_poller, hub.py:113 __init__, hub.py:96 main, testUb.py:22, testUb.py:55
在eventio.py中有如下,我们可以看到Kombu可以使用多种模型来进行内核消息处理:
def _get_poller(): if detect_environment() != 'default': # greenlet return _select elif epoll: # Py2.6+ Linux return _epoll elif kqueue and 'netbsd' in sys.platform: return _kqueue elif xpoll: return _poll else: return _select
因为本机情况,这里选择的是:_poll。
+------------------+ | Hub | | | | | +-------------+ | poller +---------------> | _poll | | | | | +-------+ | | | _poller+---------> | poll | +------------------+ | | +-------+ +-------------+
2.2 Exchange与Queue
其次建立了Exchange与Queue。
- Exchange:交换机,消息发送者将消息发至 Exchange,Exchange 负责将消息分发至 Queue;
- Queue:消息队列,存储着即将被应用消费掉的消息,Exchange 负责将消息分发 Queue,消费者从 Queue 接收消息;
因为此时也没有具体消息,所以我们暂且无法探究Exchange机制。
exchange = Exchange('asynt') queue = Queue('asynt', exchange, 'asynt')
此时将把Exchange与Queue联系起来。图示如下:
+------------------+ | Hub | | | | | +-------------+ | poller +---------------> | _poll | | | | | +-------+ | | | _poller+---------> | poll | +------------------+ | | +-------+ +-------------+ +----------------+ +-------------------+ | Exchange | | Queue | | | | | | | | | | channel | <------------+ exchange | | | | | | | | | +----------------+ +-------------------+
2.3 Connection
第三步就是建立Connection。
Connection是对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接。现在就是对'redis://localhost:6379'连接进行抽象。
conn = Connection('redis://localhost:6379')
2.3.1 定义
由定义注释可知,Connection是到broker的连接。从具体代码可以看出,Connection更接近是一个逻辑概念,具体功能都委托给别人完成。
消息从来不直接发送给队列,甚至 Producers 都可能不知道队列的存在。 Producer如何才能将消息发送给Consumer呢?这中间需要经过 Message Broker 的处理和传递。
AMQP中,承担 Message Broker 功能的就是 AMQP Server。也正是从这个角度讲,AMQP 的 Producer 和Consumer 都是 AMQP Client。
在Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。
Connection主要成员变量是,但是此时没有赋值:
- _connection:
- _transport:就是上面提到的对 broker 的抽象。
- cycle:与broker交互的调度策略。
- failover_strategy:在连接失效时,选取其他hosts的策略。
- heartbeat:用来实施心跳。
代码如下:
class Connection: """A connection to the broker""" port = None virtual_host = '/' connect_timeout = 5 _connection = None _default_channel = None _transport = None uri_prefix = None #: The cache of declared entities is per connection, #: in case the server loses data. declared_entities = None #: Iterator returning the next broker URL to try in the event #: of connection failure (initialized by :attr:`failover_strategy`). cycle = None #: Additional transport specific options, #: passed on to the transport instance. transport_options = None #: Strategy used to select new hosts when reconnecting after connection #: failure. One of "round-robin", "shuffle" or any custom iterator #: constantly yielding new URLs to try. failover_strategy = 'round-robin' #: Heartbeat value, currently only supported by the py-amqp transport. heartbeat = None resolve_aliases = resolve_aliases failover_strategies = failover_strategies hostname = userid = password = ssl = login_method = None
2.3.2 init 与 transport
Connection内部主要任务是建立了transport。
Stack大致如下:
Transport, redis.py:1039, redis.py:1031 import_module, __init__.py:126 symbol_by_name, imports.py:56 resolve_transport, __init__.py:70 get_transport_cls, __init__.py:85 __init__, connection.py:183 main, testUb.py:40, testUb.py:55
2.4 Transport
在Kombu体系中,用transport对所有的broker进行了抽象,为不同的broker提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。
Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例。就是存储和发送消息的实体,用来区分底层消息队列是用amqp、Redis还是其它实现的。
Transport负责具体操作,但是很多操作移交给 loop 与 MultiChannelPoller 进行。
2.4.1 定义
其主要成员变量为:
- 本transport的驱动类型,名字;
- 对应的 Channel;
- cycle:MultiChannelPoller,具体下文提到;
定义如下:
class Transport(virtual.Transport): """Redis Transport.""" Channel = Channel polling_interval = None # disable sleep between unsuccessful polls. default_port = DEFAULT_PORT driver_type = 'redis' driver_name = 'redis' implements = virtual.Transport.implements.extend( asynchronous=True, exchange_type=frozenset(['direct', 'topic', 'fanout']) ) def __init__(self, *args, **kwargs): if redis is None: raise ImportError('Missing redis library (pip install redis)') super().__init__(*args, **kwargs) # Get redis-py exceptions. self.connection_errors, self.channel_errors = self._get_errors() # All channels share the same poller. self.cycle = MultiChannelPoller()
2.4.2 移交操作
Transport负责具体操作,但是很多操作移交给 loop 与 MultiChannelPoller 进行,具体从下面代码可见。
def register_with_event_loop(self, connection, loop): cycle = self.cycle cycle.on_poll_init(loop.poller) cycle_poll_start = cycle.on_poll_start add_reader = loop.add_reader on_readable = self.on_readable def _on_disconnect(connection): if connection._sock: loop.remove(connection._sock) cycle._on_connection_disconnect = _on_disconnect def on_poll_start(): cycle_poll_start() [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) health_check_interval = connection.client.transport_options.get( 'health_check_interval', DEFAULT_HEALTH_CHECK_INTERVAL ) loop.call_repeatedly( health_check_interval, cycle.maybe_check_subclient_health )
其中重点是MultiChannelPoller。一个Connection有一个Transport, 一个Transport有一个MultiChannelPoller,对poll操作都是由MultiChannelPoller完成,redis操作由channel完成。
2.4.3 MultiChannelPoller
定义如下,可以理解为执行engine,主要作用是:
- 收集channel;
- 建立fd到channel的映射;
- 建立channel到socks的映射;
- 使用poll;
class MultiChannelPoller: """Async I/O poller for Redis transport.""" eventflags = READ | ERR def __init__(self): # active channels self._channels = set() # file descriptor -> channel map. self._fd_to_chan = {} # channel -> socket map self._chan_to_sock = {} # poll implementation (epoll/kqueue/select) self.poller = poll() # one-shot callbacks called after reading from socket. self.after_read = set()
2.4.4 获取
Transport是预先生成的,若需要,则依据名字取得。
TRANSPORT_ALIASES = { 'amqp': 'kombu.transport.pyamqp:Transport', 'amqps': 'kombu.transport.pyamqp:SSLTransport', 'pyamqp': 'kombu.transport.pyamqp:Transport', 'librabbitmq': 'kombu.transport.librabbitmq:Transport', 'memory': 'kombu.transport.memory:Transport', 'redis': 'kombu.transport.redis:Transport', ...... 'pyro': 'kombu.transport.pyro:Transport' } _transport_cache = {} def resolve_transport(transport=None): """Get transport by name. """ if isinstance(transport, str): try: transport = TRANSPORT_ALIASES[transport] except KeyError: if '.' not in transport and ':' not in transport: from kombu.utils.text import fmatch_best alt = fmatch_best(transport, TRANSPORT_ALIASES) else: if callable(transport): transport = transport() return symbol_by_name(transport) return transport def get_transport_cls(transport=None): """Get transport class by name. """ if transport not in _transport_cache: _transport_cache[transport] = resolve_transport(transport) return _transport_cache[transport]
此时Connection数据如下,注意其部分成员变量尚且没有意义:
conn = {Connection}alt = {list: 0} [] connect_timeout = {int} 5 connection = {Transport}cycle = {NoneType} None declared_entities = {set: 0} set() default_channel = {Channel}failover_strategies = {dict: 2} {'round-robin':, 'shuffle':} failover_strategy = {type}heartbeat = {int} 0 host = {str} 'localhost:6379' hostname = {str} 'localhost' manager = {Management}port = {int} 6379 recoverable_channel_errors = {tuple: 0} () resolve_aliases = {dict: 2} {'pyamqp': 'amqp', 'librabbitmq': 'amqp'} transport = {Transport}transport_cls = {str} 'redis' uri_prefix = {NoneType} None userid = {NoneType} None virtual_host = {str} '/'
至此,Kombu的基本就建立完成,但是彼此之间没有建立逻辑联系。
所以此时示例如下,注意此时三者没有联系:
+-------------------+ +---------------------+ +--------------------+ | Connection | | redis.Transport | | MultiChannelPoller | | | | | | | | | | | | _channels | | | | cycle +------------> | _fd_to_chan | | transport +---------> | | | _chan_to_sock | | | | | | poller | +-------------------+ +---------------------+ | after_read | | | +--------------------+ +------------------+ | Hub | | | | | +-------------+ | poller +---------------> | _poll | | | | | +-------+ | | | _poller+---------> | poll | +------------------+ | | +-------+ +-------------+ +----------------+ +-------------------+ | Exchange | | Queue | | | | | | | | | | channel | <------------+ exchange | | | | | | | | | +----------------+ +-------------------+
0x03 Connection注册hub
之前我们提到,基本架子已经建立起来,但是各个模块之间彼此没有联系,下面我们就看看如何建立联系。
示例代码来到:
conn.register_with_event_loop(hub)
这里进行了注册,此时作用是把hub与Connection联系起来。随之调用到:
def register_with_event_loop(self, loop): self.transport.register_with_event_loop(self.connection, loop)
进而调用到transport类:
具体代码如下:
def register_with_event_loop(self, connection, loop): cycle = self.cycle cycle.on_poll_init(loop.poller)# 这里建立联系,loop就是hub cycle_poll_start = cycle.on_poll_start add_reader = loop.add_reader on_readable = self.on_readable def _on_disconnect(connection): if connection._sock: loop.remove(connection._sock) cycle._on_connection_disconnect = _on_disconnect def on_poll_start(): cycle_poll_start() [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) health_check_interval = connection.client.transport_options.get( 'health_check_interval', DEFAULT_HEALTH_CHECK_INTERVAL ) loop.call_repeatedly( health_check_interval, cycle.maybe_check_subclient_health )
3.1 建立Channel
注册最初是建立Channel。这里有一个连接的动作,就是在这里,建立了Channel。
@property def connection(self): """The underlying connection object""" if not self._closed: if not self.connected: return self._ensure_connection( max_retries=1, reraise_as_library_errors=False ) return self._connection
具体建立是在 base.py 中完成,这是 Transport 基类。Stack 如下:
create_channel, base.py:920 establish_connection, base.py:938 _establish_connection, connection.py:801 _connection_factory, connection.py:866 retry_over_time, functional.py:325 _ensure_connection, connection.py:439 connection, connection.py:859 register_with_event_loop, connection.py:266 main, testUb.py:41, testUb.py:55
3.2 Channel
Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接。就是真正的连接。
可以认为是 redis 操作和连接的封装。每个 Channel 都可以与 redis 建立一个连接,在此连接之上对 redis 进行操作,每个连接都有一个 socket,每个 socket 都有一个 file,从这个 file 可以进行 poll。
为了更好的说明,我们提前给出这个通讯流程大约如下:
+---------------------------------------------------------------------------------------------------------------------------------------+ | +--------------+ 6 parse_response | | +--> | Linux Kernel | +---+ | | | +--------------+ | | | | | | | | | event | | | 1 | | | | | 2 | | | | | +-------+---+ socket + | | | redis | port +--> fd +--->+ v | | | | +------+--------+ | | | socket | | Hub | | | | port +--> fd +--->----------> | | | | port=6379 | | | | | | | socket | | readers +-----> Transport.on_readable | | | port +--> fd +--->+ | | + | +-----------+ +---------------+ | | | | 3 | | +----------------------------------------------------------------------------------------+ | | v | _receive_callback | 5 +-------------+ +-----------+ +------------+------+ +-------------------------+ 'BRPOP' = Channel._brpop_read +-----> | Channel | +------------------> | Consumer | | Transport | | MultiChannelPoller | +------> channel . handlers 'LISTEN' = Channel._receive +-------------+ +---+-------+ | | | | | 8 | | | on_readable(fileno) | | | ^ | | cycle +---------------------> | _fd_to_chan +----------------> channel . handlers 'BRPOP' = Channel._brpop_read | | | | 4 | | | 'LISTEN' = Channel._receive | | | _callbacks[queue]| | | | | on_m | 9 | + | +-------------------------+ +------> channel . handlers 'BRPOP' = Channel._brpop_read | | +-------------------+ 'LISTEN' = Channel._receive | | | | v | 7 _callback | +-----------------------------------------------------------------------------------------------------------------------------------------+ User Function
手机上如下:
3.2.1 定义
Channel 主要成员是:
- async_pool :redis异步连接池;
- pool :redis连接池;
- channel_id :Channel ID;
- client :就是StrictRedis之类的driver;
- connection :对应的Transport;
- cycle = {FairCycle}
- queue_order_strategy :获取queue的策略;
- state :BrokerState状态;
- subclient :PubSub所用的client;
keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX) :bing用到的key;
比如_get_client可以看出来client。
def _get_client(self): if redis.VERSION < (3, 2, 0): raise VersionMismatch( 'Redis transport requires redis-py versions 3.2.0 or later. ' 'You have {0.__version__}'.format(redis)) return redis.StrictRedis
简化版定义如下:
class Channel(virtual.Channel): """Redis Channel.""" QoS = QoS _client = None _subclient = None keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX) keyprefix_fanout = '/{db}.' sep = '\x06\x16' _fanout_queues = {} unacked_key = '{p}unacked'.format(p=KEY_PREFIX) unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX) unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX) unacked_mutex_expire = 300 # 5 minutes unacked_restore_limit = None visibility_timeout = 3600 # 1 hour max_connections = 10 queue_order_strategy = 'round_robin' _async_pool = None _pool = None from_transport_options = ( virtual.Channel.from_transport_options + ('sep', 'ack_emulation', 'unacked_key', ...... 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps') # <-- do not add comma here! ) connection_class = redis.Connection if redis else None
3.2.2 基类
基类定义如下:
class Channel(AbstractChannel, base.StdChannel): """Virtual channel. Arguments: connection (ConnectionT): The transport instance this channel is part of. """ #: message class used. Message = Message #: QoS class used. QoS = QoS #: flag to restore unacked messages when channel #: goes out of scope. do_restore = True #: mapping of exchange types and corresponding classes. exchange_types = dict(STANDARD_EXCHANGE_TYPES) #: flag set if the channel supports fanout exchanges. supports_fanout = False #: Binary ASCII codecs. codecs = {'base64': Base64()} #: Default body encoding. #: NOTE: ``transport_options['body_encoding']`` will override this value. body_encoding = 'base64' #: counter used to generate delivery tags for this channel. _delivery_tags = count(1) #: Optional queue where messages with no route is delivered. #: Set by ``transport_options['deadletter_queue']``. deadletter_queue = None # List of options to transfer from :attr:`transport_options`. from_transport_options = ('body_encoding', 'deadletter_queue') # Priority defaults default_priority = 0 min_priority = 0 max_priority = 9
最终具体举例如下:
self = {Channel}Client = {type}Message = {type}QoS = {type}active_fanout_queues = {set: 0} set() active_queues = {set: 0} set() async_pool = {ConnectionPool} ConnectionPool<Connection> auto_delete_queues = {set: 0} set() channel_id = {int} 1 client = {Redis} Redis<ConnectionPool<Connection>> codecs = {dict: 1} {'base64':} connection = {Transport}connection_class = {type}cycle = {FairCycle}deadletter_queue = {NoneType} None exchange_types = {dict: 3} {'direct':, 'topic':, handlers = {dict: 2} {'BRPOP':<bound method Channel._brpop_read of >, 'LISTEN':<bound method Channel._receive of >} pool = {ConnectionPool} ConnectionPool<Connection> qos = {QoS}queue_order_strategy = {str} 'round_robin' state = {BrokerState}subclient = {PubSub}
3.2.3 redis消息回调函数
关于上面成员变量,这里需要说明的是
handlers = {dict: 2} { 'BRPOP':<bound method Channel._brpop_read of >, 'LISTEN':<bound method Channel._receive of > }
这是redis有消息时的回调函数,即:
- BPROP 有消息时候,调用 Channel._brpop_read;
- LISTEN 有消息时候,调用 Channel._receive;
3.2.4 Redis 直接相关的主要成员
与Redis 直接相关的成员定义在:redis/client.py。
与 Redis 直接相关的主要成员是如下,会利用如下变量进行具体 redis操作:
- async_pool :redis异步连接池;
- pool :redis连接池;
- client :就是StrictRedis之类的driver;
- subclient :PubSub所用的client;
分别对应如下类型:
channel = {Channel}Client = {type}async_pool = {ConnectionPool} ConnectionPool<Connection> client = {Redis} Redis<ConnectionPool<Connection>> connection = {Transport}connection_class = {type}connection_class_ssl = {type}pool = {ConnectionPool} ConnectionPool<Connection> subclient = {PubSub}
具体代码如下:
def _create_client(self, asynchronous=False): if asynchronous: return self.Client(connection_pool=self.async_pool) return self.Client(connection_pool=self.pool) def _get_pool(self, asynchronous=False): params = self._connparams(asynchronous=asynchronous) self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db']) return redis.ConnectionPool(**params) def _get_client(self): if redis.VERSION < (3, 2, 0): raise VersionMismatch( 'Redis transport requires redis-py versions 3.2.0 or later. ' 'You have {0.__version__}'.format(redis)) return redis.StrictRedis @property def pool(self): if self._pool is None: self._pool = self._get_pool() return self._pool @property def async_pool(self): if self._async_pool is None: self._async_pool = self._get_pool(asynchronous=True) return self._async_pool @cached_property def client(self): """Client used to publish messages, BRPOP etc.""" return self._create_client(asynchronous=True) @cached_property def subclient(self): """Pub/Sub connection used to consume fanout queues.""" client = self._create_client(asynchronous=True) return client.pubsub()
因为添加了Channel,所以此时如下:
+-----------------+ | Channel | | | +-----------------------------------------------------------+ | client +---------> | Redis<ConnectionPool<Connection| | | +-----------------------------------------------------------+ | | | | +---------------------------------------------------+-+ | pool +----------> |ConnectionPool<Connection| | | +---------------------------------------------------+-+ | | | | | | | connection | | | +-----------------+ +-------------------+ +---------------------+ +--------------------+ | Connection | | redis.Transport | | MultiChannelPoller | | | | | | | | | | | | _channels | | | | cycle +------------> | _fd_to_chan | | transport +---------> | | | _chan_to_sock | | | | | | poller | +-------------------+ +---------------------+ | after_read | | | +------------------+ +--------------------+ | Hub | | | | | +-------------+ | poller +---------------> | _poll | | | | | +-------+ | | | _poller+---------> | poll | +------------------+ | | +-------+ +-------------+ +----------------+ +-------------------+ | Exchange | | Queue | | | | | | | | | | channel | <------------+ exchange | | | | | | | | | +----------------+ +-------------------+
3.3 channel 与 Connection 联系
讲到这里,基本道理大家都懂,但是具体两者之间如何联系,我们需要再剖析下。
3.3.1 从Connection得到channel
在Connection定义中有如下,原来 Connection 是通过 transport 来得到 channel:
def channel(self): """Create and return a new channel.""" self._debug('create channel') chan = self.transport.create_channel(self.connection) return chan
3.3.2 Transport具体创建
在Transport之中有:
def create_channel(self, connection): try: return self._avail_channels.pop() except IndexError: channel = self.Channel(connection) self.channels.append(channel) return channel
原来在 Transport 有两个channels 列表:
self._avail_channels self.channels
如果_avail_channels 有内容则直接获取,否则生成一个新的Channel。
在真正连接时候,会调用 establish_connection 放入self._avail_channels。
def establish_connection(self): # creates channel to verify connection. # this channel is then used as the next requested channel. # (returned by ``create_channel``). self._avail_channels.append(self.create_channel(self)) return self # for drain events
其堆栈如下:
__init__, redis.py:557 create_channel, base.py:921 establish_connection, base.py:939 _establish_connection, connection.py:801 _connection_factory, connection.py:866 retry_over_time, functional.py:313 _ensure_connection, connection.py:439 connection, connection.py:859 channel, connection.py:283, node.py:11
3.3.3 建立联系
在init中有:
def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not self.ack_emulation: # disable visibility timeout self.QoS = virtual.QoS self._queue_cycle = cycle_by_name(self.queue_order_strategy)() self.Client = self._get_client() self.ResponseError = self._get_response_error() self.active_fanout_queues = set() self.auto_delete_queues = set() self._fanout_to_queue = {} self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive} ...... self.connection.cycle.add(self) # add to channel poller. if register_after_fork is not None: register_after_fork(self, _after_fork_cleanup_channel)
重点是:
self.connection.cycle.add(self) # add to channel poller.
这就是把 Channel与Transport 中的 poller 联系起来,这样Transport可以利用Channel去与真实的redis进行交互。
堆栈如下:
add, redis.py:277 __init__, redis.py:531 create_channel, base.py:920 establish_connection, base.py:938 _establish_connection, connection.py:801 _connection_factory, connection.py:866 retry_over_time, functional.py:325 _ensure_connection, connection.py:439 connection, connection.py:859 register_with_event_loop, connection.py:266 main, testUb.py:41
因为已经联系起来,所以此时如下:
+-----------------+ | Channel | | | +-----------------------------------------------------------+ | client +---------> | Redis<ConnectionPool<Connection| | | +-----------------------------------------------------------+ | | | | +---------------------------------------------------+-+ | pool +----------> |ConnectionPool<Connection| | | +---------------------------------------------------+-+ | | | | | _fd_to_chan | | transport +---------> | | | _chan_to_sock | | | | | | poller | +-------------------+ +---------------------+ | after_read | | | +------------------+ +--------------------+ | Hub | | | | | +-------------+ | poller +---------------> | _poll | | | | | +-------+ | | | _poller+---------> | poll | +------------------+ | | +-------+ +-------------+ +----------------+ +-------------------+ | Exchange | | Queue | | | | | | | | | | channel | <------------+ exchange | | | | | | | | | +----------------+ +-------------------+
3.3 Transport 与 Hub 联系
on_poll_init 这里就是把 kombu.transport.redis.Transport 与 Hub 联系起来。
用self.poller = poller
把Transport与Hub的poll联系起来。这样 Transport 就可以利用 poll。
def on_poll_init(self, poller): self.poller = poller for channel in self._channels: return channel.qos.restore_visible( num=channel.unacked_restore_limit, )
此时变量如下:
poller = {_poll}self = {MultiChannelPoller}after_read = {set: 0} set() eventflags = {int} 25 fds = {dict: 0} {} poller = {_poll}
因此,我们最终如下:
+-----------------+ | Channel | | | +-----------------------------------------------------------+ | client +---------> | Redis<ConnectionPool<Connection| | | +-----------------------------------------------------------+ | | | | +---------------------------------------------------+-+ | pool +----------> |ConnectionPool<Connection| | | +---------------------------------------------------+-+ | | | | | _fd_to_chan | | transport +---------> | | | _chan_to_sock | | | | | + | _poll | | | | | +-------+ | | | _poller+---------> | poll | +------------------+ | | +-------+ +-------------+ +----------------+ +-------------------+ | Exchange | | Queue | | | | | | | | | | channel | <------------+ exchange | | | | | | | | | +----------------+ +-------------------+
0x04 总结
具体如图,可以看出来,上面三个基本模块已经联系到了一起。
可以看到,
- 目前是以Transport为中心,把 Channel代表的真实 redis 与 Hub其中的poll联系起来,但是具体如何使用则尚未得知。
- 用户是通过Connection来作为API入口,connection可以得到Transport。
既然基本架构已经搭好,所以从下文开始,我们看看 Consumer 是如何运作的。
0xFF 参考
celery 7 优秀开源项目kombu源码分析之registry和entrypoint