Analysis of the Cinder Volume Service Startup Flow and Its Periodic Tasks

1. Entry point of the cinder-volume service

#!/usr/bin/python2
# PBR Generated from u'console_scripts'
import sys
from cinder.cmd.volume import main
if __name__ == "__main__":
    sys.exit(main())

2. Implementation of main() in cinder/cmd/volume.py

def main():
    objects.register_all()       # import all modules under cinder/objects
    gmr_opts.set_defaults(CONF)  # oslo_reports: used to generate error reports, e.g. for memory leaks
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    # oslo_privsep: configure the execution privileges of the service
    priv_context.init(root_helper=shlex.split(utils.get_root_helper()))
    # monkey-patch libraries such as socket and thread; imports are unchanged,
    # only the functions the operations resolve to are replaced
    utils.monkey_patch()
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    launcher = service.get_launcher()  # step 1
    LOG = logging.getLogger(__name__)
    service_started = False

    if CONF.enabled_backends:  # the storage backends enabled in cinder.conf
        # Iterate over the enabled_backends values from the configuration file.
        for backend in filter(None, CONF.enabled_backends):
            CONF.register_opt(host_opt, group=backend)
            backend_host = getattr(CONF, backend).backend_host
            # Build the cinder-volume host name; this is the name shown by
            # `cinder service-list`.
            host = "%s@%s" % (backend_host or CONF.host, backend)
            # We also want to set cluster to None on empty strings, and we
            # ignore leading and trailing spaces.
            cluster = CONF.cluster and CONF.cluster.strip()
            cluster = (cluster or None) and '%s@%s' % (cluster, backend)
            try:
                # Create one service object per backend (see section 3).
                server = service.Service.create(host=host,
                                                service_name=backend,
                                                binary='cinder-volume',
                                                coordination=True,
                                                cluster=cluster)
            except Exception:
                msg = _('Volume service %s failed to start.') % host
                LOG.exception(msg)
            else:  # taken when no exception occurred
                # Dispose of the whole DB connection pool here before
                # starting another process. Otherwise we run into cases where
                # child processes share DB connections which results in errors.
                session.dispose_engine()
                launcher.launch_service(server)  # step 2: call launch_service on the launcher
                service_started = True
    else:
        LOG.error(_LE('Configuration for cinder-volume does not specify '
                      '"enabled_backends". Using DEFAULT section to configure '
                      'drivers is not supported since Ocata.'))

    if not service_started:
        msg = _('No volume service(s) started successfully, terminating.')
        LOG.error(msg)
        sys.exit(1)

    launcher.wait()
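For reference, a hypothetical cinder.conf fragment (backend names and drivers chosen only for illustration) that would make the loop above start two backends. With CONF.host set to, say, node1, the resulting service hosts would appear in `cinder service-list` as node1@lvm-1 and node1@ceph-1, unless a per-backend backend_host option overrides the first part:

[DEFAULT]
enabled_backends = lvm-1,ceph-1

[lvm-1]
volume_driver = cinder.volume.drivers.lvm.LVMVolumeDriver
volume_backend_name = lvm-1

[ceph-1]
volume_driver = cinder.volume.drivers.rbd.RBDDriver
volume_backend_name = ceph-1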

3. A closer look at service.Service.create()

cinder/service.py: a classmethod of class Service(service.Service):

@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
           report_interval=None, periodic_interval=None,
           periodic_fuzzy_delay=None, service_name=None,
           coordination=False, cluster=None):
    """Instantiates class and passes back application object.

    :param host: defaults to CONF.host
    :param binary: defaults to basename of executable
    :param topic: defaults to bin_name - 'cinder-' part
    :param manager: defaults to CONF.<topic>_manager
    :param report_interval: defaults to CONF.report_interval
    :param periodic_interval: defaults to CONF.periodic_interval
    :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
    :param cluster: Defaults to None, as only some services will have it
    """
    if not host:
        host = CONF.host
    if not binary:
        binary = os.path.basename(inspect.stack()[-1][1])
    if not topic:
        topic = binary
    if not manager:
        subtopic = topic.rpartition('cinder-')[2]
        manager = CONF.get('%s_manager' % subtopic, None)
    if report_interval is None:
        # Read the state-reporting interval from the configuration file.
        report_interval = CONF.report_interval
    if periodic_interval is None:
        # Read the periodic-task polling interval from the configuration file.
        periodic_interval = CONF.periodic_interval
    if periodic_fuzzy_delay is None:
        periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
    # Instantiate the class to create a service object.
    service_obj = cls(host, binary, topic, manager,
                      report_interval=report_interval,
                      periodic_interval=periodic_interval,
                      periodic_fuzzy_delay=periodic_fuzzy_delay,
                      service_name=service_name,
                      coordination=coordination,
                      cluster=cluster)

    return service_obj
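As a worked example of the defaults above (a standalone illustration, not code taken from Cinder): for the cinder-volume binary the topic falls back to the binary name, and the manager is then looked up from the corresponding *_manager option, which in the Ocata-era code points at cinder.volume.manager.VolumeManager by default:

binary = 'cinder-volume'
topic = binary                              # 'cinder-volume'
subtopic = topic.rpartition('cinder-')[2]   # 'volume'
option = '%s_manager' % subtopic            # 'volume_manager'
print(option)
# CONF.volume_manager defaults to 'cinder.volume.manager.VolumeManager'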

4. A closer look at launcher = service.get_launcher()

cinder/cmd/volume.py:

from cinder import service
launcher = service.get_launcher()

cinder/service.py:

from oslo_service import service

def get_launcher():
    # Note(lpetrut): ProcessLauncher uses green pipes which fail on Windows
    # due to missing support of non-blocking I/O pipes. For this reason, the
    # service must be spawned differently on Windows, using the ServiceLauncher
    # class instead.
    if os.name == 'nt':
        return Launcher()          # on Windows
    else:
        return process_launcher()  # on Linux

def process_launcher():
    return service.ProcessLauncher(CONF)
oslo_service/service.py:

class ProcessLauncher(object):
    """Launch a service with a given number of workers."""

    def __init__(self, conf, wait_interval=0.01, restart_method='reload'):
        """Constructor.

        :param conf: an instance of ConfigOpts
        :param wait_interval: The interval to sleep for between checks
                              of child process exit.
        :param restart_method: If 'reload', calls reload_config_files on
            SIGHUP. If 'mutate', calls mutate_config_files on SIGHUP. Other
            values produce a ValueError.
        """
        self.conf = conf
        conf.register_opts(_options.service_opts)
        self.children = {}
        self.sigcaught = None
        self.running = True
        self.wait_interval = wait_interval
        self.launcher = None
        rfd, self.writepipe = os.pipe()
        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
        self.signal_handler = SignalHandler()
        self.handle_signal()
        self.restart_method = restart_method
        if restart_method not in _LAUNCHER_RESTART_METHODS:
            raise ValueError(_("Invalid restart_method: %s") % restart_method)

    def launch_service(self, service, workers=1):  # one worker (one process) by default
        """Launch a service with a given number of workers.

        :param service: a service to launch, must be an instance of
                        :class:`oslo_service.service.ServiceBase`
        :param workers: a number of processes in which a service
                        will be running
        """
        _check_service_base(service)
        wrap = ServiceWrapper(service, workers)

        # Hide existing objects from the garbage collector, so that most
        # existing pages will remain in shared memory rather than being
        # duplicated between subprocesses in the GC mark-and-sweep. (Requires
        # Python 3.7 or later.)
        if hasattr(gc, 'freeze'):
            gc.freeze()

        LOG.info('Starting %d workers', wrap.workers)
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)

    def _start_child(self, wrap):
        if len(wrap.forktimes) > wrap.workers:
            # Limit ourselves to one process a second (over the period of
            # number of workers * 1 second). This will allow workers to
            # start up quickly but ensure we don't fork off children that
            # die instantly too quickly.
            if time.time() - wrap.forktimes[0] < wrap.workers:
                LOG.info('Forking too fast, sleeping')
                time.sleep(1)

            wrap.forktimes.pop(0)

        wrap.forktimes.append(time.time())

        pid = os.fork()
        # After os.fork() the child receives a copy of the parent's data, and
        # both processes continue executing the code that follows. fork()
        # returns 0 in the child process and the child's PID in the parent.
        if pid == 0:
            # The forked child process takes over the service.
            self.launcher = self._child_process(wrap.service)
            while True:
                self._child_process_handle_signal()
                status, signo = self._child_wait_for_exit_or_signal(
                    self.launcher)
                if not _is_sighup_and_daemon(signo):
                    self.launcher.wait()
                    break
                self.launcher.restart()  # restart the service inside this child process

            os._exit(status)

        LOG.debug('Started child %d', pid)

        wrap.children.add(pid)
        self.children[pid] = wrap

        return pid
It follows that the cinder-volume service started for each backend runs as an independent child process; in other words, each launched service lives in its own child process.
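As a minimal standalone illustration of the fork() semantics described above (not Cinder code), the following shows the two return values and how both processes continue past the call:

import os

pid = os.fork()
if pid == 0:
    # Child branch: fork() returned 0 here.
    print('child: my pid is %d, my parent is %d' % (os.getpid(), os.getppid()))
    os._exit(0)          # leave the child without running the parent's cleanup code
else:
    # Parent branch: fork() returned the child's PID.
    print('parent: my pid is %d, forked child %d' % (os.getpid(), pid))
    os.waitpid(pid, 0)   # reap the child so it does not linger as a zombie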
oslo_service/service.py: class Launcher(object):

    def restart(self):
        """Reload config files and restart service.

        :returns: The return value from reload_config_files or
                  mutate_config_files, according to the restart_method.
        """
        if self.restart_method == 'reload':
            self.conf.reload_config_files()
        elif self.restart_method == 'mutate':
            self.conf.mutate_config_files()
        self.services.restart()
oslo_service/service.py: class Services(object):

    def restart(self):
        """Reset services and start them in new threads."""
        self.stop()
        self.done = event.Event()
        for restart_service in self.services:
            restart_service.reset()
            self.tg.add_thread(self.run_service, restart_service, self.done)

    @staticmethod
    def run_service(service, done):
        """Service start wrapper.

        :param service: service to run
        :param done: event to wait on until a shutdown is triggered
        :returns: None
        """
        try:
            # Call the service's start() method. Because cinder's Service class
            # inherits from oslo_service's Service, this ends up in
            # cinder/service.py: Service.start().
            service.start()
        except Exception:
            LOG.exception('Error starting thread.')
            raise SystemExit(1)
        else:
            done.wait()

cinder/service.py: the start() method of class Service:
def start(self):
    version_string = version.version_string()
    LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'),
             {'topic': self.topic, 'version_string': version_string})
    self.model_disconnected = False

    if self.coordination:
        coordination.COORDINATOR.start()

    # Initialize the volume driver.
    self.manager.init_host(added_to_cluster=self.added_to_cluster,
                           service_id=Service.service_id)

    LOG.debug("Creating RPC server for service %s", self.topic)

    ctxt = context.get_admin_context()
    endpoints = [self.manager]
    endpoints.extend(self.manager.additional_endpoints)
    obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
    LOG.debug("Pinning object versions for RPC server serializer to %s",
              obj_version_cap)
    serializer = objects_base.CinderObjectSerializer(obj_version_cap)

    target = messaging.Target(topic=self.topic, server=self.host)
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    self.rpcserver.start()

    # NOTE(dulek): Kids, don't do that at home. We're relying here on
    # oslo.messaging implementation details to keep backward compatibility
    # with pre-Ocata services. This will not matter once we drop
    # compatibility with them.
    if self.topic == constants.VOLUME_TOPIC:
        target = messaging.Target(
            topic='%(topic)s.%(host)s' % {'topic': self.topic,
                                          'host': self.host},
            server=vol_utils.extract_host(self.host, 'host'))
        self.backend_rpcserver = rpc.get_server(target, endpoints,
                                                serializer)
        self.backend_rpcserver.start()

    # TODO(geguileo): In O - Remove the is_svc_upgrading_to_n part
    if self.cluster and not self.is_svc_upgrading_to_n(self.binary):
        LOG.info(_LI('Starting %(topic)s cluster %(cluster)s (version '
                     '%(version)s)'),
                 {'topic': self.topic, 'version': version_string,
                  'cluster': self.cluster})
        target = messaging.Target(
            topic='%s.%s' % (self.topic, self.cluster),
            server=vol_utils.extract_host(self.cluster, 'host'))
        serializer = objects_base.CinderObjectSerializer(obj_version_cap)
        self.cluster_rpcserver = rpc.get_server(target, endpoints,
                                                serializer)
        self.cluster_rpcserver.start()

    self.manager.init_host_with_rpc()

    if self.report_interval:
        # Start the periodic service-state reporting (heartbeat) task, whose
        # result is what the scheduler sees via the service table.
        pulse = loopingcall.FixedIntervalLoopingCall(
            self.report_state)
        pulse.start(interval=self.report_interval,
                    initial_delay=self.report_interval)
        self.timers.append(pulse)

    if self.periodic_interval:
        if self.periodic_fuzzy_delay:
            initial_delay = random.randint(0, self.periodic_fuzzy_delay)
        else:
            initial_delay = None
        # Start the timer that polls the manager's periodic tasks.
        periodic = loopingcall.FixedIntervalLoopingCall(
            self.periodic_tasks)
        periodic.start(interval=self.periodic_interval,
                       initial_delay=initial_delay)
        self.timers.append(periodic)
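Both timers are instances of oslo_service's FixedIntervalLoopingCall. Below is a minimal sketch of how such a timer behaves, assuming oslo_service is installed; report_state here is only a hypothetical stand-in for the real heartbeat method:

from oslo_service import loopingcall

state = {'count': 0}

def report_state():
    # Hypothetical stand-in for cinder's Service.report_state() heartbeat.
    state['count'] += 1
    print('state report #%d' % state['count'])
    if state['count'] >= 3:
        raise loopingcall.LoopingCallDone()  # raising this stops the loop cleanly

pulse = loopingcall.FixedIntervalLoopingCall(report_state)
pulse.start(interval=1, initial_delay=1)     # cinder passes CONF.report_interval here
pulse.wait()                                 # returns once LoopingCallDone is raised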
cinder/volume/manager.py: VolumeManager — analysis of the init_host() process
def init_host(self, added_to_cluster=None, **kwargs):
    """Perform any required initialization."""
    ctxt = context.get_admin_context()
    if not self.driver.supported:
        utils.log_unsupported_driver_warning(self.driver)

        if not self.configuration.enable_unsupported_driver:
            LOG.error(_LE("Unsupported drivers are disabled."
                          " You can re-enable by adding "
                          "enable_unsupported_driver=True to the "
                          "driver section in cinder.conf"),
                      resource={'type': 'driver',
                                'id': self.__class__.__name__})
            return

    # If we have just added this host to a cluster we have to include all
    # our resources in that cluster.
    if added_to_cluster:
        self._include_resources_in_cluster(ctxt)

    LOG.info(_LI("Starting volume driver %(driver_name)s (%(version)s)"),
             {'driver_name': self.driver.__class__.__name__,
              'version': self.driver.get_version()})
    try:
        # The backend-specific initialization lives in do_setup(). If it
        # fails, the service startup itself does not fail and the other
        # backends in a multi-backend deployment are unaffected; init_host()
        # simply returns below and this driver stays uninitialized.
        self.driver.do_setup(ctxt)
        self.driver.check_for_setup_error()
    except Exception:
        LOG.exception(_LE("Failed to initialize driver."),
                      resource={'type': 'driver',
                                'id': self.__class__.__name__})
        # we don't want to continue since we failed
        # to initialize the driver correctly.
        return

    # Initialize backend capabilities list
    self.driver.init_capabilities()

    volumes = self._get_my_volumes(ctxt)
    snapshots = self._get_my_snapshots(ctxt)
    self._sync_provider_info(ctxt, volumes, snapshots)
    # FIXME volume count for exporting is wrong

    self.stats['pools'] = {}
    self.stats.update({'allocated_capacity_gb': 0})

    try:
        for volume in volumes:
            # available volume should also be counted into allocated
            if volume['status'] in ['in-use', 'available']:
                # calculate allocated capacity for driver
                self._count_allocated_capacity(ctxt, volume)

                try:
                    if volume['status'] in ['in-use']:
                        self.driver.ensure_export(ctxt, volume)
                except Exception:
                    LOG.exception(_LE("Failed to re-export volume, "
                                      "setting to ERROR."),
                                  resource=volume)
                    volume.conditional_update({'status': 'error'},
                                              {'status': 'in-use'})
        # All other cleanups are processed by parent class CleanableManager
    except Exception:
        LOG.exception(_LE("Error during re-export on driver init."),
                      resource=volume)
        return

    self.driver.set_throttle()

    # At this point the driver is considered initialized: set_initialized()
    # sets the driver's `initialized` attribute to True, marking that the
    # driver started up and completed its initialization successfully.
    # NOTE(jdg): Careful though because that doesn't mean
    # that an entry exists in the service table
    self.driver.set_initialized()

    # Keep the image tmp file clean when init host.
    backend_name = vol_utils.extract_host(self.service_topic_queue)
    image_utils.cleanup_temporary_file(backend_name)

    # collect and publish service capabilities
    self.publish_service_capabilities(ctxt)
    LOG.info(_LI("Driver initialization completed successfully."),
             resource={'type': 'driver',
                       'id': self.driver.__class__.__name__})

    # Make sure to call CleanableManager to do the cleanup
    super(VolumeManager, self).init_host(added_to_cluster=added_to_cluster,
                                         **kwargs)
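To summarize which driver hooks init_host() exercises, here is a minimal, hypothetical driver skeleton. A real driver inherits from cinder.volume.driver.VolumeDriver and implements many more methods; this sketch only lists the entry points touched during startup:

class ExampleVolumeDriver(object):
    """Hypothetical skeleton showing the hooks called by init_host()."""

    VERSION = '1.0.0'

    def do_setup(self, context):
        # Connect to the storage backend here (sessions, REST clients, pools).
        # If this raises, init_host() logs the error and returns; the
        # cinder-volume process keeps running, but this backend stays
        # uninitialized.
        pass

    def check_for_setup_error(self):
        # Verify that do_setup() left the backend in a usable state.
        pass

    def get_version(self):
        return self.VERSION

    def init_capabilities(self):
        # Build the capabilities later reported via publish_service_capabilities().
        pass

    def ensure_export(self, context, volume):
        # Re-create the export for an in-use volume after a service restart.
        pass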
