Message Queue 64 Forms: 1. RabbitMQ QoS principle analysis

Python 64 Forms: Form 32, RabbitMQ performance tuning

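1 RabbitMQ QoS basics
RabbitMQ QoS takes an integer N (the prefetch count), meaning that a consumer can hold at most N unacknowledged messages at a time.
While those N messages are still being processed, the consumer receives no new messages from the queue until one of them is acked.
The purpose of setting QoS is to prevent a consumer from pulling every message in the queue at once, which could
overwhelm the service and cause it to crash or misbehave.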
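
The prefetch window described above can be illustrated with a toy model. This is only a sketch of the behavior, not RabbitMQ's implementation; the ToyBroker class and its method names are hypothetical, and treating 0 as "no limit" follows the usual AMQP convention.

```python
from collections import deque


class ToyBroker:
    """Toy model of a queue pushing to one consumer under a prefetch limit."""

    def __init__(self, messages, prefetch_count):
        self.queue = deque(messages)          # messages still waiting in the queue
        self.prefetch_count = prefetch_count  # N; 0 is treated as "no limit"
        self.unacked = set()                  # delivered but not yet acknowledged

    def deliver(self):
        """Push messages until the queue is empty or N messages are unacked."""
        pushed = []
        while self.queue and (self.prefetch_count == 0
                              or len(self.unacked) < self.prefetch_count):
            msg = self.queue.popleft()
            self.unacked.add(msg)
            pushed.append(msg)
        return pushed

    def ack(self, msg):
        """Acknowledge one message, which frees one slot in the window."""
        self.unacked.remove(msg)


broker = ToyBroker(messages=range(10), prefetch_count=3)
first = broker.deliver()      # fills the window with 3 messages
stalled = broker.deliver()    # nothing more is pushed while 3 are unacked
broker.ack(first[0])
after_ack = broker.deliver()  # exactly one slot was freed by the ack

print(first, stalled, after_ack)
```

With N = 3, the second deliver() call pushes nothing, and a single ack lets exactly one more message through.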
 

2 RabbitMQ QoS in OpenStack components
You can set something like the following in a component's configuration file; adjust the value 100 to suit your needs.
[oslo_messaging_rabbit]
rabbit_qos_prefetch_count = 100
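
To check what value a configuration file actually carries, the option can be read back with a few lines of Python. This sketch uses the standard-library configparser as a stand-in (OpenStack services parse their files with oslo.config instead); the SAMPLE string and the fallback of 0 for "option not set" are assumptions of this sketch.

```python
import configparser

# Inline sample standing in for a component's configuration file.
SAMPLE = """
[oslo_messaging_rabbit]
rabbit_qos_prefetch_count = 100
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE)

# fallback=0 is this sketch's choice for "option not set".
prefetch = parser.getint(
    "oslo_messaging_rabbit", "rabbit_qos_prefetch_count", fallback=0)

print(prefetch)
```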

3 Source analysis of RabbitMQ QoS in oslo_messaging
The oslo_messaging version examined
(Newton release):
oslo.messaging===5.10.2
Code:
https://github.com/openstack/oslo.messaging
https://github.com/openstack/oslo.messaging/tree/5.10.2
Download the source, then change into its directory:
cd /home/machao/myproject/task/385_rabbitmq_qos原理/oslo.messaging-5.10.2

3.1 Locating the relevant code
grep -r "rabbit_qos_prefetch_count" ./*
The output is as follows:
[root@localhost oslo.messaging-5.10.2]# grep -r "rabbit_qos_prefetch_count" ./*
./oslo_messaging/_drivers/impl_rabbit.py:    cfg.IntOpt('rabbit_qos_prefetch_count',
./oslo_messaging/_drivers/impl_rabbit.py:        self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
./oslo_messaging/_drivers/impl_rabbit.py:        if self.rabbit_qos_prefetch_count > 0:
./oslo_messaging/_drivers/impl_rabbit.py:                              self.rabbit_qos_prefetch_count,
./oslo_messaging/_drivers/impl_rabbit.py:            conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
./oslo_messaging/tests/drivers/test_impl_rabbit.py:        self.config(rabbit_qos_prefetch_count=prefetch,
./oslo_messaging/tests/test_config_opts_proxy.py:                    rabbit_qos_prefetch_count=0,
./oslo_messaging/tests/test_config_opts_proxy.py:                       "?rabbit_qos_prefetch_count=2"
./oslo_messaging/tests/test_config_opts_proxy.py:                         conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)


3.2 Entry point
The entry point is the Connection class in
oslo_messaging/_drivers/impl_rabbit.py. The relevant code is:

class Connection(object):
    """Connection object."""

    pools = {}

    def __init__(self, conf, url, purpose):
        # NOTE(viktors): Parse config options
        driver_conf = conf.oslo_messaging_rabbit

        self.max_retries = driver_conf.rabbit_max_retries
        self.interval_start = driver_conf.rabbit_retry_interval
        self.interval_stepping = driver_conf.rabbit_retry_backoff
        self.interval_max = driver_conf.rabbit_interval_max

        self.login_method = driver_conf.rabbit_login_method
        self.fake_rabbit = driver_conf.fake_rabbit
        self.virtual_host = driver_conf.rabbit_virtual_host
        self.rabbit_hosts = driver_conf.rabbit_hosts
        self.rabbit_port = driver_conf.rabbit_port
        self.rabbit_userid = driver_conf.rabbit_userid
        self.rabbit_password = driver_conf.rabbit_password
        self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
        self.rabbit_transient_queues_ttl = \
            driver_conf.rabbit_transient_queues_ttl
        self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
      ......


    def _set_qos(self, channel):
        """Set QoS prefetch count on the channel"""
        if self.rabbit_qos_prefetch_count > 0:
            channel.basic_qos(0,
                              self.rabbit_qos_prefetch_count,
                              False)
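
The guard in _set_qos can be exercised without a broker. The sketch below copies the same condition into a free function and calls it with a recording stand-in for the channel; RecordingChannel, set_qos, and the parameter names are hypothetical and exist only for this illustration.

```python
class RecordingChannel:
    """Hypothetical stand-in for an AMQP channel; it only records calls."""

    def __init__(self):
        self.calls = []

    def basic_qos(self, prefetch_size, prefetch_count, a_global):
        self.calls.append((prefetch_size, prefetch_count, a_global))


def set_qos(channel, rabbit_qos_prefetch_count):
    """Same guard as _set_qos above, written as a free function."""
    if rabbit_qos_prefetch_count > 0:
        channel.basic_qos(0, rabbit_qos_prefetch_count, False)


limited = RecordingChannel()
set_qos(limited, 100)    # positive value: basic_qos is issued

unlimited = RecordingChannel()
set_qos(unlimited, 0)    # zero: basic_qos is never called

print(limited.calls)
print(unlimited.calls)
```

A value of 0 (or any non-positive value) therefore leaves the channel without a prefetch limit, because basic_qos is never sent.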

Analysis:
3.2.1) The _set_qos method is called by the following method:
    def _set_current_channel(self, new_channel):
        """Change the channel to use.

        NOTE(sileht): Must be called within the connection lock
        """
        if new_channel == self.channel:
            return

        if self.channel is not None:
            self._declared_queues.clear()
            self._declared_exchanges.clear()
            self.connection.maybe_close_channel(self.channel)

        self.channel = new_channel

        if new_channel is not None:
            if self.purpose == rpc_common.PURPOSE_LISTEN:
                self._set_qos(new_channel)
            self._producer = kombu.messaging.Producer(new_channel)
            for consumer in self._consumers:
                consumer.declare(self)
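
The QoS-related branch of _set_current_channel can be isolated in a small sketch. MiniConnection and FakeChannel are hypothetical, the string values of the two purpose constants are assumptions standing in for rpc_common, and everything unrelated to QoS (producer, consumers, declared queues) is left out.

```python
PURPOSE_LISTEN = "listen"  # assumed values, standing in for rpc_common
PURPOSE_SEND = "send"


class FakeChannel:
    """Hypothetical channel that records basic_qos calls."""

    def __init__(self):
        self.qos_calls = []

    def basic_qos(self, prefetch_size, prefetch_count, a_global):
        self.qos_calls.append((prefetch_size, prefetch_count, a_global))


class MiniConnection:
    """Keeps only the QoS-related branch of _set_current_channel."""

    def __init__(self, purpose, prefetch_count):
        self.purpose = purpose
        self.rabbit_qos_prefetch_count = prefetch_count
        self.channel = None

    def _set_qos(self, channel):
        if self.rabbit_qos_prefetch_count > 0:
            channel.basic_qos(0, self.rabbit_qos_prefetch_count, False)

    def _set_current_channel(self, new_channel):
        if new_channel == self.channel:
            return
        self.channel = new_channel
        # QoS is applied only to real channels on listening connections.
        if new_channel is not None and self.purpose == PURPOSE_LISTEN:
            self._set_qos(new_channel)


listener_channel = FakeChannel()
MiniConnection(PURPOSE_LISTEN, 100)._set_current_channel(listener_channel)

sender_channel = FakeChannel()
MiniConnection(PURPOSE_SEND, 100)._set_current_channel(sender_channel)

print(listener_channel.qos_calls)
print(sender_channel.qos_calls)
```

Only the listening connection ends up with a prefetch limit; a sending connection never issues basic_qos, whatever the configured value.
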
3.2.2) Analyzing the _set_current_channel method
_set_current_channel is called by the ensure_connection method of the Connection class in
oslo_messaging/_drivers/impl_rabbit.py:
    def ensure_connection(self):
        # NOTE(sileht): we reset the channel and ensure
        # the kombu underlying connection works
        self._set_current_channel(None)
        self.ensure(method=lambda: self.connection.connection)
        self.set_transport_socket_timeout()


3.2.3) Analyzing the ensure_connection method
ensure_connection is called by the __init__ method of the Connection class in
oslo_messaging/_drivers/impl_rabbit.py. The code is:
class Connection(object):
    """Connection object."""

    pools = {}

    def __init__(self, conf, url, purpose):
        # NOTE(viktors): Parse config options
        driver_conf = conf.oslo_messaging_rabbit

        self.max_retries = driver_conf.rabbit_max_retries
        self.interval_start = driver_conf.rabbit_retry_interval
        self.interval_stepping = driver_conf.rabbit_retry_backoff
        self.interval_max = driver_conf.rabbit_interval_max

        self.login_method = driver_conf.rabbit_login_method
        self.fake_rabbit = driver_conf.fake_rabbit
        self.virtual_host = driver_conf.rabbit_virtual_host
        self.rabbit_hosts = driver_conf.rabbit_hosts
        self.rabbit_port = driver_conf.rabbit_port
        self.rabbit_userid = driver_conf.rabbit_userid
        self.rabbit_password = driver_conf.rabbit_password
        self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
        self.rabbit_transient_queues_ttl = \
            driver_conf.rabbit_transient_queues_ttl
        self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
        self.heartbeat_timeout_threshold = \
            driver_conf.heartbeat_timeout_threshold
        self.heartbeat_rate = driver_conf.heartbeat_rate
        self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
        self.amqp_durable_queues = driver_conf.amqp_durable_queues
        self.amqp_auto_delete = driver_conf.amqp_auto_delete
        self.rabbit_use_ssl = driver_conf.rabbit_use_ssl
        self.kombu_missing_consumer_retry_timeout = \
            driver_conf.kombu_missing_consumer_retry_timeout
        self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
        self.kombu_compression = driver_conf.kombu_compression

        if self.rabbit_use_ssl:
            self.kombu_ssl_version = driver_conf.kombu_ssl_version
            self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile
            self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile
            self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs

        # Try forever?
        if self.max_retries <= 0:
            self.max_retries = None

        if url.virtual_host is not None:
            virtual_host = url.virtual_host
        else:
            virtual_host = self.virtual_host

        self._url = ''
        if self.fake_rabbit:
            LOG.warning(_LW("Deprecated: fake_rabbit option is deprecated, "
                            "set rpc_backend to kombu+memory or use the fake "
                            "driver instead."))
            self._url = 'memory://%s/' % virtual_host
        elif url.hosts:
            if url.transport.startswith('kombu+'):
                LOG.warning(_LW('Selecting the kombu transport through the '
                                'transport url (%s) is a experimental feature '
                                'and this is not yet supported.'),
                            url.transport)
            if len(url.hosts) > 1:
                random.shuffle(url.hosts)
            for host in url.hosts:
                transport = url.transport.replace('kombu+', '')
                transport = transport.replace('rabbit', 'amqp')
                self._url += '%s%s://%s:%s@%s:%s/%s' % (
                    ";" if self._url else '',
                    transport,
                    parse.quote(host.username or ''),
                    parse.quote(host.password or ''),
                    self._parse_url_hostname(host.hostname) or '',
                    str(host.port or 5672),
                    virtual_host)
        elif url.transport.startswith('kombu+'):
            # NOTE(sileht): url have a + but no hosts
            # (like kombu+memory:///), pass it to kombu as-is
            transport = url.transport.replace('kombu+', '')
            self._url = "%s://%s" % (transport, virtual_host)
        else:
            if len(self.rabbit_hosts) > 1:
                random.shuffle(self.rabbit_hosts)
            for adr in self.rabbit_hosts:
                hostname, port = netutils.parse_host_port(
                    adr, default_port=self.rabbit_port)
                self._url += '%samqp://%s:%s@%s:%s/%s' % (
                    ";" if self._url else '',
                    parse.quote(self.rabbit_userid, ''),
                    parse.quote(self.rabbit_password, ''),
                    self._parse_url_hostname(hostname), port,
                    virtual_host)

        self._initial_pid = os.getpid()

        self._consumers = {}
        self._producer = None
        self._new_tags = set()
        self._active_tags = {}
        self._tags = itertools.count(1)

        # Set of exchanges and queues declared on the channel to avoid
        # unnecessary redeclaration. This set is resetted each time
        # the connection is resetted in Connection._set_current_channel
        self._declared_exchanges = set()
        self._declared_queues = set()

        self._consume_loop_stopped = False
        self.channel = None
        self.purpose = purpose

        # NOTE(sileht): if purpose is PURPOSE_LISTEN
        # we don't need the lock because we don't
        # have a heartbeat thread
        if purpose == rpc_common.PURPOSE_SEND:
            self._connection_lock = ConnectionLock()
        else:
            self._connection_lock = DummyConnectionLock()

        self.connection_id = str(uuid.uuid4())
        self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]),
                                  os.getpid(),
                                  self.connection_id)
        self.connection = kombu.connection.Connection(
            self._url, ssl=self._fetch_ssl_params(),
            login_method=self.login_method,
            heartbeat=self.heartbeat_timeout_threshold,
            failover_strategy=self.kombu_failover_strategy,
            transport_options={
                'confirm_publish': True,
                'client_properties': {
                    'capabilities': {
                        'authentication_failure_close': True,
                        'connection.blocked': True,
                        'consumer_cancel_notify': True
                    },
                    'connection_name': self.name},
                'on_blocked': self._on_connection_blocked,
                'on_unblocked': self._on_connection_unblocked,
            },
        )

        LOG.debug('[%(connection_id)s] Connecting to AMQP server on'
                  ' %(hostname)s:%(port)s',
                  self._get_connection_info())

        # NOTE(sileht): kombu recommend to run heartbeat_check every
        # seconds, but we use a lock around the kombu connection
        # so, to not lock to much this lock to most of the time do nothing
        # expected waiting the events drain, we start heartbeat_check and
        # retrieve the server heartbeat packet only two times more than
        # the minimum required for the heartbeat works
        # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
        # heartbeat_rate is 2)
        self._heartbeat_wait_timeout = (
            float(self.heartbeat_timeout_threshold) /
            float(self.heartbeat_rate) / 2.0)
        self._heartbeat_support_log_emitted = False

        # NOTE(sileht): just ensure the connection is setuped at startup
        self.ensure_connection()
      ......


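Summary:
The overall call path by which oslo.messaging applies rabbit_qos_prefetch_count to RabbitMQ is:
1 RabbitDriver's __init__ method creates the connection pool:
        connection_pool = pool.ConnectionPool(
            conf, max_size, min_size, ttl,
            url, Connection)
2 Connection's __init__ method calls ensure_connection
3 ensure_connection calls _set_current_channel (the call shown passes None, which only resets the current channel)
4 _set_current_channel calls _set_qos once it receives a non-None channel on a PURPOSE_LISTEN connection
5 _set_qos calls channel.basic_qos: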
    def _set_qos(self, channel):
        """Set QoS prefetch count on the channel"""
        if self.rabbit_qos_prefetch_count > 0:
            channel.basic_qos(0,
                              self.rabbit_qos_prefetch_count,
                              False)
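In other words, when oslo.messaging uses RabbitMQ as its backend, the
rabbit_qos_prefetch_count option is simply RabbitMQ's prefetchCount field: it
limits how many unacknowledged messages a consumer can hold at once. In effect this is message rate limiting, which avoids the problems
caused by processing a large number of messages at the same time.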


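Findings (the corresponding signature in the RabbitMQ .NET client):
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize: 0, meaning no limit on the size of the prefetch window in bytes
prefetchCount: tells RabbitMQ not to push more than N messages to a consumer at the same time; once N messages are unacked, RabbitMQ stops delivering to that consumer until one of them is acked
global: true/false. Whether the limit above is shared by the whole channel (true) or applied to each consumer separately (false); put simply, whether the limit is channel-level or consumer-level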
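
The difference between the two values of global can be made concrete with a small calculation. This is a sketch based on the interpretation described above (false: one window per consumer; true: one window shared by the channel); the max_unacked function and its parameter names are hypothetical.

```python
def max_unacked(queued, consumers, prefetch_count, is_global):
    """Upper bound on messages outstanding (unacked) on one channel at once."""
    if prefetch_count == 0:
        return queued  # 0 means no prefetch limit at all
    if is_global:
        window = prefetch_count              # one window shared by the channel
    else:
        window = prefetch_count * consumers  # one window per consumer
    return min(queued, window)


# Two consumers on one channel, prefetch count 2, ten messages queued.
per_consumer = max_unacked(queued=10, consumers=2, prefetch_count=2,
                           is_global=False)
shared = max_unacked(queued=10, consumers=2, prefetch_count=2,
                     is_global=True)

print(per_consumer, shared)
```

Since oslo.messaging passes False for this argument, the configured count applies to each consumer on the channel separately.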

4 Summary
The rabbit_qos_prefetch_count option in oslo_messaging is simply RabbitMQ's prefetchCount field: it
limits the number of unacknowledged messages a consumer can receive at once.

References
https://www.rabbitmq.com/consumer-prefetch.html
https://www.itsvse.com/thread-4667-1-1.html
