The overall flow of creating a VM with nova boot is roughly:
1. novaclient sends an HTTP request to nova-api (internally this involves keystone authenticating the user and the user obtaining a token and endpoints from keystone; see 《keystone WSGI流程》 for details).
2. nova-api makes an RPC call to nova-conductor.
3. nova-conductor makes an RPC call into nova-scheduler to select a compute node, and nova-scheduler returns the selection result to nova-conductor.
4. Finally, nova-conductor makes an RPC call to nova-compute on the selected node, which creates the VM.
(The original post illustrates this flow with a diagram.)
Since there are already many articles online describing the VM creation flow, some details will only be touched on briefly; this article focuses on the code path from the start of VM creation up to the Claim mechanism.
#/nova/api/openstack/compute/servers.py:Controller
    @wsgi.response(202)
    def create(self, req, body):
        """Creates a new server for a given user."""
        if not self.is_valid_body(body, 'server'):
            raise exc.HTTPUnprocessableEntity()

        context = req.environ['nova.context']
        server_dict = body['server']
        password = self._get_server_admin_password(server_dict)

        if 'name' not in server_dict:
            msg = _("Server name is not defined")
            raise exc.HTTPBadRequest(explanation=msg)

        name = server_dict['name']
        self._validate_server_name(name)
        name = name.strip()

        image_uuid = self._image_from_req_data(body)

        personality = server_dict.get('personality')
        config_drive = None
        if self.ext_mgr.is_loaded('os-config-drive'):
            config_drive = server_dict.get('config_drive')
        ... ... ...
        try:
            _get_inst_type = flavors.get_flavor_by_flavor_id
            inst_type = _get_inst_type(flavor_id, ctxt=context,
                                       read_deleted="no")

            (instances, resv_id) = self.compute_api.create(context,
                            inst_type,
                            image_uuid,
                            display_name=name,
                            display_description=name,
                            key_name=key_name,
                            metadata=server_dict.get('metadata', {}),
                            access_ip_v4=access_ip_v4,
                            access_ip_v6=access_ip_v6,
                            injected_files=injected_files,
                            admin_password=password,
                            min_count=min_count,
                            max_count=max_count,
                            requested_networks=requested_networks,
                            security_group=sg_names,
                            user_data=user_data,
                            availability_zone=availability_zone,
                            config_drive=config_drive,
                            block_device_mapping=block_device_mapping,
                            auto_disk_config=auto_disk_config,
                            scheduler_hints=scheduler_hints,
                            legacy_bdm=legacy_bdm,
                            check_server_group_quota=check_server_group_quota)
        except (exception.QuotaError,
                exception.PortLimitExceeded) as error:
            raise exc.HTTPForbidden(
                explanation=error.format_message(),
                headers={'Retry-After': 0})
        except messaging.RemoteError as err:
            msg = "%(err_type)s: %(err_msg)s" % {'err_type': err.exc_type,
                                                 'err_msg': err.value}
            raise exc.HTTPBadRequest(explanation=msg)
        except UnicodeDecodeError as error:
            msg = "UnicodeError: %s" % error
            raise exc.HTTPBadRequest(explanation=msg)
        except Exception as error:
            # The remaining cases can be handled in a standard fashion.
            self._handle_create_exception(*sys.exc_info())

        # If the caller wanted a reservation_id, return it
        if ret_resv_id:
            return wsgi.ResponseObject({'reservation_id': resv_id})

        req.cache_db_instances(instances)
        server = self._view_builder.create(req, instances[0])

        if CONF.enable_instance_password:
            server['server']['adminPass'] = password

        robj = wsgi.ResponseObject(server)

        return self._add_location(robj)
When novaclient sends the HTTP request, the WSGI server created when the nova-api service started eventually routes it to the create function in /nova/api/openstack/compute/servers.py:Controller. Before calling into the compute API, the relevant information in the req and body obtained from the HTTP request is validated; once validation passes, the compute API's create function is called. There are two kinds of compute API, and the type of self.compute_api is chosen by a configuration option, as follows:
#/nova/compute/__init__.py
CELL_TYPE_TO_CLS_NAME = {'api': 'nova.compute.cells_api.ComputeCellsAPI',
                         'compute': 'nova.compute.api.API',
                         None: 'nova.compute.api.API',
                        }

def _get_compute_api_class_name():
    """Returns the name of compute API class."""
    cell_type = nova.cells.opts.get_cell_type()
    return CELL_TYPE_TO_CLS_NAME[cell_type]

def API(*args, **kwargs):
    class_name = _get_compute_api_class_name()
    return importutils.import_object(class_name, *args, **kwargs)

#/nova/cells/opts.py
def get_cell_type():
    """Return the cell type, 'api', 'compute', or None (if cells is disabled).
    """
    if not CONF.cells.enable:
        return
    return CONF.cells.cell_type
That is, the compute API type is determined by the enable option in the [cells] section of /etc/nova/nova.conf. Our environment uses the default configuration (enable=false), so get_cell_type returns None and the compute API is nova.compute.api.API; in other words, self.compute_api is a nova.compute.api.API object, and the create function in nova.compute.api.API is what gets called.
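For reference, a minimal nova.conf fragment showing the option this selection depends on (values here are the defaults described above, not taken from any particular deployment):

# /etc/nova/nova.conf
[cells]
# Disabled by default, so get_cell_type() returns None and
# nova.compute.api.API is imported as self.compute_api.
enable = false
# cell_type is only consulted when enable = true ('api' or 'compute').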
#/nova/compute/api.py:API
    @hooks.add_hook("create_instance")
    def create(self, context, instance_type,
               image_href, kernel_id=None, ramdisk_id=None,
               min_count=None, max_count=None,
               display_name=None, display_description=None,
               key_name=None, key_data=None, security_group=None,
               availability_zone=None, user_data=None, metadata=None,
               injected_files=None, admin_password=None,
               block_device_mapping=None, access_ip_v4=None,
               access_ip_v6=None, requested_networks=None, config_drive=None,
               auto_disk_config=None, scheduler_hints=None, legacy_bdm=True,
               shutdown_terminate=False, check_server_group_quota=False):
        """Provision instances, sending instance information to the
        scheduler.  The scheduler will determine where the instance(s)
        go and will handle creating the DB entries.

        Returns a tuple of (instances, reservation_id)
        """
        self._check_create_policies(context, availability_zone,
                requested_networks, block_device_mapping)

        if requested_networks and max_count > 1:
            self._check_multiple_instances_and_specified_ip(
                requested_networks)
            if utils.is_neutron():
                self._check_multiple_instances_neutron_ports(
                    requested_networks)

        return self._create_instance(
                       context, instance_type,
                       image_href, kernel_id, ramdisk_id,
                       min_count, max_count,
                       display_name, display_description,
                       key_name, key_data, security_group,
                       availability_zone, user_data, metadata,
                       injected_files, admin_password,
                       access_ip_v4, access_ip_v6,
                       requested_networks, config_drive,
                       block_device_mapping, auto_disk_config,
                       scheduler_hints=scheduler_hints,
                       legacy_bdm=legacy_bdm,
                       shutdown_terminate=shutdown_terminate,
                       check_server_group_quota=check_server_group_quota)

#/nova/compute/api.py:API
    def _create_instance(self, context, instance_type,
               image_href, kernel_id, ramdisk_id,
               min_count, max_count,
               display_name, display_description,
               key_name, key_data, security_groups,
               availability_zone, user_data, metadata,
               injected_files, admin_password,
               access_ip_v4, access_ip_v6,
               requested_networks, config_drive,
               block_device_mapping, auto_disk_config,
               reservation_id=None, scheduler_hints=None,
               legacy_bdm=True, shutdown_terminate=False,
               check_server_group_quota=False):
        """Verify all the input parameters regardless of the provisioning
        strategy being performed and schedule the instance(s) for
        creation.
        """

        # Normalize and setup some parameters
        if reservation_id is None:
            reservation_id = utils.generate_uid('r')
        security_groups = security_groups or ['default']
        min_count = min_count or 1
        max_count = max_count or min_count
        block_device_mapping = block_device_mapping or []
        if not instance_type:
            instance_type = flavors.get_default_flavor()

        if image_href:
            image_id, boot_meta = self._get_image(context, image_href)
        else:
            image_id = None
            boot_meta = self._get_bdm_image_metadata(
                context, block_device_mapping, legacy_bdm)

        self._check_auto_disk_config(image=boot_meta,
                                     auto_disk_config=auto_disk_config)

        handle_az = self._handle_availability_zone
        availability_zone, forced_host, forced_node = handle_az(context,
                                                            availability_zone)

        if not self.skip_policy_check and (forced_host or forced_node):
            check_policy(context, 'create:forced_host', {})

        base_options, max_net_count = self._validate_and_build_base_options(
                context,
                instance_type, boot_meta, image_href, image_id, kernel_id,
                ramdisk_id, display_name, display_description,
                key_name, key_data, security_groups, availability_zone,
                forced_host, user_data, metadata, injected_files, access_ip_v4,
                access_ip_v6, requested_networks, config_drive,
                auto_disk_config, reservation_id, max_count)

        # max_net_count is the maximum number of instances requested by the
        # user adjusted for any network quota constraints, including
        # considertaion of connections to each requested network
        if max_net_count == 0:
            raise exception.PortLimitExceeded()
        elif max_net_count < max_count:
            LOG.debug("max count reduced from %(max_count)d to "
                      "%(max_net_count)d due to network port quota",
                      {'max_count': max_count,
                       'max_net_count': max_net_count})
            max_count = max_net_count

        block_device_mapping = self._check_and_transform_bdm(context,
            base_options, instance_type, boot_meta, min_count, max_count,
            block_device_mapping, legacy_bdm)

        instance_group = self._get_requested_instance_group(context,
                                   scheduler_hints, check_server_group_quota)

        instances = self._provision_instances(context, instance_type,
                min_count, max_count, base_options, boot_meta, security_groups,
                block_device_mapping, shutdown_terminate,
                instance_group, check_server_group_quota)

        filter_properties = self._build_filter_properties(context,
                scheduler_hints, forced_host,
                forced_node, instance_type,
                base_options.get('pci_requests'))

        for instance in instances:
            self._record_action_start(context, instance,
                                      instance_actions.CREATE)

        self.compute_task_api.build_instances(context,
                instances=instances, image=boot_meta,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=False)

        return (instances, reservation_id)
The code above again consists mainly of validation and parameter assembly. The next step is to execute build_instances via nova-conductor.
#/nova/compute/api.py:API
    @property
    def compute_task_api(self):
        if self._compute_task_api is None:
            # TODO(alaski): Remove calls into here from conductor manager so
            # that this isn't necessary. #1180540
            from nova import conductor
            self._compute_task_api = conductor.ComputeTaskAPI()
        return self._compute_task_api

#/nova/conductor/__init__.py
def ComputeTaskAPI(*args, **kwargs):
    use_local = kwargs.pop('use_local', False)
    if oslo_config.cfg.CONF.conductor.use_local or use_local:
        api = conductor_api.LocalComputeTaskAPI
    else:
        api = conductor_api.ComputeTaskAPI
    return api(*args, **kwargs)
The nova-conductor API is likewise chosen by the use_local option in the [conductor] section of /etc/nova/nova.conf, or by a use_local value passed down from the caller. In this environment the configuration default is used, i.e. use_local=false, so self.compute_task_api is a /nova/conductor/api.py:ComputeTaskAPI object, and the build_instances function of /nova/conductor/api.py:ComputeTaskAPI is ultimately called.
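Again for reference, a minimal nova.conf fragment showing the option involved (default value, as described above):

# /etc/nova/nova.conf
[conductor]
# false (the default) selects the RPC-based nova/conductor/api.py:ComputeTaskAPI;
# true would select LocalComputeTaskAPI, which calls the conductor code in-process.
use_local = false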
#/nova/conductor/api.py:ComputeTaskAPI
class ComputeTaskAPI(object):
    """ComputeTask API that queues up compute tasks for nova-conductor."""

    def __init__(self):
        self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI()

    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping, legacy_bdm=True):
        self.conductor_compute_rpcapi.build_instances(context,
                instances=instances, image=image,
                filter_properties=filter_properties,
                admin_password=admin_password, injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=legacy_bdm)

#/nova/conductor/rpcapi.py:ComputeTaskAPI
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping, legacy_bdm=True):
        image_p = jsonutils.to_primitive(image)
        version = '1.10'
        if not self.client.can_send_version(version):
            version = '1.9'
            if 'instance_type' in filter_properties:
                flavor = filter_properties['instance_type']
                flavor_p = objects_base.obj_to_primitive(flavor)
                filter_properties = dict(filter_properties,
                                         instance_type=flavor_p)
        kw = {'instances': instances, 'image': image_p,
              'filter_properties': filter_properties,
              'admin_password': admin_password,
              'injected_files': injected_files,
              'requested_networks': requested_networks,
              'security_groups': security_groups}
        if not self.client.can_send_version(version):
            version = '1.8'
            kw['requested_networks'] = kw['requested_networks'].as_tuples()
        if not self.client.can_send_version('1.7'):
            version = '1.5'
            bdm_p = objects_base.obj_to_primitive(block_device_mapping)
            kw.update({'block_device_mapping': bdm_p,
                       'legacy_bdm': legacy_bdm})

        cctxt = self.client.prepare(version=version)
        cctxt.cast(context, 'build_instances', **kw)
From the code above we can see that an RPC cast ultimately invokes the build_instances function in /nova/conductor/manager.py:ComputeTaskManager. Why does an RPC cast land on that particular build_instances? When the nova-conductor service starts, it creates an RPC server, and that RPC server is given a set of endpoints (not the same thing as keystone endpoints; only the name is shared). These endpoints are a list of objects: when an RPC client calls a function on the RPC server, the method name is looked up across the objects in this endpoint list and the matching function is invoked. So which endpoints does the nova-conductor service load at startup? As follows:
#/nova/cmd/conductor.py
def main():
    config.parse_args(sys.argv)
    logging.setup(CONF, "nova")
    utils.monkey_patch()
    objects.register_all()

    gmr.TextGuruMeditation.setup_autorun(version)

    server = service.Service.create(binary='nova-conductor',
                                    topic=CONF.conductor.topic,
                                    manager=CONF.conductor.manager)
    workers = CONF.conductor.workers or processutils.get_worker_count()
    service.serve(server, workers=workers)
    service.wait()

#/nova/service.py:Service
    def start(self):
        verstr = version.version_string_with_package()
        LOG.info(_LI('Starting %(topic)s node (version %(version)s)'),
                 {'topic': self.topic, 'version': verstr})
        self.basic_config_check()
        self.manager.init_host()
        self.model_disconnected = False
        ctxt = context.get_admin_context()
        ... ... ...
        LOG.debug("Creating RPC server for service %s", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)

        endpoints = [
            self.manager,
            baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
        ]
        endpoints.extend(self.manager.additional_endpoints)

        serializer = objects_base.NovaObjectSerializer()

        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        self.rpcserver.start()

        self.manager.post_start_hook()

        LOG.debug("Join ServiceGroup membership for this service %s",
                  self.topic)
        # Add service to the ServiceGroup membership group.
        self.servicegroup_api.join(self.host, self.topic, self)

        if self.periodic_enable:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            self.tg.add_dynamic_timer(self.periodic_tasks,
                                     initial_delay=initial_delay,
                                     periodic_interval_max=
                                        self.periodic_interval_max)

#/nova/conductor/manager.py:ConductorManager
class ConductorManager(manager.Manager):
    """Mission: Conduct things.

    The methods in the base API for nova-conductor are various proxy
    operations performed on behalf of the nova-compute service running on
    compute nodes.  Compute nodes are not allowed to directly access the
    database, so this set of methods allows them to get specific work done
    without locally accessing the database.

    The nova-conductor service also exposes an API in the 'compute_task'
    namespace.  See the ComputeTaskManager class for details.
    """

    target = messaging.Target(version='2.1')

    def __init__(self, *args, **kwargs):
        super(ConductorManager, self).__init__(service_name='conductor',
                                               *args, **kwargs)
        self.security_group_api = (
            openstack_driver.get_openstack_security_group_driver())
        self._network_api = None
        self._compute_api = None
        self.compute_task_mgr = ComputeTaskManager()
        self.cells_rpcapi = cells_rpcapi.CellsAPI()
        self.additional_endpoints.append(self.compute_task_mgr)
From the way /nova/service.py:Service creates the nova-conductor RPC server, we can see that its endpoint list contains three objects (this code is in fact shared by every nova service that creates an RPC server; a different endpoint list is built for each service depending on the manager passed in). The three endpoint objects for nova-conductor are:
/nova/conductor/manager.py:ConductorManager
/nova/baserpc.py:BaseRPCAPI
/nova/conductor/manager.py:ComputeTaskManager
The last of these is added via endpoints.extend(self.manager.additional_endpoints). For more detail on how the RPC remote calls work, see my other articles; a minimal sketch of the endpoint dispatch idea is shown below.
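The following is a small illustrative sketch (not Nova code) of how an oslo.messaging RPC server dispatches a cast to its endpoint objects by method name; it assumes a configured message transport (e.g. RabbitMQ), and names such as DemoEndpoint are made up:

# Server side: what nova-conductor conceptually does at startup.
import oslo_messaging as messaging
from oslo_config import cfg

transport = messaging.get_transport(cfg.CONF)   # requires a configured broker

class DemoEndpoint(object):
    """Stands in for ComputeTaskManager; any public method is callable via RPC."""
    target = messaging.Target(version='1.0')

    def build_instances(self, ctxt, **kwargs):
        print('build_instances dispatched with %s' % kwargs)

server_target = messaging.Target(topic='conductor', server='demo-host')
server = messaging.get_rpc_server(transport, server_target, [DemoEndpoint()],
                                  executor='blocking')
server.start()   # the server now looks up incoming method names on its endpoints

# Client side (conceptually a different process, e.g. nova-api):
client = messaging.RPCClient(transport, messaging.Target(topic='conductor'))
client.cast({}, 'build_instances', instances=['fake-instance'])
# A cast is fire-and-forget: the method name 'build_instances' selects the
# matching function on one of the server's endpoint objects.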
For the build_instances call at hand, the request ends up at the build_instances function of /nova/conductor/manager.py:ComputeTaskManager.
#/nova/conductor/manager.py:ComputeTaskManager
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping=None, legacy_bdm=True):
        # TODO(ndipanov): Remove block_device_mapping and legacy_bdm in version
        #                 2.0 of the RPC API.
        request_spec = scheduler_utils.build_request_spec(context, image,
                                                          instances)
        # TODO(danms): Remove this in version 2.0 of the RPC API
        if (requested_networks and
                not isinstance(requested_networks,
                               objects.NetworkRequestList)):
            requested_networks = objects.NetworkRequestList(
                objects=[objects.NetworkRequest.from_tuple(t)
                         for t in requested_networks])
        # TODO(melwitt): Remove this in version 2.0 of the RPC API
        flavor = filter_properties.get('instance_type')
        if flavor and not isinstance(flavor, objects.Flavor):
            # Code downstream may expect extra_specs to be populated since it
            # is receiving an object, so lookup the flavor to ensure this.
            flavor = objects.Flavor.get_by_id(context, flavor['id'])
            filter_properties = dict(filter_properties, instance_type=flavor)

        try:
            scheduler_utils.setup_instance_group(context, request_spec,
                                                 filter_properties)
            # check retry policy. Rather ugly use of instances[0]...
            # but if we've exceeded max retries... then we really only
            # have a single instance.
            scheduler_utils.populate_retry(filter_properties,
                instances[0].uuid)
            hosts = self.scheduler_client.select_destinations(context,
                    request_spec, filter_properties)
        except Exception as exc:
            updates = {'vm_state': vm_states.ERROR, 'task_state': None}
            for instance in instances:
                self._set_vm_state_and_notify(
                    context, instance.uuid, 'build_instances', updates,
                    exc, request_spec)
            return

        for (instance, host) in itertools.izip(instances, hosts):
            try:
                instance.refresh()
            except (exception.InstanceNotFound,
                    exception.InstanceInfoCacheNotFound):
                LOG.debug('Instance deleted during build', instance=instance)
                continue
            local_filter_props = copy.deepcopy(filter_properties)
            scheduler_utils.populate_filter_properties(local_filter_props,
                host)
            # The block_device_mapping passed from the api doesn't contain
            # instance specific information
            bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
                    context, instance.uuid)

            self.compute_rpcapi.build_and_run_instance(context,
                    instance=instance, host=host['host'], image=image,
                    request_spec=request_spec,
                    filter_properties=local_filter_props,
                    admin_password=admin_password,
                    injected_files=injected_files,
                    requested_networks=requested_networks,
                    security_groups=security_groups,
                    block_device_mapping=bdms, node=host['nodename'],
                    limits=host['limits'])
Here, hosts = self.scheduler_client.select_destinations(context, request_spec, filter_properties) is the code path in which the nova-scheduler service selects a host; host selection will be covered in a separate article.
Once a suitable host has been selected, the following code runs.
#/nova/compute/rpcapi.py:ComputeAPI
    def build_and_run_instance(self, ctxt, instance, host, image,
            request_spec, filter_properties, admin_password=None,
            injected_files=None, requested_networks=None,
            security_groups=None, block_device_mapping=None, node=None,
            limits=None):
        version = '4.0'
        if not self.client.can_send_version(version):
            version = '3.40'
        if not self.client.can_send_version(version):
            version = '3.36'
            if 'numa_topology' in limits and limits['numa_topology']:
                topology_limits = limits['numa_topology']
                if node is not None:
                    cnode = objects.ComputeNode.get_by_host_and_nodename(
                        ctxt, host, node)
                else:
                    cnode = (
                        objects.ComputeNode.
                        get_first_node_by_host_for_old_compat(
                            ctxt, host))
                host_topology = objects.NUMATopology.obj_from_db_obj(
                    cnode.numa_topology)
                limits['numa_topology'] = jsonutils.dumps(
                    topology_limits.to_dict_legacy(host_topology))
        if not self.client.can_send_version(version):
            version = '3.33'
            if 'instance_type' in filter_properties:
                flavor = filter_properties['instance_type']
                flavor_p = objects_base.obj_to_primitive(flavor)
                filter_properties = dict(filter_properties,
                                         instance_type=flavor_p)
        if not self.client.can_send_version(version):
            version = '3.23'
            if requested_networks is not None:
                if utils.is_neutron():
                    requested_networks = [(network_id, address, port_id)
                        for (network_id, address, port_id, _) in
                            requested_networks.as_tuples()]
                else:
                    requested_networks = [(network_id, address)
                        for (network_id, address) in
                            requested_networks.as_tuples()]

        cctxt = self.client.prepare(server=host, version=version)
        cctxt.cast(ctxt, 'build_and_run_instance', instance=instance,
                   image=image, request_spec=request_spec,
                   filter_properties=filter_properties,
                   admin_password=admin_password,
                   injected_files=injected_files,
                   requested_networks=requested_networks,
                   security_groups=security_groups,
                   block_device_mapping=block_device_mapping, node=node,
                   limits=limits)
The RPC cast finally executes code in the nova-compute service on the selected host, namely its build_and_run_instance function, i.e. the build_and_run_instance function in /nova/compute/manager.py. When nova-compute creates its RPC server, two of its three endpoints come from /nova/compute/manager.py: /nova/compute/manager.py:ComputeManager and /nova/compute/manager.py:_ComputeV4Proxy. Both objects expose a build_and_run_instance function, but _ComputeV4Proxy ultimately delegates to ComputeManager's build_and_run_instance, as the following code shows.
#/nova/compute/manager.py:ComputeManager
class ComputeManager(manager.Manager):
    """Manages the running instances from creation to destruction."""

    target = messaging.Target(version='3.40')

    # How long to wait in seconds before re-issuing a shutdown
    # signal to a instance during power off.  The overall
    # time to wait is set by CONF.shutdown_timeout.
    SHUTDOWN_RETRY_INTERVAL = 10

    def __init__(self, compute_driver=None, *args, **kwargs):
        """Load configuration options and connect to the hypervisor."""
        self.virtapi = ComputeVirtAPI(self)
        self.network_api = network.API()
        ... ... ...
        super(ComputeManager, self).__init__(service_name="compute",
                                             *args, **kwargs)
        self.additional_endpoints.append(_ComputeV4Proxy(self))

        # NOTE(russellb) Load the driver last.  It may call back into the
        # compute manager via the virtapi, so we want it to be fully
        # initialized before that happens.
        self.driver = driver.load_compute_driver(self.virtapi, compute_driver)
        self.use_legacy_block_device_info = \
            self.driver.need_legacy_block_device_info

#/nova/compute/manager.py:_ComputeV4Proxy
class _ComputeV4Proxy(object):

    target = messaging.Target(version='4.0')

    def __init__(self, manager):
        self.manager = manager
So we can now follow the code flow of the build_and_run_instance function in /nova/compute/manager.py:ComputeManager.
#/nova/compute/manager.py:ComputeManager
    # NOTE(mikal): No object_compat wrapper on this method because its
    # callers all pass objects already
    @wrap_exception()
    @reverts_task_state
    @wrap_instance_fault
    def build_and_run_instance(self, context, instance, image, request_spec,
                     filter_properties, admin_password=None,
                     injected_files=None, requested_networks=None,
                     security_groups=None, block_device_mapping=None,
                     node=None, limits=None):

        # NOTE(danms): Remove this in v4.0 of the RPC API
        if (requested_networks and
                not isinstance(requested_networks,
                               objects.NetworkRequestList)):
            requested_networks = objects.NetworkRequestList(
                objects=[objects.NetworkRequest.from_tuple(t)
                         for t in requested_networks])

        # NOTE(melwitt): Remove this in v4.0 of the RPC API
        flavor = filter_properties.get('instance_type')
        if flavor and not isinstance(flavor, objects.Flavor):
            # Code downstream may expect extra_specs to be populated since it
            # is receiving an object, so lookup the flavor to ensure this.
            flavor = objects.Flavor.get_by_id(context, flavor['id'])
            filter_properties = dict(filter_properties, instance_type=flavor)

        # NOTE(sahid): Remove this in v4.0 of the RPC API
        if (limits and 'numa_topology' in limits and
                isinstance(limits['numa_topology'], six.string_types)):
            db_obj = jsonutils.loads(limits['numa_topology'])
            limits['numa_topology'] = (
                objects.NUMATopologyLimits.obj_from_db_obj(db_obj))

        @utils.synchronized(instance.uuid)
        def _locked_do_build_and_run_instance(*args, **kwargs):
            # NOTE(danms): We grab the semaphore with the instance uuid
            # locked because we could wait in line to build this instance
            # for a while and we want to make sure that nothing else tries
            # to do anything with this instance while we wait.
            with self._build_semaphore:
                self._do_build_and_run_instance(*args, **kwargs)

        # NOTE(danms): We spawn here to return the RPC worker thread back to
        # the pool. Since what follows could take a really long time, we don't
        # want to tie up RPC workers.
        utils.spawn_n(_locked_do_build_and_run_instance,
                      context, instance, image, request_spec,
                      filter_properties, admin_password, injected_files,
                      requested_networks, security_groups,
                      block_device_mapping, node, limits)

#/nova/utils.py
def spawn_n(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn_n.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    It will also grab the context from the threadlocal store and add it to
    the store on the new thread.  This allows for continuity in logging the
    context when using this method to spawn a new thread.
    """
    _context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*args, **kwargs):
        # NOTE: If update_store is not called after spawn_n it won't be
        # available for the logger to pull from threadlocal storage.
        if _context is not None:
            _context.update_store()
        func(*args, **kwargs)

    eventlet.spawn_n(context_wrapper, *args, **kwargs)
Here a new green thread is spawned to run the _locked_do_build_and_run_instance function, so the RPC worker is returned to the pool immediately while the long-running build continues; a small sketch of this hand-off pattern follows.
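The following is an illustrative sketch of the pattern (the names and the semaphore size are made up; Nova itself uses utils.synchronized on the instance uuid plus a build semaphore sized by configuration):

import eventlet

build_semaphore = eventlet.semaphore.Semaphore(10)   # cap concurrent builds

def long_running_build(instance_uuid):
    eventlet.sleep(1)                                 # stands in for the real spawn work
    print('built %s' % instance_uuid)

def build_and_run_instance(instance_uuid):
    def _locked_build():
        with build_semaphore:                         # queue here, not in the RPC worker
            long_running_build(instance_uuid)

    eventlet.spawn_n(_locked_build)                   # free the RPC worker right away
    print('RPC handler returned for %s' % instance_uuid)

build_and_run_instance('instance-uuid-demo')
eventlet.sleep(2)                                     # demo only: let the green thread finish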
#/nova/compute/manager.py:ComputeManager
    @hooks.add_hook('build_instance')
    @wrap_exception()
    @reverts_task_state
    @wrap_instance_event
    @wrap_instance_fault
    def _do_build_and_run_instance(self, context, instance, image,
            request_spec, filter_properties, admin_password, injected_files,
            requested_networks, security_groups, block_device_mapping,
            node=None, limits=None):

        try:
            LOG.info(_LI('Starting instance...'), context=context,
                     instance=instance)
            instance.vm_state = vm_states.BUILDING
            instance.task_state = None
            instance.save(expected_task_state=
                    (task_states.SCHEDULING, None))
        except exception.InstanceNotFound:
            msg = 'Instance disappeared before build.'
            LOG.debug(msg, instance=instance)
            return build_results.FAILED
        except exception.UnexpectedTaskStateError as e:
            LOG.debug(e.format_message(), instance=instance)
            return build_results.FAILED

        # b64 decode the files to inject:
        decoded_files = self._decode_files(injected_files)

        if limits is None:
            limits = {}

        if node is None:
            node = self.driver.get_available_nodes(refresh=True)[0]
            LOG.debug('No node specified, defaulting to %s', node,
                      instance=instance)

        try:
            self._build_and_run_instance(context, instance, image,
                    decoded_files, admin_password, requested_networks,
                    security_groups, block_device_mapping, node, limits,
                    filter_properties)
            return build_results.ACTIVE
        except exception.RescheduledException as e:
            retry = filter_properties.get('retry', None)
            if not retry:
                # no retry information, do not reschedule.
                LOG.debug("Retry info not present, will not reschedule",
                    instance=instance)
                self._cleanup_allocated_networks(context, instance,
                    requested_networks)
                compute_utils.add_instance_fault_from_exc(context,
                        instance, e, sys.exc_info())
                self._set_instance_error_state(context, instance)
                return build_results.FAILED
            LOG.debug(e.format_message(), instance=instance)
            retry['exc'] = traceback.format_exception(*sys.exc_info())
            # NOTE(comstud): Deallocate networks if the driver wants
            # us to do so.
            if self.driver.deallocate_networks_on_reschedule(instance):
                self._cleanup_allocated_networks(context, instance,
                        requested_networks)
            else:
                # NOTE(alex_xu): Network already allocated and we don't
                # want to deallocate them before rescheduling. But we need
                # cleanup those network resource setup on this host before
                # rescheduling.
                self.network_api.cleanup_instance_network_on_host(
                    context, instance, self.host)

            instance.task_state = task_states.SCHEDULING
            instance.save()

            self.compute_task_api.build_instances(context, [instance],
                    image, filter_properties, admin_password,
                    injected_files, requested_networks, security_groups,
                    block_device_mapping)
            return build_results.RESCHEDULED
        except (exception.InstanceNotFound,
                exception.UnexpectedDeletingTaskStateError):
            msg = 'Instance disappeared during build.'
            LOG.debug(msg, instance=instance)
            self._cleanup_allocated_networks(context, instance,
                    requested_networks)
            return build_results.FAILED
        except exception.BuildAbortException as e:
            LOG.exception(e.format_message(), instance=instance)
            self._cleanup_allocated_networks(context, instance,
                    requested_networks)
            self._cleanup_volumes(context, instance.uuid,
                    block_device_mapping, raise_exc=False)
            compute_utils.add_instance_fault_from_exc(context, instance,
                    e, sys.exc_info())
            self._set_instance_error_state(context, instance)
            return build_results.FAILED
        except Exception as e:
            # Should not reach here.
            msg = _LE('Unexpected build failure, not rescheduling build.')
            LOG.exception(msg, instance=instance)
            self._cleanup_allocated_networks(context, instance,
                    requested_networks)
            self._cleanup_volumes(context, instance.uuid,
                    block_device_mapping, raise_exc=False)
            compute_utils.add_instance_fault_from_exc(context, instance,
                    e, sys.exc_info())
            self._set_instance_error_state(context, instance)
            return build_results.FAILED
Here _do_build_and_run_instance calls _build_and_run_instance to carry out the remainder of VM creation on the local host. If a RescheduledException is raised while building on this host, the retry value in filter_properties determines whether the VM is rescheduled to a new host (a rough illustration of that retry information is shown below). In the following we focus on the path in which _build_and_run_instance continues VM creation on the local host.
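Roughly, the retry information that scheduler_utils.populate_retry records in filter_properties looks like the following (illustrative values, not taken from a real request):

# Shape of the retry info consulted on RescheduledException (illustrative):
filter_properties = {
    'retry': {
        'num_attempts': 1,                      # incremented for each scheduling attempt
        'hosts': [['compute-1', 'compute-1']],  # [host, node] pairs already tried
        # 'exc': [...]  the failure traceback is stored here before rescheduling
    },
    # ... other filter properties such as the instance_type ...
}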
#/nova/compute/manager.py:ComputeManager
    def _build_and_run_instance(self, context, instance, image, injected_files,
            admin_password, requested_networks, security_groups,
            block_device_mapping, node, limits, filter_properties):

        image_name = image.get('name')
        self._notify_about_instance_usage(context, instance, 'create.start',
                extra_usage_info={'image_name': image_name})
        try:
            rt = self._get_resource_tracker(node)
            with rt.instance_claim(context, instance, limits) as inst_claim:
                # NOTE(russellb) It's important that this validation be done
                # *after* the resource tracker instance claim, as that is where
                # the host is set on the instance.
                self._validate_instance_group_policy(context, instance,
                        filter_properties)
                with self._build_resources(context, instance,
                        requested_networks, security_groups, image,
                        block_device_mapping) as resources:
                    instance.vm_state = vm_states.BUILDING
                    instance.task_state = task_states.SPAWNING
                    instance.numa_topology = inst_claim.claimed_numa_topology
                    # NOTE(JoshNang) This also saves the changes to the
                    # instance from _allocate_network_async, as they aren't
                    # saved in that function to prevent races.
                    instance.save(expected_task_state=
                            task_states.BLOCK_DEVICE_MAPPING)
                    block_device_info = resources['block_device_info']
                    network_info = resources['network_info']
                    self.driver.spawn(context, instance, image,
                                      injected_files, admin_password,
                                      network_info=network_info,
                                      block_device_info=block_device_info)
        except (exception.InstanceNotFound,
                exception.UnexpectedDeletingTaskStateError) as e:
            with excutils.save_and_reraise_exception():
                self._notify_about_instance_usage(context, instance,
                    'create.end', fault=e)
        except exception.ComputeResourcesUnavailable as e:
            LOG.debug(e.format_message(), instance=instance)
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            raise exception.RescheduledException(
                    instance_uuid=instance.uuid, reason=e.format_message())
        except exception.BuildAbortException as e:
            with excutils.save_and_reraise_exception():
                LOG.debug(e.format_message(), instance=instance)
                self._notify_about_instance_usage(context, instance,
                        'create.error', fault=e)
        except (exception.FixedIpLimitExceeded,
                exception.NoMoreNetworks, exception.NoMoreFixedIps) as e:
            LOG.warning(_LW('No more network or fixed IP to be allocated'),
                        instance=instance)
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            msg = _('Failed to allocate the network(s) with error %s, '
                    'not rescheduling.') % e.format_message()
            raise exception.BuildAbortException(instance_uuid=instance.uuid,
                    reason=msg)
        except (exception.VirtualInterfaceCreateException,
                exception.VirtualInterfaceMacAddressException) as e:
            LOG.exception(_LE('Failed to allocate network(s)'),
                          instance=instance)
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            msg = _('Failed to allocate the network(s), not rescheduling.')
            raise exception.BuildAbortException(instance_uuid=instance.uuid,
                    reason=msg)
        except (exception.FlavorDiskTooSmall,
                exception.FlavorMemoryTooSmall,
                exception.ImageNotActive,
                exception.ImageUnacceptable) as e:
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            raise exception.BuildAbortException(instance_uuid=instance.uuid,
                    reason=e.format_message())
        except Exception as e:
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            raise exception.RescheduledException(
                    instance_uuid=instance.uuid, reason=six.text_type(e))

        # NOTE(alaski): This is only useful during reschedules, remove it now.
        instance.system_metadata.pop('network_allocated', None)

        self._update_instance_after_spawn(context, instance)

        try:
            instance.save(expected_task_state=task_states.SPAWNING)
        except (exception.InstanceNotFound,
                exception.UnexpectedDeletingTaskStateError) as e:
            with excutils.save_and_reraise_exception():
                self._notify_about_instance_usage(context, instance,
                    'create.end', fault=e)

        self._update_scheduler_instance_info(context, instance)
        self._notify_about_instance_usage(context, instance, 'create.end',
                extra_usage_info={'message': _('Success')},
                network_info=network_info)
Here, before the VM is built, the Claim mechanism verifies that the host's available resources can satisfy the request. The problem it solves is this: when one host is selected concurrently by multiple nova-scheduler processes that each send a VM creation request, the host does not necessarily have enough resources for all of them, so the host's resources must be verified with a claim before the VM is created. If the check passes, the database is updated and the resources requested by the VM are deducted from the host's available resources; if the build later fails, or the VM is deleted, the previously deducted resources are added back through the claim. A simplified model of this idea is sketched below, followed by the actual claim code.
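The following is a simplified, illustrative model of the claim idea (these are not Nova's actual classes): test the requested resources against what is free, record the usage immediately, and give the usage back if the build fails while the claim is held.

class SimpleClaim(object):
    def __init__(self, tracker, requested_mb):
        free = tracker.total_mb - tracker.used_mb
        if requested_mb > free:
            # mirrors ComputeResourcesUnavailable being raised from the claim test
            raise RuntimeError('Free %d MB < requested %d MB' % (free, requested_mb))
        self.tracker = tracker
        self.requested_mb = requested_mb
        tracker.used_mb += requested_mb           # consume the resources up front

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.abort()                          # build failed: give the resources back

    def abort(self):
        self.tracker.used_mb -= self.requested_mb


class SimpleTracker(object):
    def __init__(self, total_mb):
        self.total_mb = total_mb
        self.used_mb = 0

    def instance_claim(self, requested_mb):
        return SimpleClaim(self, requested_mb)


tracker = SimpleTracker(total_mb=8192)
with tracker.instance_claim(2048):
    pass                                          # the VM would be spawned here
print(tracker.used_mb)                            # 2048: usage is kept on success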
#/nova/compute/resource_tracker.py:ResourceTracker
    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def instance_claim(self, context, instance_ref, limits=None):
        """Indicate that some resources are needed for an upcoming compute
        instance build operation.

        This should be called before the compute node is about to perform
        an instance build operation that will consume additional resources.

        :param context: security context
        :param instance_ref: instance to reserve resources for
        :param limits: Dict of oversubscription limits for memory, disk,
                       and CPUs.
        :returns: A Claim ticket representing the reserved resources.  It can
                  be used to revert the resource usage if an error occurs
                  during the instance build.
        """
        if self.disabled:
            # compute_driver doesn't support resource tracking, just
            # set the 'host' and node fields and continue the build:
            self._set_instance_host_and_node(context, instance_ref)
            return claims.NopClaim()

        # sanity checks:
        if instance_ref['host']:
            LOG.warning(_LW("Host field should not be set on the instance "
                            "until resources have been claimed."),
                        instance=instance_ref)

        if instance_ref['node']:
            LOG.warning(_LW("Node field should not be set on the instance "
                            "until resources have been claimed."),
                        instance=instance_ref)

        # get memory overhead required to build this instance:
        overhead = self.driver.estimate_instance_overhead(instance_ref)
        LOG.debug("Memory overhead for %(flavor)d MB instance; %(overhead)d "
                  "MB", {'flavor': instance_ref['memory_mb'],
                         'overhead': overhead['memory_mb']})

        claim = claims.Claim(context, instance_ref, self, self.compute_node,
                             overhead=overhead, limits=limits)

        self._set_instance_host_and_node(context, instance_ref)
        instance_ref['numa_topology'] = claim.claimed_numa_topology

        # Mark resources in-use and update stats
        self._update_usage_from_instance(context, self.compute_node,
                instance_ref)

        elevated = context.elevated()
        # persist changes to the compute node:
        self._update(elevated, self.compute_node)

        return claim
Let's focus on what creating the Claim object inside instance_claim actually does, as shown below.
#/nova/compute/claims.py:Claim
class Claim(NopClaim):
    """A declaration that a compute host operation will require free
    resources.
    Claims serve as marker objects that resources are being held until the
    update_available_resource audit process runs to do a full reconciliation
    of resource usage.

    This information will be used to help keep the local compute hosts's
    ComputeNode model in sync to aid the scheduler in making efficient / more
    correct decisions with respect to host selection.
    """

    def __init__(self, context, instance, tracker, resources, overhead=None,
                 limits=None):
        super(Claim, self).__init__()
        # Stash a copy of the instance at the current point of time
        if isinstance(instance, obj_base.NovaObject):
            self.instance = instance.obj_clone()
        else:
            # This does not use copy.deepcopy() because it could be
            # a sqlalchemy model, and it's best to make sure we have
            # the primitive form.
            self.instance = jsonutils.to_primitive(instance)
        self._numa_topology_loaded = False
        self.tracker = tracker

        if not overhead:
            overhead = {'memory_mb': 0}

        self.overhead = overhead
        self.context = context

        # Check claim at constructor to avoid mess code
        # Raise exception ComputeResourcesUnavailable if claim failed
        self._claim_test(resources, limits)

#/nova/compute/claims.py:Claim
    def _claim_test(self, resources, limits=None):
        """Test if this claim can be satisfied given available resources and
        optional oversubscription limits

        This should be called before the compute node actually consumes the
        resources required to execute the claim.

        :param resources: available local compute node resources
        :returns: Return true if resources are available to claim.
        """
        if not limits:
            limits = {}

        # If an individual limit is None, the resource will be considered
        # unlimited:
        memory_mb_limit = limits.get('memory_mb')
        disk_gb_limit = limits.get('disk_gb')
        numa_topology_limit = limits.get('numa_topology')

        msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
                "GB")
        params = {'memory_mb': self.memory_mb, 'disk_gb': self.disk_gb}
        LOG.info(msg % params, instance=self.instance)

        reasons = [self._test_memory(resources, memory_mb_limit),
                   self._test_disk(resources, disk_gb_limit),
                   self._test_numa_topology(resources, numa_topology_limit),
                   self._test_pci()]
        reasons = reasons + self._test_ext_resources(limits)
        reasons = [r for r in reasons if r is not None]
        if len(reasons) > 0:
            raise exception.ComputeResourcesUnavailable(reason=
                    "; ".join(reasons))

        LOG.info(_LI('Claim successful'), instance=self.instance)
As the code above shows, constructing the Claim object calls _claim_test to check memory, disk, and other resources. Let's take the memory check as a brief example.
#/nova/compute/claims.py:Claim
    def _test_memory(self, resources, limit):
        type_ = _("memory")
        unit = "MB"
        total = resources['memory_mb']
        used = resources['memory_mb_used']
        requested = self.memory_mb

        return self._test(type_, unit, total, used, requested, limit)

#/nova/compute/claims.py:Claim
    def _test(self, type_, unit, total, used, requested, limit):
        """Test if the given type of resource needed for a claim can be safely
        allocated.
        """
        LOG.info(_LI('Total %(type)s: %(total)d %(unit)s, used: %(used).02f '
                     '%(unit)s'),
                 {'type': type_, 'total': total, 'unit': unit, 'used': used},
                 instance=self.instance)

        if limit is None:
            # treat resource as unlimited:
            LOG.info(_LI('%(type)s limit not specified, defaulting to '
                         'unlimited'), {'type': type_}, instance=self.instance)
            return

        free = limit - used

        # Oversubscribed resource policy info:
        LOG.info(_LI('%(type)s limit: %(limit).02f %(unit)s, '
                     'free: %(free).02f %(unit)s'),
                 {'type': type_, 'limit': limit, 'free': free, 'unit': unit},
                 instance=self.instance)

        if requested > free:
            return (_('Free %(type)s %(free).02f '
                      '%(unit)s < requested %(requested)d %(unit)s') %
                      {'type': type_, 'free': free, 'unit': unit,
                       'requested': requested})
The memory check is straightforward: it simply compares the host's memory resources against the memory the VM requests. If the request cannot be satisfied, a reason string is returned (i.e. the return value is not None); otherwise the method falls through and returns None. Note that resources here is the compute_node attribute of the /nova/compute/resource_tracker.py:ResourceTracker object. compute_node carries the host resource information updated from the database via nova-conductor, and the per-host resource information in the database is in turn reported periodically by nova-compute's periodic tasks, which read the hypervisor and write the results to the database (see 《nova-compute Periodic tasks 机制》 for details).
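As an illustration with made-up numbers: if the host has memory_mb = 32768, memory_mb_used = 30000, and the scheduler passed an oversubscription limit of 1.5 * 32768 = 49152 MB, then free = 49152 - 30000 = 19152 MB, so a request for 16384 MB passes the test while a request for 20480 MB would return a failure reason.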
Back in _claim_test: once all the resource comparisons are done, the return values are examined. reasons collects the results of every check; if at least one element is not None, i.e. at least one resource type (e.g. disk) cannot satisfy the requested VM specification, a ComputeResourcesUnavailable exception is raised to the caller. (If the claim succeeds but the build later fails inside the with block, the claim's __exit__() aborts it and returns the held resources to the host's available pool.) Here we assume the host's available resources do satisfy the new VM. After the claim check passes, self._set_instance_host_and_node(context, instance_ref) is executed, which updates the instance's host, node, and launched_on attributes via nova-conductor; then self._update_usage_from_instance(...) adjusts the host's available resources according to the new VM's requirements; finally self._update(...) persists the result of this accounting to the database.
This concludes the analysis of the claim mechanism in nova-compute. It is worth contrasting the claim mechanism with the periodic task mechanism (see 《OpenStack设计与实现》): the claim mechanism computes and updates on top of the data currently in the database (the host information it reads is cached in self.compute_node of /nova/compute/resource_tracker.py:ResourceTracker), which keeps the available resources in the database up to date so that nova-scheduler always sees fresh data. The periodic task mechanism exists to keep the information in the database accurate: on each run it obtains the host's information from the hypervisor and writes it to the database (at a fixed interval, 60s by default; see 《nova-compute Periodic tasks 机制》 for details).
Original article: http://blog.csdn.net/gj19890923/article/details/51107018