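I have been busy and have not had time to read the OpenStack source for quite a while. I recently set aside some time to go through the Cinder source, and this post records what I learned. It briefly covers how Cinder's three services start: cinder-api, cinder-scheduler, and cinder-volume. All three start in much the same way.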
1. cinder-api
The entry file is /usr/bin/cinder-api, which shows that the real entry point is the main function in cinder.cmd.api.
```python
#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.api import main


if __name__ == "__main__":
    sys.exit(main())
```
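The generated wrapper is only a thin shim around a module:function reference declared under console_scripts. As a rough illustration of how such a reference maps to a callable, here is a minimal sketch using only the standard library. resolve_entry_point is a hypothetical helper (it is not part of PBR or setuptools), and json:dumps stands in for cinder.cmd.api:main so that the snippet runs without Cinder installed.

```python
import importlib


def resolve_entry_point(spec):
    """Turn a 'package.module:function' string into the callable it names.

    Hypothetical helper for illustration only; real installers generate
    the wrapper script at install time.
    """
    module_name, _, attr = spec.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# 'json:dumps' is a stand-in for 'cinder.cmd.api:main'.
func = resolve_entry_point("json:dumps")
print(func({"service": "cinder-api"}))
```

The point is simply that /usr/bin/cinder-api carries no logic of its own; everything interesting happens in the function it imports.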
The main function is shown below. The part to focus on is the three statements near the end that create the launcher, create the server, and launch it.

```python
def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    config.set_middleware_defaults()
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    utils.monkey_patch()

    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)

    rpc.init(CONF)
    launcher = service.process_launcher()
    server = service.WSGIService('osapi_volume')
    launcher.launch_service(server, workers=server.workers)
    launcher.wait()
```

service.process_launcher() creates a ProcessLauncher object, which is used later to launch the app.
service.WSGIService('osapi_volume') creates a WSGIService object named osapi_volume.
launcher.launch_service(server, workers=server.workers) hands the server to the launcher. Reading the source shows that this method calls ProcessLauncher._start_child once per worker to start the service:
```python
def launch_service(self, service, workers=1):
    """Launch a service with a given number of workers.

    :param service: a service to launch, must be an instance of
           :class:`oslo_service.service.ServiceBase`
    :param workers: a number of processes in which a service
           will be running
    """
    _check_service_base(service)  # validate the object type
    wrap = ServiceWrapper(service, workers)  # wrap the service

    # Hide existing objects from the garbage collector, so that most
    # existing pages will remain in shared memory rather than being
    # duplicated between subprocesses in the GC mark-and-sweep. (Requires
    # Python 3.7 or later.)
    if hasattr(gc, 'freeze'):
        gc.freeze()

    LOG.info('Starting %d workers', wrap.workers)
    while self.running and len(wrap.children) < wrap.workers:
        self._start_child(wrap)
```
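The _start_child method is simple: it calls os.fork() to create a child process. In the child (where fork returns 0) it then calls _child_process, while the parent records the new pid and returns.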
```python
def _start_child(self, wrap):
    if len(wrap.forktimes) > wrap.workers:
        # Limit ourselves to one process a second (over the period of
        # number of workers * 1 second). This will allow workers to
        # start up quickly but ensure we don't fork off children that
        # die instantly too quickly.
        if time.time() - wrap.forktimes[0] < wrap.workers:
            LOG.info('Forking too fast, sleeping')
            time.sleep(1)

        wrap.forktimes.pop(0)

    wrap.forktimes.append(time.time())

    pid = os.fork()
    if pid == 0:
        self.launcher = self._child_process(wrap.service)
        while True:
            self._child_process_handle_signal()
            status, signo = self._child_wait_for_exit_or_signal(
                self.launcher)
            if not _is_sighup_and_daemon(signo):
                self.launcher.wait()
                break
            self.launcher.restart()

        os._exit(status)

    LOG.debug('Started child %d', pid)

    wrap.children.add(pid)
    self.children[pid] = wrap

    return pid
```
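The forktimes bookkeeping at the top of _start_child is easy to skim past, so here is a minimal sketch of just that sliding-window check, pulled out into a pure function. record_fork is a hypothetical name, and the current time is passed in as an argument (rather than read from time.time()) so that the behaviour can be traced by hand.

```python
def record_fork(forktimes, workers, now):
    """Mirror the forktimes bookkeeping from _start_child.

    Returns True when the caller should sleep for a second before
    forking, i.e. when more than `workers` forks have happened within
    the last `workers` seconds.
    """
    throttle = False
    if len(forktimes) > workers:
        # The oldest remembered fork is still inside the window.
        if now - forktimes[0] < workers:
            throttle = True
        forktimes.pop(0)
    forktimes.append(now)
    return throttle


times = []
for t in (0.0, 0.1, 0.2, 0.3):
    print(t, record_fork(times, workers=2, now=t))
```

With two workers, the first three forks go through immediately; only a fourth fork inside the same two-second window triggers the throttle, which is what keeps a crash-looping child from being respawned in a tight loop.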
```python
def _child_process(self, service):
    self._child_process_handle_signal()

    # Reopen the eventlet hub to make sure we don't share an epoll
    # fd with parent and/or siblings, which would be bad
    eventlet.hubs.use_hub()

    # Close write to ensure only parent has it open
    os.close(self.writepipe)
    # Create greenthread to watch for parent to close pipe
    eventlet.spawn_n(self._pipe_watcher)

    # Reseed random number generator
    random.seed()

    launcher = Launcher(self.conf, restart_method=self.restart_method)
    launcher.launch_service(service)
    return launcher
```
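_child_process uses eventlet: it reopens the hub so the child does not share one with its parent, and spawns a green thread that watches the pipe to detect when the parent goes away. It then creates a Launcher object and launches the service through that object's launch_service method: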
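The pipe trick used here is worth a small illustration. The following is a minimal sketch of the idea only, under some loud assumptions: it runs in a single process, uses an ordinary thread in place of an eventlet green thread, and simulates the parent's exit by closing the write end by hand. watch_pipe is a hypothetical name.

```python
import os
import threading


def watch_pipe(read_fd, on_parent_exit):
    """Block until every write end of the pipe is closed, then notify."""
    with os.fdopen(read_fd, "rb") as reader:
        # read() returns b"" (EOF) only once all write ends are closed.
        reader.read(1)
    on_parent_exit()


read_fd, write_fd = os.pipe()
parent_gone = threading.Event()

watcher = threading.Thread(target=watch_pipe,
                           args=(read_fd, parent_gone.set))
watcher.start()

# Simulate the parent process exiting: its write end disappears.
os.close(write_fd)
watcher.join(timeout=5)
print("parent exit detected:", parent_gone.is_set())
```

This is why the child closes its own copy of the write end first: as long as any process still holds the write end open, the read never sees EOF.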
```python
def launch_service(self, service, workers=1):
    """Load and start the given service.

    :param service: The service you would like to start, must be an
                    instance of :class:`oslo_service.service.ServiceBase`
    :param workers: This param makes this method compatible with
                    ProcessLauncher.launch_service. It must be None, 1 or
                    omitted.
    :returns: None

    """
    if workers is not None and workers != 1:
        raise ValueError(_("Launcher asked to start multiple workers"))
    _check_service_base(service)
    service.backdoor_port = self.backdoor_port
    self.services.add(service)  # the key step
```
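The key to the Launcher object is this add method. It appends every service launched through it to the services list held by self.services, and starts a thread running run_service, which ultimately calls the start() method of the added service.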
```python
def add(self, service):
    """Add a service to a list and create a thread to run it.

    :param service: service to run
    """
    self.services.append(service)
    self.tg.add_thread(self.run_service, service, self.done)

@staticmethod
def run_service(service, done):
    """Service start wrapper.

    :param service: service to run
    :param done: event to wait on until a shutdown is triggered
    :returns: None

    """
    try:
        service.start()
    except Exception:
        LOG.exception('Error starting thread.')
        raise SystemExit(1)
    else:
        done.wait()
```
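The add/run_service pair follows a common pattern: remember the service, run its start() in a separate thread, then park that thread on an event until shutdown. Below is a minimal sketch of that pattern using the standard threading module in place of the oslo.service thread group. DemoService and DemoServices are hypothetical stand-ins, not the real classes.

```python
import threading


class DemoService:
    """Hypothetical stand-in for a ServiceBase implementation."""

    def __init__(self, name):
        self.name = name
        self.started = False

    def start(self):
        self.started = True


class DemoServices:
    """Keeps a list of services and runs each one in its own thread."""

    def __init__(self):
        self.services = []
        self.threads = []
        self.done = threading.Event()

    def add(self, service):
        self.services.append(service)
        thread = threading.Thread(target=self.run_service,
                                  args=(service, self.done))
        thread.start()
        self.threads.append(thread)

    @staticmethod
    def run_service(service, done):
        service.start()
        # Park the thread until a shutdown is requested.
        done.wait()

    def stop(self):
        self.done.set()
        for thread in self.threads:
            thread.join()


services = DemoServices()
api = DemoService("osapi_volume")
services.add(api)
services.stop()
print(api.name, "started:", api.started)
```

The done event is what keeps the service thread alive after start() returns; setting it is the shutdown signal.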
This service is the WSGIService object mentioned at the beginning. Here is its start method:
```python
def start(self):
    """Start serving this service using loaded configuration.

    Also, retrieve updated port number in case '0' was passed in, which
    indicates a random port should be used.

    :returns: None

    """
    if self.manager:
        self.manager.init_host()
    self.server.start()
    self.port = self.server.port
```
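At this point self.manager is None, so the step that matters is self.server.start(). This server is the wsgi.Server object constructed in WSGIService.__init__; its own start method follows the class definition below.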
```python
class WSGIService(service.ServiceBase):
    """Provides ability to launch API from a 'paste' configuration."""

    def __init__(self, name, loader=None):
        """Initialize, but do not start the WSGI server.

        :param name: The name of the WSGI server given to the loader.
        :param loader: Loads the WSGI application using the given name.
        :returns: None

        """
        self.name = name
        self.manager = self._get_manager()
        self.loader = loader or wsgi.Loader(CONF)
        self.app = self.loader.load_app(name)
        self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
        self.port = getattr(CONF, '%s_listen_port' % name, 0)
        self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
        self.workers = (getattr(CONF, '%s_workers' % name, None) or
                        processutils.get_worker_count())
        if self.workers and self.workers < 1:
            worker_name = '%s_workers' % name
            msg = (_("%(worker_name)s value of %(workers)d is invalid, "
                     "must be greater than 0.") %
                   {'worker_name': worker_name,
                    'workers': self.workers})
            raise exception.InvalidConfigurationValue(msg)
        setup_profiler(name, self.host)

        self.server = wsgi.Server(CONF,
                                  name,
                                  self.app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)  # the server is built here
```
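Note how every setting in __init__ is looked up by a name derived from the service name, with a fallback when the option is absent. A minimal sketch of that lookup pattern follows. wsgi_settings is a hypothetical helper, a SimpleNamespace stands in for the real CONF object, and the address and port values are made up for illustration.

```python
from types import SimpleNamespace


def wsgi_settings(conf, name, default_workers):
    """Derive listen settings for a WSGI service from its name."""
    host = getattr(conf, "%s_listen" % name, "0.0.0.0")
    port = getattr(conf, "%s_listen_port" % name, 0)
    # An unset (None) or zero worker count falls back to the default.
    workers = getattr(conf, "%s_workers" % name, None) or default_workers
    if workers < 1:
        raise ValueError("%s_workers must be greater than 0" % name)
    return {"host": host, "port": port, "workers": workers}


# Hypothetical configuration values, for illustration only.
conf = SimpleNamespace(osapi_volume_listen="192.0.2.10",
                       osapi_volume_listen_port=8776)
print(wsgi_settings(conf, "osapi_volume", default_workers=4))
```

Because the option names are built from the string passed to WSGIService, the name osapi_volume is what ties the service to options such as osapi_volume_listen and osapi_volume_workers.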
```python
def start(self):
    """Start serving a WSGI application.

    :returns: None
    """
    # The server socket object will be closed after server exits,
    # but the underlying file descriptor will remain open, and will
    # give bad file descriptor error. So duplicating the socket object,
    # to keep file descriptor usable.

    self.dup_socket = self.socket.dup()

    if self._use_ssl:
        self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)

    wsgi_kwargs = {
        'func': eventlet.wsgi.server,
        'sock': self.dup_socket,
        'site': self.app,
        'protocol': self._protocol,
        'custom_pool': self._pool,
        'log': self._logger,
        'log_format': self.conf.wsgi_log_format,
        'debug': False,
        'keepalive': self.conf.wsgi_keep_alive,
        'socket_timeout': self.client_socket_timeout
    }

    if self._max_url_len:
        wsgi_kwargs['url_length_limit'] = self._max_url_len

    self._server = eventlet.spawn(**wsgi_kwargs)
```
At this point cinder-api has started successfully.
2. cinder-scheduler
The entry file is /usr/bin/cinder-scheduler, so the function actually called is main in cinder/cmd/scheduler.py.
```python
#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.scheduler import main


if __name__ == "__main__":
    sys.exit(main())
```
```python
def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    utils.monkey_patch()
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    server = service.Service.create(binary='cinder-scheduler')
    service.serve(server)
    service.wait()
```

Only the last three statements actually start the service. The classmethod Service.create builds a service object, bound here to the name server. serve() then passes it to launch(), which uses the workers argument to decide which launcher to construct: a ServiceLauncher when workers is None or 1, and a ProcessLauncher otherwise. Either way, the service goes through the launch_service interface. Since serve() is called here without workers, a ServiceLauncher is used and, as described for cinder-api above, the key is the add method: it appends the service to the launcher's services list and ultimately calls the start() method of the added Service.
```python
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
           report_interval=None, periodic_interval=None,
           periodic_fuzzy_delay=None, service_name=None,
           coordination=False, cluster=None, **kwargs):
    if not host:
        host = CONF.host
    if not binary:
        binary = os.path.basename(inspect.stack()[-1][1])
    if not topic:
        topic = binary
    if not manager:
        subtopic = topic.rpartition('cinder-')[2]
        manager = CONF.get('%s_manager' % subtopic, None)
    if report_interval is None:
        report_interval = CONF.report_interval
    if periodic_interval is None:
        periodic_interval = CONF.periodic_interval
    if periodic_fuzzy_delay is None:
        periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
    service_obj = cls(host, binary, topic, manager,
                      report_interval=report_interval,
                      periodic_interval=periodic_interval,
                      periodic_fuzzy_delay=periodic_fuzzy_delay,
                      service_name=service_name,
                      coordination=coordination,
                      cluster=cluster, **kwargs)

    return service_obj
```
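To see what create derives when only binary is given, here is a minimal sketch of the topic and manager-option naming, isolated from the rest of the method. derive_defaults is a hypothetical helper; it only reproduces the string handling and does not touch CONF.

```python
import os


def derive_defaults(binary, topic=None):
    """Mimic how Service.create fills in topic and the manager option name."""
    topic = topic or binary
    # 'cinder-scheduler'.rpartition('cinder-') gives
    # ('', 'cinder-', 'scheduler'), so index 2 is the bare service name.
    subtopic = topic.rpartition("cinder-")[2]
    return topic, "%s_manager" % subtopic


print(derive_defaults("cinder-scheduler"))
print(derive_defaults(os.path.basename("/usr/bin/cinder-volume")))
```

So for binary='cinder-scheduler' the topic defaults to cinder-scheduler and the manager class is read from the scheduler_manager option.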
```python
def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    _launcher = service.launch(CONF, server, workers=workers)
```
```python
def launch(conf, service, workers=1, restart_method='reload'):
    """Launch a service with a given number of workers.

    :param conf: an instance of ConfigOpts
    :param service: a service to launch, must be an instance of
           :class:`oslo_service.service.ServiceBase`
    :param workers: a number of processes in which a service will be running
    :param restart_method: Passed to the constructed launcher. If 'reload', the
        launcher will call reload_config_files on SIGHUP. If 'mutate', it will
        call mutate_config_files on SIGHUP. Other values produce a ValueError.
    :returns: instance of a launcher that was used to launch the service
    """

    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))

    if workers is None or workers == 1:
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    launcher.launch_service(service, workers=workers)

    return launcher
```
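The branching in launch is the only place where the single-process and multi-process paths diverge, so a minimal sketch of that decision may help. pick_launcher is a hypothetical function that returns the launcher class name as a string instead of constructing anything.

```python
def pick_launcher(workers=1):
    """Return the name of the launcher class that launch() would construct."""
    if workers is not None and workers <= 0:
        raise ValueError("Number of workers should be positive!")
    if workers is None or workers == 1:
        # Single process: the service runs inside the current process.
        return "ServiceLauncher"
    # Several workers: child processes are forked.
    return "ProcessLauncher"


for workers in (None, 1, 4):
    print(workers, "->", pick_launcher(workers))
```

cinder-scheduler calls serve(server) with no workers argument, so it takes the ServiceLauncher branch, whereas cinder-api builds a ProcessLauncher directly in its main function.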
At this point cinder-scheduler startup is complete.
3. cinder-volume
The entry file for cinder-volume is /usr/bin/cinder-volume, which shows that the real entry point is the main function in cinder/cmd/volume.py.
```python
#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.volume import main


if __name__ == "__main__":
    sys.exit(main())
```
```python
def _launch_services_win32():
    if CONF.backend_name and CONF.backend_name not in CONF.enabled_backends:
        msg = _('The explicitly passed backend name "%(backend_name)s" is not '
                'among the enabled backends: %(enabled_backends)s.')
        raise exception.InvalidInput(
            reason=msg % dict(backend_name=CONF.backend_name,
                              enabled_backends=CONF.enabled_backends))

    # We'll avoid spawning a subprocess if a single backend is requested.
    single_backend_name = (CONF.enabled_backends[0]
                           if len(CONF.enabled_backends) == 1
                           else CONF.backend_name)
    if single_backend_name:
        launcher = service.get_launcher()
        _launch_service(launcher, single_backend_name)
    elif CONF.enabled_backends:
        # We're using the 'backend_name' argument, requesting a certain backend
        # and constructing the service object within the child process.
        launcher = service.WindowsProcessLauncher()
        py_script_re = re.compile(r'.*\.py\w?$')
        for backend in filter(None, CONF.enabled_backends):
            cmd = sys.argv + ['--backend_name=%s' % backend]
            # Recent setuptools versions will trim '-script.py' and '.exe'
            # extensions from sys.argv[0].
            if py_script_re.match(sys.argv[0]):
                cmd = [sys.executable] + cmd
            launcher.add_process(cmd)
            _notify_service_started()

    _ensure_service_started()

    launcher.wait()


def _launch_services_posix():
    launcher = service.get_launcher()

    for backend in filter(None, CONF.enabled_backends):
        _launch_service(launcher, backend)

    _ensure_service_started()

    launcher.wait()


def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    priv_context.init(root_helper=shlex.split(utils.get_root_helper()))
    utils.monkey_patch()
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    global LOG
    LOG = logging.getLogger(__name__)

    if not CONF.enabled_backends:
        LOG.error('Configuration for cinder-volume does not specify '
                  '"enabled_backends". Using DEFAULT section to configure '
                  'drivers is not supported since Ocata.')
        sys.exit(1)

    if os.name == 'nt':
        # We cannot use oslo.service to spawn multiple services on Windows.
        # It relies on forking, which is not available on Windows.
        # Furthermore, service objects are unmarshallable objects that are
        # passed to subprocesses.
        _launch_services_win32()
    else:
        _launch_services_posix()
```
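Depending on whether the platform is Windows or Linux, a different function is called. Since this deployment is on Linux, the function called is _launch_services_posix(), which calls _launch_service(launcher, backend) for each enabled backend to create a Service object. From there the flow is the same as the cinder-scheduler startup described above, so I will not repeat it.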
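For readers curious about the Windows branch that this post skips, here is a minimal sketch of how _launch_services_win32 decides between running one backend in-process and spawning one subprocess per backend. plan_backends is a hypothetical function, the backend names lvm and ceph are made up, and it returns a description of the plan rather than launching anything. It also assumes enabled_backends is non-empty, which main has already checked.

```python
def plan_backends(enabled_backends, backend_name=None):
    """Return (mode, backends) following _launch_services_win32's branching."""
    if backend_name and backend_name not in enabled_backends:
        raise ValueError("%r is not among the enabled backends"
                         % backend_name)
    # A single enabled backend, or an explicitly named one, runs in-process.
    single = (enabled_backends[0] if len(enabled_backends) == 1
              else backend_name)
    if single:
        return "in-process", [single]
    # filter(None, ...) drops empty entries such as ''.
    return "subprocesses", list(filter(None, enabled_backends))


print(plan_backends(["lvm"]))
print(plan_backends(["lvm", "", "ceph"]))
print(plan_backends(["lvm", "ceph"], backend_name="ceph"))
```

The same filter(None, CONF.enabled_backends) idiom appears in _launch_services_posix, where each remaining backend is handed to _launch_service on a shared launcher.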
OpenStack Rocky series, Cinder: (1) Cinder service startup
Original article: https://www.cnblogs.com/sumoning/p/12111723.html