python 64式: 第32式、rabbitmq性能调优
1 rabbit qos基础
rabbit qos可以设置一个整数值N(即prefetch count),表示的意思就是一个消费者最多只能同时持有N条尚未确认(unacked)的消息,
一旦这N条消息都还没有处理完(没有被ack),rabbitmq就不会再向该消费者投递新的消息,直到有消息被ack。
设置qos的作用就是防止消费者从队列中一下拉取所有消息,从而
击穿服务,导致服务崩溃或异常。
2 rabbit qos在openstack组件中的应用
可以在组件的配置文件中设置类似如下内容,其中数字100可以根据实际需要修改。
[oslo_messaging_rabbit]
rabbit_qos_prefetch_count = 100
3 oslo_messaging中rabbitmq qos源码分析
查看oslo_messaging
newton版本:
oslo.messaging===5.10.2
代码:
https://github.com/openstack/oslo.messaging
https://github.com/openstack/oslo.messaging/tree/5.10.2
下载:
cd /home/machao/myproject/task/385_rabbitmq_qos原理/oslo.messaging-5.10.2
3.1 查找相关代码
grep -r "rabbit_qos_prefetch_count" ./*
对应输出内容如下:
[root@localhost oslo.messaging-5.10.2]# grep -r "rabbit_qos_prefetch_count" ./*
./oslo_messaging/_drivers/impl_rabbit.py: cfg.IntOpt('rabbit_qos_prefetch_count',
./oslo_messaging/_drivers/impl_rabbit.py: self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
./oslo_messaging/_drivers/impl_rabbit.py: if self.rabbit_qos_prefetch_count > 0:
./oslo_messaging/_drivers/impl_rabbit.py: self.rabbit_qos_prefetch_count,
./oslo_messaging/_drivers/impl_rabbit.py: conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
./oslo_messaging/tests/drivers/test_impl_rabbit.py: self.config(rabbit_qos_prefetch_count=prefetch,
./oslo_messaging/tests/test_config_opts_proxy.py: rabbit_qos_prefetch_count=0,
./oslo_messaging/tests/test_config_opts_proxy.py: "?rabbit_qos_prefetch_count=2"
./oslo_messaging/tests/test_config_opts_proxy.py: conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
3.2 总入口在
oslo_messaging/_drivers/impl_rabbit.py
中的Connection类中,具体代码如下:
class Connection(object):
"""Connection object."""
pools = {}
def __init__(self, conf, url, purpose):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.max_retries = driver_conf.rabbit_max_retries
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
self.login_method = driver_conf.rabbit_login_method
self.fake_rabbit = driver_conf.fake_rabbit
self.virtual_host = driver_conf.rabbit_virtual_host
self.rabbit_hosts = driver_conf.rabbit_hosts
self.rabbit_port = driver_conf.rabbit_port
self.rabbit_userid = driver_conf.rabbit_userid
self.rabbit_password = driver_conf.rabbit_password
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
......
def _set_qos(self, channel):
"""Set QoS prefetch count on the channel"""
if self.rabbit_qos_prefetch_count > 0:
channel.basic_qos(0,
self.rabbit_qos_prefetch_count,
False)
分析:
3.2.1) 其中_set_qos方法被如下方法调用
def _set_current_channel(self, new_channel):
"""Change the channel to use.
NOTE(sileht): Must be called within the connection lock
"""
if new_channel == self.channel:
return
if self.channel is not None:
self._declared_queues.clear()
self._declared_exchanges.clear()
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
if new_channel is not None:
if self.purpose == rpc_common.PURPOSE_LISTEN:
self._set_qos(new_channel)
self._producer = kombu.messaging.Producer(new_channel)
for consumer in self._consumers:
consumer.declare(self)
3.2.2) 分析_set_current_channel方法
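其中_set_current_channel方法被oslo_messaging/_drivers/impl_rabbit.py
文件中的Connection类的ensure_connection方法调用
(注意:ensure_connection中传入的参数是None,作用只是重置channel;由于_set_current_channel中有new_channel is not None的判断,这一次调用本身不会执行_set_qos。真正传入新channel并触发_set_qos的调用,应当发生在随后的self.ensure(...)内部,需要结合ensure方法的源码进一步确认)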
def ensure_connection(self):
# NOTE(sileht): we reset the channel and ensure
# the kombu underlying connection works
self._set_current_channel(None)
self.ensure(method=lambda: self.connection.connection)
self.set_transport_socket_timeout()
3.2.3)分析ensure_connection方法
该ensure_connection方法被oslo_messaging/_drivers/impl_rabbit.py
文件中的Connection类的__init__方法调用,具体代码如下
class Connection(object):
"""Connection object."""
pools = {}
def __init__(self, conf, url, purpose):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.max_retries = driver_conf.rabbit_max_retries
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
self.login_method = driver_conf.rabbit_login_method
self.fake_rabbit = driver_conf.fake_rabbit
self.virtual_host = driver_conf.rabbit_virtual_host
self.rabbit_hosts = driver_conf.rabbit_hosts
self.rabbit_port = driver_conf.rabbit_port
self.rabbit_userid = driver_conf.rabbit_userid
self.rabbit_password = driver_conf.rabbit_password
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
self.heartbeat_timeout_threshold = \
driver_conf.heartbeat_timeout_threshold
self.heartbeat_rate = driver_conf.heartbeat_rate
self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
self.amqp_durable_queues = driver_conf.amqp_durable_queues
self.amqp_auto_delete = driver_conf.amqp_auto_delete
self.rabbit_use_ssl = driver_conf.rabbit_use_ssl
self.kombu_missing_consumer_retry_timeout = \
driver_conf.kombu_missing_consumer_retry_timeout
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
if self.rabbit_use_ssl:
self.kombu_ssl_version = driver_conf.kombu_ssl_version
self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile
self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile
self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
virtual_host = self.virtual_host
self._url = ''
if self.fake_rabbit:
LOG.warning(_LW("Deprecated: fake_rabbit option is deprecated, "
"set rpc_backend to kombu+memory or use the fake "
"driver instead."))
self._url = 'memory://%s/' % virtual_host
elif url.hosts:
if url.transport.startswith('kombu+'):
LOG.warning(_LW('Selecting the kombu transport through the '
'transport url (%s) is a experimental feature '
'and this is not yet supported.'),
url.transport)
if len(url.hosts) > 1:
random.shuffle(url.hosts)
for host in url.hosts:
transport = url.transport.replace('kombu+', '')
transport = transport.replace('rabbit', 'amqp')
self._url += '%s%s://%s:%s@%s:%s/%s' % (
";" if self._url else '',
transport,
parse.quote(host.username or ''),
parse.quote(host.password or ''),
self._parse_url_hostname(host.hostname) or '',
str(host.port or 5672),
virtual_host)
elif url.transport.startswith('kombu+'):
# NOTE(sileht): url have a + but no hosts
# (like kombu+memory:///), pass it to kombu as-is
transport = url.transport.replace('kombu+', '')
self._url = "%s://%s" % (transport, virtual_host)
else:
if len(self.rabbit_hosts) > 1:
random.shuffle(self.rabbit_hosts)
for adr in self.rabbit_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=self.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '',
parse.quote(self.rabbit_userid, ''),
parse.quote(self.rabbit_password, ''),
self._parse_url_hostname(hostname), port,
virtual_host)
self._initial_pid = os.getpid()
self._consumers = {}
self._producer = None
self._new_tags = set()
self._active_tags = {}
self._tags = itertools.count(1)
# Set of exchanges and queues declared on the channel to avoid
# unnecessary redeclaration. This set is resetted each time
# the connection is resetted in Connection._set_current_channel
self._declared_exchanges = set()
self._declared_queues = set()
self._consume_loop_stopped = False
self.channel = None
self.purpose = purpose
# NOTE(sileht): if purpose is PURPOSE_LISTEN
# we don't need the lock because we don't
# have a heartbeat thread
if purpose == rpc_common.PURPOSE_SEND:
self._connection_lock = ConnectionLock()
else:
self._connection_lock = DummyConnectionLock()
self.connection_id = str(uuid.uuid4())
self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]),
os.getpid(),
self.connection_id)
self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(),
login_method=self.login_method,
heartbeat=self.heartbeat_timeout_threshold,
failover_strategy=self.kombu_failover_strategy,
transport_options={
'confirm_publish': True,
'client_properties': {
'capabilities': {
'authentication_failure_close': True,
'connection.blocked': True,
'consumer_cancel_notify': True
},
'connection_name': self.name},
'on_blocked': self._on_connection_blocked,
'on_unblocked': self._on_connection_unblocked,
},
)
LOG.debug('[%(connection_id)s] Connecting to AMQP server on'
' %(hostname)s:%(port)s',
self._get_connection_info())
# NOTE(sileht): kombu recommend to run heartbeat_check every
# seconds, but we use a lock around the kombu connection
# so, to not lock to much this lock to most of the time do nothing
# expected waiting the events drain, we start heartbeat_check and
# retrieve the server heartbeat packet only two times more than
# the minimum required for the heartbeat works
# (heatbeat_timeout/heartbeat_rate/2.0, default kombu
# heartbeat_rate is 2)
self._heartbeat_wait_timeout = (
float(self.heartbeat_timeout_threshold) /
float(self.heartbeat_rate) / 2.0)
self._heartbeat_support_log_emitted = False
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure_connection()
......
总结:
oslo.messaging库中设置rabbitmq的rabbit_qos_prefetch_count整体调用路径是:
1 RabbitDriver的__init__方法中使用
connection_pool = pool.ConnectionPool(
conf, max_size, min_size, ttl,
url, Connection)
2 Connection的__init__方法中调用ensure_connection方法
3 ensure_connection方法中调用_set_current_channel方法
4 _set_current_channel方法中调用_set_qos方法(仅当new_channel不为None且purpose为PURPOSE_LISTEN时才会调用)
5 _set_qos方法中
def _set_qos(self, channel):
"""Set QoS prefetch count on the channel"""
if self.rabbit_qos_prefetch_count > 0:
channel.basic_qos(0,
self.rabbit_qos_prefetch_count,
False)
也就是说oslo.messaging在使用rabbitmq作为后端实现时,设置的
rabbit_qos_prefetch_count配置项实际上就是rabbitmq中的prefetchCount字段,即
限制一个消费者最多只接收指定个数的消息。实际就是一个消息限流的处理,避免同时处理大量消息
导致的问题。
发现:
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true/false 是否将上面设置应用于整个channel,简单点说,就是上面限制是channel级别的(true,channel上的所有consumer共享该限制)还是consumer级别的(false,对每个consumer单独生效);上面_set_qos中传入的是False,即consumer级别
4 总结
oslo_messaging中设置的rabbit_qos_prefetch_count配置项实际上就是rabbitmq中的prefetchCount字段,即
限制一个消费者最多只接收指定个数的消息。
参考
https://www.rabbitmq.com/consumer-prefetch.html
https://www.itsvse.com/thread-4667-1-1.html