
OpenStack Rocky series, Cinder: (1) Cinder service startup

Date: 2020-01-02 20:26:32


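I have been busy and had no time to read the OpenStack source for quite a while. I recently made time to go through the Cinder source, so here are my notes. This post briefly walks through how Cinder's three services start: cinder-api, cinder-scheduler, and cinder-volume. The three start in much the same way.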

1. cinder-api

  The entry file is /usr/bin/cinder-api, which shows that the entry point is the main function in cinder.cmd.api.

#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.api import main


if __name__ == "__main__":
    sys.exit(main())
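To make the link between the wrapper script and cinder.cmd.api concrete, here is a minimal sketch of how a "module:function" reference of the console_scripts kind can be resolved to a callable. The helper name resolve_entry_point is hypothetical (it is not part of pbr or setuptools), and the example resolves a standard-library function instead of Cinder's main so that it runs without OpenStack installed.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve a "package.module:attr" string to the object it names.

    Hypothetical helper for illustration only.
    """
    module_name, _, attr = spec.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Stand-in target from the standard library; for cinder-api the reference
# would presumably be "cinder.cmd.api:main" (assumption based on the import
# in the generated script).
dumps = resolve_entry_point("json:dumps")
print(dumps({"service": "cinder-api"}))
```

The generated script does the same thing statically: it imports main from the named module and passes its return value to sys.exit.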

  The main function is shown below; lines 14-16 are the ones to focus on.

 1 def main():
 2     objects.register_all()
 3     gmr_opts.set_defaults(CONF)
 4     CONF(sys.argv[1:], project='cinder',
 5          version=version.version_string())
 6     config.set_middleware_defaults()
 7     logging.setup(CONF, "cinder")
 8     python_logging.captureWarnings(True)
 9     utils.monkey_patch()
10 
11     gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
12 
13     rpc.init(CONF)
14     launcher = service.process_launcher()
15     server = service.WSGIService('osapi_volume')
16     launcher.launch_service(server, workers=server.workers)
17     launcher.wait()

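  Line 14 creates a ProcessLauncher object, which is later used to launch the app.

  Line 15 creates a WSGIService object named osapi_volume.

  On line 16, the launcher object calls launch_service to handle the server. Reading the source shows that this goes through the ProcessLauncher object's _start_child method.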

    def launch_service(self, service, workers=1):
        """Launch a service with a given number of workers.

       :param service: a service to launch, must be an instance of
              :class:`oslo_service.service.ServiceBase`
       :param workers: a number of processes in which a service
              will be running
        """
        _check_service_base(service)  # check the object type
        wrap = ServiceWrapper(service, workers)  # wrap the service

        # Hide existing objects from the garbage collector, so that most
        # existing pages will remain in shared memory rather than being
        # duplicated between subprocesses in the GC mark-and-sweep. (Requires
        # Python 3.7 or later.)
        if hasattr(gc, 'freeze'):
            gc.freeze()

        LOG.info('Starting %d workers', wrap.workers)
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)
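The loop at the end of launch_service is a pre-fork worker model: the parent keeps forking until it has the requested number of children. The sketch below shows only that skeleton, under loud assumptions: the names launch_workers and wait_for_workers are hypothetical, there is no signal handling, no respawning, and no eventlet, and it needs a POSIX system because it calls os.fork().

```python
import os


def launch_workers(worker_fn, workers):
    """Fork `workers` children; each runs worker_fn(index) and exits.

    Hypothetical simplification of the loop in launch_service.
    """
    children = set()
    while len(children) < workers:
        index = len(children)
        pid = os.fork()
        if pid == 0:
            # Child process: run the work, then leave without returning
            # into the parent's code path.
            status = 0
            try:
                worker_fn(index)
            except Exception:
                status = 1
            finally:
                os._exit(status)
        # Parent process: remember the child and keep forking.
        children.add(pid)
    return children


def wait_for_workers(children):
    """Reap every child and return a {pid: exit code} mapping."""
    exit_codes = {}
    for pid in children:
        _, status = os.waitpid(pid, 0)
        exit_codes[pid] = os.WEXITSTATUS(status)
    return exit_codes


pids = launch_workers(lambda index: None, workers=3)
exit_codes = wait_for_workers(pids)
print(len(exit_codes), "workers exited")
```

The real ProcessLauncher differs in that its children run until signalled and the parent restarts any child that dies.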

  _start_child is simple: it calls os.fork() to create a child process, and in the child (where fork returns 0) it then calls _child_process.

    def _start_child(self, wrap):
        if len(wrap.forktimes) > wrap.workers:
            # Limit ourselves to one process a second (over the period of
            # number of workers * 1 second). This will allow workers to
            # start up quickly but ensure we don't fork off children that
            # die instantly too quickly.
            if time.time() - wrap.forktimes[0] < wrap.workers:
                LOG.info('Forking too fast, sleeping')
                time.sleep(1)

            wrap.forktimes.pop(0)

        wrap.forktimes.append(time.time())

        pid = os.fork()
        if pid == 0:
            self.launcher = self._child_process(wrap.service)
            while True:
                self._child_process_handle_signal()
                status, signo = self._child_wait_for_exit_or_signal(
                    self.launcher)
                if not _is_sighup_and_daemon(signo):
                    self.launcher.wait()
                    break
                self.launcher.restart()

            os._exit(status)

        LOG.debug('Started child %d', pid)

        wrap.children.add(pid)
        self.children[pid] = wrap

        return pid

    def _child_process(self, service):
        self._child_process_handle_signal()

        # Reopen the eventlet hub to make sure we don't share an epoll
        # fd with parent and/or siblings, which would be bad
        eventlet.hubs.use_hub()

        # Close write to ensure only parent has it open
        os.close(self.writepipe)
        # Create greenthread to watch for parent to close pipe
        eventlet.spawn_n(self._pipe_watcher)

        # Reseed random number generator
        random.seed()

        launcher = Launcher(self.conf, restart_method=self.restart_method)
        launcher.launch_service(service)
        return launcher
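The first few lines of _start_child throttle forking when children die and get restarted too quickly. The bookkeeping can be isolated as a small function. This is a sketch, not the library code: the name record_fork is hypothetical, and the clock and sleep parameters are added here only so the behaviour can be exercised without real waiting.

```python
import time


def record_fork(forktimes, workers, clock=time.time, sleep=time.sleep):
    """Apply the fork-rate limit, record a new fork time, report throttling.

    `forktimes` is a list of timestamps, oldest first. Returns True when
    the caller had to sleep before forking.
    """
    throttled = False
    if len(forktimes) > workers:
        # More than `workers` forks are on record. If the oldest one is
        # younger than `workers` seconds, forking is going too fast.
        if clock() - forktimes[0] < workers:
            sleep(1)
            throttled = True
        forktimes.pop(0)
    forktimes.append(clock())
    return throttled


# Simulated run: every fork happens at the same instant, no real sleeping.
naps = []
history = []
results = [
    record_fork(history, workers=2, clock=lambda: 100.0, sleep=naps.append)
    for _ in range(4)
]
print(results, naps)
```

With two workers, the first three forks go through unthrottled and the fourth one sleeps, because by then three forks are recorded and the oldest is less than two seconds old.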

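  _child_process uses eventlet: it re-initializes the hub and spawns a green thread that watches the pipe to the parent process. It then creates a Launcher object and calls its launch_service to launch the service.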

    def launch_service(self, service, workers=1):
        """Load and start the given service.

        :param service: The service you would like to start, must be an
                        instance of :class:`oslo_service.service.ServiceBase`
        :param workers: This param makes this method compatible with
                        ProcessLauncher.launch_service. It must be None, 1 or
                        omitted.
        :returns: None

        """
        if workers is not None and workers != 1:
            raise ValueError(_("Launcher asked to start multiple workers"))
        _check_service_base(service)
        service.backdoor_port = self.backdoor_port
        self.services.add(service)   # the key step

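  The key to the Launcher object is this add method. It appends every service launched through it to the service list of the Services object and, in the end, calls start() on each added service.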

    def add(self, service):
        """Add a service to a list and create a thread to run it.

        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)

    @staticmethod
    def run_service(service, done):
        """Service start wrapper.

        :param service: service to run
        :param done: event to wait on until a shutdown is triggered
        :returns: None

        """
        try:
            service.start()
        except Exception:
            LOG.exception('Error starting thread.')
            raise SystemExit(1)
        else:
            done.wait()
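The add / run_service pair follows a common pattern: start the service in its own thread, then park that thread on a shared "done" event until shutdown. Below is a rough stand-in that uses the standard threading module instead of oslo.service's eventlet-based thread group. DemoService and DemoServices are hypothetical names, and error handling is left out.

```python
import threading


class DemoService:
    """Hypothetical service whose start() only records that it ran."""

    def __init__(self):
        self.started = threading.Event()

    def start(self):
        self.started.set()


class DemoServices:
    """Thread-based stand-in for the Services container shown above."""

    def __init__(self):
        self.services = []
        self.threads = []
        self.done = threading.Event()

    def add(self, service):
        # Keep the service in the list and run it in its own thread.
        self.services.append(service)
        thread = threading.Thread(
            target=self.run_service, args=(service, self.done), daemon=True)
        thread.start()
        self.threads.append(thread)

    @staticmethod
    def run_service(service, done):
        service.start()
        # Block until a shutdown is requested.
        done.wait()

    def stop(self):
        self.done.set()
        for thread in self.threads:
            thread.join()


container = DemoServices()
demo = DemoService()
container.add(demo)
was_started = demo.started.wait(timeout=2)
container.stop()
print("service started:", was_started)
```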

  This service is the WSGIService object mentioned at the start. Here is its start method:

    def start(self):
        """Start serving this service using loaded configuration.

        Also, retrieve updated port number in case '0' was passed in, which
        indicates a random port should be used.

        :returns: None

        """
        if self.manager:
            self.manager.init_host()
        self.server.start()
        self.port = self.server.port

  At this point self.manager is None, so the key step is self.server.start(). This server is the object constructed when WSGIService is initialized.

class WSGIService(service.ServiceBase):
    """Provides ability to launch API from a 'paste' configuration."""

    def __init__(self, name, loader=None):
        """Initialize, but do not start the WSGI server.

        :param name: The name of the WSGI server given to the loader.
        :param loader: Loads the WSGI application using the given name.
        :returns: None

        """
        self.name = name
        self.manager = self._get_manager()
        self.loader = loader or wsgi.Loader(CONF)
        self.app = self.loader.load_app(name)
        self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
        self.port = getattr(CONF, '%s_listen_port' % name, 0)
        self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
        self.workers = (getattr(CONF, '%s_workers' % name, None) or
                        processutils.get_worker_count())
        if self.workers and self.workers < 1:
            worker_name = '%s_workers' % name
            msg = (_("%(worker_name)s value of %(workers)d is invalid, "
                     "must be greater than 0.") %
                   {'worker_name': worker_name,
                    'workers': self.workers})
            raise exception.InvalidConfigurationValue(msg)
        setup_profiler(name, self.host)

        self.server = wsgi.Server(CONF,
                                  name,
                                  self.app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)   # constructed here

    def start(self):
        """Start serving a WSGI application.

        :returns: None
        """
        # The server socket object will be closed after server exits,
        # but the underlying file descriptor will remain open, and will
        # give bad file descriptor error. So duplicating the socket object,
        # to keep file descriptor usable.

        self.dup_socket = self.socket.dup()

        if self._use_ssl:
            self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)

        wsgi_kwargs = {
            'func': eventlet.wsgi.server,
            'sock': self.dup_socket,
            'site': self.app,
            'protocol': self._protocol,
            'custom_pool': self._pool,
            'log': self._logger,
            'log_format': self.conf.wsgi_log_format,
            'debug': False,
            'keepalive': self.conf.wsgi_keep_alive,
            'socket_timeout': self.client_socket_timeout
            }

        if self._max_url_len:
            wsgi_kwargs['url_length_limit'] = self._max_url_len

        self._server = eventlet.spawn(**wsgi_kwargs)
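Two ideas from the code above can be tried without eventlet: binding to port 0 and reading back the port the OS actually assigned (as WSGIService.start does with self.server.port), and serving a WSGI app in the background while the caller carries on. The sketch below swaps in the standard library's wsgiref server and a plain thread; it is not how oslo.service serves requests, and the names app, start_server and fetch_once are made up for the example.

```python
import http.client
import threading
from wsgiref.simple_server import WSGIRequestHandler, make_server


def app(environ, start_response):
    """Tiny WSGI application standing in for the loaded paste app."""
    body = b"ok"
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]


class QuietHandler(WSGIRequestHandler):
    def log_message(self, *args):
        # Suppress per-request logging to stderr.
        pass


def start_server(host="127.0.0.1", port=0):
    """Bind (port 0 means "pick a free port") and serve in a thread."""
    server = make_server(host, port, app, handler_class=QuietHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server, thread


def fetch_once():
    server, thread = start_server()
    port = server.server_port  # the port the OS actually assigned
    try:
        conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
        conn.request("GET", "/")
        body = conn.getresponse().read()
        conn.close()
        return port, body
    finally:
        server.shutdown()
        server.server_close()
        thread.join()


port, body = fetch_once()
print(port > 0, body)
```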

  At this point, cinder-api has started successfully.

2. cinder-scheduler

  The entry file is /usr/bin/cinder-scheduler, so the function actually called is main in cinder/cmd/scheduler.py.

#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.scheduler import main


if __name__ == "__main__":
    sys.exit(main())

 1 def main():
 2     objects.register_all()
 3     gmr_opts.set_defaults(CONF)
 4     CONF(sys.argv[1:], project='cinder',
 5          version=version.version_string())
 6     logging.setup(CONF, "cinder")
 7     python_logging.captureWarnings(True)
 8     utils.monkey_patch()
 9     gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
10     server = service.Service.create(binary='cinder-scheduler')
11     service.serve(server)
12     service.wait()

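  Only lines 10, 11, and 12 of main actually start the service. The Service class method create builds a service object named server, which is then passed to serve (which in turn calls launch). launch looks at the workers argument passed in from serve to decide whether to use a ServiceLauncher (a single process) or a ProcessLauncher (multiple worker processes). Either way, the launch_service interface is called. From here on, as described for the API above, the key to the Launcher object is the add method: it appends every service launched through it to the service list of the Services object and, in the end, calls start() on each added service.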

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None,
               coordination=False, cluster=None, **kwargs):
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary
        if not manager:
            subtopic = topic.rpartition('cinder-')[2]
            manager = CONF.get('%s_manager' % subtopic, None)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_interval is None:
            periodic_interval = CONF.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name,
                          coordination=coordination,
                          cluster=cluster, **kwargs)

        return service_obj

def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    _launcher = service.launch(CONF, server, workers=workers)

def launch(conf, service, workers=1, restart_method='reload'):
    """Launch a service with a given number of workers.

    :param conf: an instance of ConfigOpts
    :param service: a service to launch, must be an instance of
           :class:`oslo_service.service.ServiceBase`
    :param workers: a number of processes in which a service will be running
    :param restart_method: Passed to the constructed launcher. If 'reload', the
        launcher will call reload_config_files on SIGHUP. If 'mutate', it will
        call mutate_config_files on SIGHUP. Other values produce a ValueError.
    :returns: instance of a launcher that was used to launch the service
    """

    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))

    if workers is None or workers == 1:
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    launcher.launch_service(service, workers=workers)

    return launcher
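Two small decisions in this path are easy to check in isolation: how create derives the manager option name from the binary name, and how launch picks a launcher class from workers. The functions below restate only that logic as plain Python. The names manager_option_name and choose_launcher are hypothetical, and the launcher is returned as a string label rather than a real oslo.service object.

```python
def manager_option_name(binary, topic=None):
    """Return the config option name that create() looks up for the manager."""
    topic = topic or binary
    # "cinder-scheduler" -> "scheduler"
    subtopic = topic.rpartition("cinder-")[2]
    return "%s_manager" % subtopic


def choose_launcher(workers=None):
    """Return the name of the launcher class launch() would construct."""
    if workers is not None and workers <= 0:
        raise ValueError("Number of workers should be positive!")
    if workers is None or workers == 1:
        return "ServiceLauncher"
    return "ProcessLauncher"


print(manager_option_name("cinder-scheduler"))
print(choose_launcher())           # serve(server) passes workers=None
print(choose_launcher(workers=4))
```

Since main calls service.serve(server) without a workers argument, the scheduler ends up on the single-process ServiceLauncher branch.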

  At this point, cinder-scheduler has finished starting.

3. cinder-volume

  The entry file for cinder-volume is /usr/bin/cinder-volume, so the real entry function is main in cinder/cmd/volume.py.

#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.volume import main


if __name__ == "__main__":
    sys.exit(main())

def _launch_services_win32():
    if CONF.backend_name and CONF.backend_name not in CONF.enabled_backends:
        msg = _('The explicitly passed backend name "%(backend_name)s" is not '
                'among the enabled backends: %(enabled_backends)s.')
        raise exception.InvalidInput(
            reason=msg % dict(backend_name=CONF.backend_name,
                              enabled_backends=CONF.enabled_backends))

    # We'll avoid spawning a subprocess if a single backend is requested.
    single_backend_name = (CONF.enabled_backends[0]
                           if len(CONF.enabled_backends) == 1
                           else CONF.backend_name)
    if single_backend_name:
        launcher = service.get_launcher()
        _launch_service(launcher, single_backend_name)
    elif CONF.enabled_backends:
        # We're using the 'backend_name' argument, requesting a certain backend
        # and constructing the service object within the child process.
        launcher = service.WindowsProcessLauncher()
        py_script_re = re.compile(r'.*\.py\w?$')
        for backend in filter(None, CONF.enabled_backends):
            cmd = sys.argv + ['--backend_name=%s' % backend]
            # Recent setuptools versions will trim '-script.py' and '.exe'
            # extensions from sys.argv[0].
            if py_script_re.match(sys.argv[0]):
                cmd = [sys.executable] + cmd
            launcher.add_process(cmd)
            _notify_service_started()

    _ensure_service_started()

    launcher.wait()


def _launch_services_posix():
    launcher = service.get_launcher()

    for backend in filter(None, CONF.enabled_backends):
        _launch_service(launcher, backend)

    _ensure_service_started()

    launcher.wait()


def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    priv_context.init(root_helper=shlex.split(utils.get_root_helper()))
    utils.monkey_patch()
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    global LOG
    LOG = logging.getLogger(__name__)

    if not CONF.enabled_backends:
        LOG.error('Configuration for cinder-volume does not specify '
                  '"enabled_backends". Using DEFAULT section to configure '
                  'drivers is not supported since Ocata.')
        sys.exit(1)

    if os.name == 'nt':
        # We cannot use oslo.service to spawn multiple services on Windows.
        # It relies on forking, which is not available on Windows.
        # Furthermore, service objects are unmarshallable objects that are
        # passed to subprocesses.
        _launch_services_win32()
    else:
        _launch_services_posix()

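  The call differs depending on whether the platform is Windows or Linux. Since this deployment is on Linux, the function called is _launch_services_posix(), which calls _launch_service(launcher, backend) to create a Service object. From there the flow is the same as the cinder-scheduler startup above, so I won't repeat it.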
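The platform switch and the per-backend loop can be restated as two tiny functions. This is only a sketch of the control flow in main and _launch_services_posix: the names pick_launch_path and backends_to_launch are hypothetical, the backend names in the usage lines are made up, and no Cinder service is actually created.

```python
import os


def pick_launch_path(os_name=os.name):
    """Return which launch function main() would call on this platform."""
    return "win32" if os_name == "nt" else "posix"


def backends_to_launch(enabled_backends):
    """Drop empty entries, mirroring filter(None, CONF.enabled_backends)."""
    return list(filter(None, enabled_backends))


print(pick_launch_path())
# An empty entry can appear, for example, from a trailing comma in the
# option value (illustrative input, not taken from a real deployment).
print(backends_to_launch(["lvm", "", "ceph"]))
```

On the POSIX path each remaining backend name is handed to _launch_service with the same launcher, so every enabled backend ends up with its own Service object.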

 


Original article: https://www.cnblogs.com/sumoning/p/12111723.html
