标签:snapshot zone hang append exce you inject structs mirroring
上一篇文章写了cinder服务的启动,下面讲一下openstack是如何通过cinder创建一个卷的
通过查看cinder的api-paste.ini文件,并且现在是v3版本的API,可以得知目前API的router文件是cinder/api/v3/router.py文件
通过查看router.py文件,可以得知,对于volume的操作都会通过mapper重定向到cinder/api/v3/volumes.py文件中进行处理
看一下创建volume的源码
1 def create(self, req, body): 2 ........ 3 ........ 4 new_volume = self.volume_api.create(context, 5 size, 6 volume.get(‘display_name‘), 7 volume.get(‘display_description‘), 8 **kwargs) 9 10 retval = self._view_builder.detail(req, new_volume) 11 12 return retval
此处调用了self.volume_api.create()去创建卷,self.volume_api 这个变量是VolumeController从V2的api继承过来的,在初始化的时候被初始化为cinder.volume.api.API(),所以其create方法为cinder/volume/api.py中API类下的create方法
1 def create(self, context, size, name, description, snapshot=None, 2 image_id=None, volume_type=None, metadata=None, 3 availability_zone=None, source_volume=None, 4 scheduler_hints=None, 5 source_replica=None, consistencygroup=None, 6 cgsnapshot=None, multiattach=False, source_cg=None, 7 group=None, group_snapshot=None, source_group=None, 8 backup=None): 9 ......... 10 ......... 11 create_what = { 12 ‘context‘: context, 13 ‘raw_size‘: size, 14 ‘name‘: name, 15 ‘description‘: description, 16 ‘snapshot‘: snapshot, 17 ‘image_id‘: image_id, 18 ‘raw_volume_type‘: volume_type, 19 ‘metadata‘: metadata or {}, 20 ‘raw_availability_zone‘: availability_zone, 21 ‘source_volume‘: source_volume, 22 ‘scheduler_hints‘: scheduler_hints, 23 ‘key_manager‘: self.key_manager, 24 ‘optional_args‘: {‘is_quota_committed‘: False}, 25 ‘consistencygroup‘: consistencygroup, 26 ‘cgsnapshot‘: cgsnapshot, 27 ‘raw_multiattach‘: multiattach, 28 ‘group‘: group, 29 ‘group_snapshot‘: group_snapshot, 30 ‘source_group‘: source_group, 31 ‘backup‘: backup, 32 } 33 try: 34 sched_rpcapi = (self.scheduler_rpcapi if ( 35 not cgsnapshot and not source_cg and 36 not group_snapshot and not source_group) 37 else None) 38 volume_rpcapi = (self.volume_rpcapi if ( 39 not cgsnapshot and not source_cg and 40 not group_snapshot and not source_group) 41 else None) 42 flow_engine = create_volume.get_flow(self.db, 43 self.image_service, 44 availability_zones, 45 create_what, 46 sched_rpcapi, 47 volume_rpcapi) 48 except Exception: 49 msg = _(‘Failed to create api volume flow.‘) 50 LOG.exception(msg) 51 raise exception.CinderException(msg)
此处调用了create_volume中的get_flow方法,进行传参并创建flow,get_flow采用了taskflow,使用了taskflow中的线性流程(linear_flow),依次添加了ExtractVolumeRequestTask(), QuotaReserveTask(), EntryCreateTask(), QuotaCommitTask() 以及VolumeCastTask()五个步骤
1 def get_flow(db_api, image_service_api, availability_zones, create_what, 2 scheduler_rpcapi=None, volume_rpcapi=None): 3 """Constructs and returns the api entrypoint flow. 4 5 This flow will do the following: 6 7 1. Inject keys & values for dependent tasks. 8 2. Extracts and validates the input keys & values. 9 3. Reserves the quota (reverts quota on any failures). 10 4. Creates the database entry. 11 5. Commits the quota. 12 6. Casts to volume manager or scheduler for further processing. 13 """ 14 15 flow_name = ACTION.replace(":", "_") + "_api" 16 api_flow = linear_flow.Flow(flow_name) 17 18 api_flow.add(ExtractVolumeRequestTask( 19 image_service_api, 20 availability_zones, 21 rebind={‘size‘: ‘raw_size‘, 22 ‘availability_zone‘: ‘raw_availability_zone‘, 23 ‘volume_type‘: ‘raw_volume_type‘, 24 ‘multiattach‘: ‘raw_multiattach‘})) 25 api_flow.add(QuotaReserveTask(), 26 EntryCreateTask(), 27 QuotaCommitTask()) 28 29 if scheduler_rpcapi and volume_rpcapi: 30 # This will cast it out to either the scheduler or volume manager via 31 # the rpc apis provided. 32 api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api)) 33 34 # Now load (but do not run) the flow using the provided initial data. 35 return taskflow.engines.load(api_flow, store=create_what)
taskflow会调用添加的每个步骤类的execute方法,taskflow是openstack中的一个重要组件,用于构建需要按精确步骤执行的业务逻辑,涉及的东西比较多,暂时不在这里记录
ExtractVolumeRequestTask类主要对传过来的参数进行校验,提取各类参数,并根据参数进行zone、镜像等选取的操作,并为QuotaReserveTask 类传递参数
QuotaReserveTask类进行配额检查以及占用
EntryCreateTask类主要是调用cinder.objects.volume.Volume.create()方法在database中创建记录
QuotaCommitTask类在数据库中进行配额的确认
VolumeCastTask类通过rpc对任务进行投递,投递的对象为scheduler_rpcapi
scheduler_rpcapi在调用get_flow时已经指定
此时cinder-scheduler接收到cinder-api传过来的请求,发送请求的代码部分为 cinder/scheduler/rpcapi.py
1 def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None, 2 request_spec=None, filter_properties=None, 3 backup_id=None): 4 volume.create_worker() 5 cctxt = self._get_cctxt() 6 msg_args = {‘snapshot_id‘: snapshot_id, ‘image_id‘: image_id, 7 ‘request_spec‘: request_spec, 8 ‘filter_properties‘: filter_properties, 9 ‘volume‘: volume, ‘backup_id‘: backup_id} 10 if not self.client.can_send_version(‘3.10‘): 11 msg_args.pop(‘backup_id‘) 12 return cctxt.cast(ctxt, ‘create_volume‘, **msg_args)
此处同样,cinder-scheduler端接收请求的是cinder/scheduler/manager.py中的SchedulerManager
1 @objects.Volume.set_workers 2 @append_operation_type() 3 def create_volume(self, context, volume, snapshot_id=None, image_id=None, 4 request_spec=None, filter_properties=None, 5 backup_id=None): 6 self._wait_for_scheduler() 7 8 try: 9 flow_engine = create_volume.get_flow(context, 10 self.driver, 11 request_spec, 12 filter_properties, 13 volume, 14 snapshot_id, 15 image_id, 16 backup_id) 17 except Exception: 18 msg = _("Failed to create scheduler manager volume flow") 19 LOG.exception(msg) 20 raise exception.CinderException(msg) 21 22 with flow_utils.DynamicLogListener(flow_engine, logger=LOG): 23 flow_engine.run()
此处cinder-scheduler同样使用了taskflow模型对磁盘进行创建,看一下这个get_flow中包含的几个task类
1 def get_flow(context, driver_api, request_spec=None, 2 filter_properties=None, 3 volume=None, snapshot_id=None, image_id=None, backup_id=None): 4 5 create_what = { 6 ‘context‘: context, 7 ‘raw_request_spec‘: request_spec, 8 ‘filter_properties‘: filter_properties, 9 ‘volume‘: volume, 10 ‘snapshot_id‘: snapshot_id, 11 ‘image_id‘: image_id, 12 ‘backup_id‘: backup_id, 13 } 14 15 flow_name = ACTION.replace(":", "_") + "_scheduler" 16 scheduler_flow = linear_flow.Flow(flow_name) 17 18 # This will extract and clean the spec from the starting values. 19 scheduler_flow.add(ExtractSchedulerSpecTask( 20 rebind={‘request_spec‘: ‘raw_request_spec‘})) 21 22 # This will activate the desired scheduler driver (and handle any 23 # driver related failures appropriately). 24 scheduler_flow.add(ScheduleCreateVolumeTask(driver_api)) 25 26 # Now load (but do not run) the flow using the provided initial data. 27 return taskflow.engines.load(scheduler_flow, store=create_what)
ExtractSchedulerSpecTask 同样为对请求参数进行提取加工以供后续调用
ScheduleCreateVolumeTask的execute中有两个操作:1.调用driver_api进行volume的创建;2.如果创建过程中出现失败,则通过message_api创建一条失败消息,并进行失败处理
1 def execute(self, context, request_spec, filter_properties, volume): 2 try: 3 self.driver_api.schedule_create_volume(context, request_spec, 4 filter_properties) 5 except Exception as e: 6 self.message_api.create( 7 context, 8 message_field.Action.SCHEDULE_ALLOCATE_VOLUME, 9 resource_uuid=request_spec[‘volume_id‘], 10 exception=e) 11 with excutils.save_and_reraise_exception( 12 reraise=not isinstance(e, exception.NoValidBackend)): 13 try: 14 self._handle_failure(context, request_spec, e) 15 finally: 16 common.error_out(volume, reason=e)
此时driver_api为SchedulerManager初始化时的scheduler_driver
可知driver_api为cinder.scheduler.filter_scheduler.FilterScheduler
1 def schedule_create_volume(self, context, request_spec, filter_properties): 2 backend = self._schedule(context, request_spec, filter_properties) 3 4 if not backend: 5 raise exception.NoValidBackend(reason=_("No weighed backends " 6 "available")) 7 8 backend = backend.obj 9 volume_id = request_spec[‘volume_id‘] 10 11 updated_volume = driver.volume_update_db( 12 context, volume_id, 13 backend.host, 14 backend.cluster_name, 15 availability_zone=backend.service[‘availability_zone‘]) 16 self._post_select_populate_filter_properties(filter_properties, 17 backend) 18 19 # context is not serializable 20 filter_properties.pop(‘context‘, None) 21 22 self.volume_rpcapi.create_volume(context, updated_volume, request_spec, 23 filter_properties, 24 allow_reschedule=True)
self._schedule 通过传入的参数对后端进行选择(多个后端的情况下)
最后调用self.volume_rpcapi.create_volume进行volume的创建,self.volume_rpcapi为volume_rpcapi.VolumeAPI()
1 def create_volume(self, ctxt, volume, request_spec, filter_properties, 2 allow_reschedule=True): 3 cctxt = self._get_cctxt(volume.service_topic_queue) 4 cctxt.cast(ctxt, ‘create_volume‘, 5 request_spec=request_spec, 6 filter_properties=filter_properties, 7 allow_reschedule=allow_reschedule, 8 volume=volume)
此时cinder-scheduler通过rpc对cinder-volume发送创建volume的消息,接收消息的是cinder-volume的VolumeManager
1 @objects.Volume.set_workers 2 def create_volume(self, context, volume, request_spec=None, 3 filter_properties=None, allow_reschedule=True): 4 """Creates the volume.""" 5 utils.log_unsupported_driver_warning(self.driver) 6 7 self._set_resource_host(volume) 8 9 self._update_allocated_capacity(volume) 10 # We lose the host value if we reschedule, so keep it here 11 original_host = volume.host 12 13 context_elevated = context.elevated() 14 if filter_properties is None: 15 filter_properties = {} 16 17 if request_spec is None: 18 request_spec = objects.RequestSpec() 19 20 try: 21 # NOTE(flaper87): Driver initialization is 22 # verified by the task itself. 23 flow_engine = create_volume.get_flow( 24 context_elevated, 25 self, 26 self.db, 27 self.driver, 28 self.scheduler_rpcapi, 29 self.host, 30 volume, 31 allow_reschedule, 32 context, 33 request_spec, 34 filter_properties, 35 image_volume_cache=self.image_volume_cache, 36 ) 37 except Exception: 38 msg = _("Create manager volume flow failed.") 39 LOG.exception(msg, resource={‘type‘: ‘volume‘, ‘id‘: volume.id}) 40 raise exception.CinderException(msg) 41 42 snapshot_id = request_spec.get(‘snapshot_id‘) 43 source_volid = request_spec.get(‘source_volid‘) 44 45 if snapshot_id is not None: 46 # Make sure the snapshot is not deleted until we are done with it. 47 locked_action = "%s-%s" % (snapshot_id, ‘delete_snapshot‘) 48 elif source_volid is not None: 49 # Make sure the volume is not deleted until we are done with it. 50 locked_action = "%s-%s" % (source_volid, ‘delete_volume‘) 51 else: 52 locked_action = None 53 54 def _run_flow(): 55 # This code executes create volume flow. If something goes wrong, 56 # flow reverts all job that was done and reraises an exception. 57 # Otherwise, all data that was generated by flow becomes available 58 # in flow engine‘s storage. 59 with flow_utils.DynamicLogListener(flow_engine, logger=LOG): 60 flow_engine.run() 61 62 # NOTE(dulek): Flag to indicate if volume was rescheduled. 
Used to 63 # decide if allocated_capacity should be incremented. 64 rescheduled = False 65 66 try: 67 if locked_action is None: 68 _run_flow() 69 else: 70 with coordination.COORDINATOR.get_lock(locked_action): 71 _run_flow() 72 finally: 73 try: 74 flow_engine.storage.fetch(‘refreshed‘) 75 except tfe.NotFound: 76 # If there‘s no vol_ref, then flow is reverted. Lets check out 77 # if rescheduling occurred. 78 try: 79 rescheduled = flow_engine.storage.get_revert_result( 80 create_volume.OnFailureRescheduleTask.make_name( 81 [create_volume.ACTION])) 82 except tfe.NotFound: 83 pass 84 85 if rescheduled: 86 # NOTE(geguileo): Volume was rescheduled so we need to update 87 # volume stats because the volume wasn‘t created here. 88 # Volume.host is None now, so we pass the original host value. 89 self._update_allocated_capacity(volume, decrement=True, 90 host=original_host) 91 92 # Shared targets is only relevant for iSCSI connections. 93 # We default to True to be on the safe side. 94 volume.shared_targets = ( 95 self.driver.capabilities.get(‘storage_protocol‘) == ‘iSCSI‘ and 96 self.driver.capabilities.get(‘shared_targets‘, True)) 97 # TODO(geguileo): service_uuid won‘t be enough on Active/Active 98 # deployments. There can be 2 services handling volumes from the same 99 # backend. 100 volume.service_uuid = self.service_uuid 101 volume.save() 102 103 LOG.info("Created volume successfully.", resource=volume) 104 return volume.id
上述代码中,同样使用了taskflow,
ExtractVolumeRefTask为提取数据库中volume的具体信息
OnFailureRescheduleTask中execute并无操作,但是revert中有操作,是为了在后续步骤出现错误进行回滚时执行部分操作。
ExtractVolumeSpecTask 提取spec信息
NotifyVolumeActionTask 广播volume开始创建的消息
1 def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume, 2 allow_reschedule, reschedule_context, request_spec, 3 filter_properties, image_volume_cache=None): 4 5 flow_name = ACTION.replace(":", "_") + "_manager" 6 volume_flow = linear_flow.Flow(flow_name) 7 8 # This injects the initial starting flow values into the workflow so that 9 # the dependency order of the tasks provides/requires can be correctly 10 # determined. 11 create_what = { 12 ‘context‘: context, 13 ‘filter_properties‘: filter_properties, 14 ‘request_spec‘: request_spec, 15 ‘volume‘: volume, 16 } 17 18 volume_flow.add(ExtractVolumeRefTask(db, host, set_error=False)) 19 20 retry = filter_properties.get(‘retry‘, None) 21 22 # Always add OnFailureRescheduleTask and we handle the change of volume‘s 23 # status when reverting the flow. Meanwhile, no need to revert process of 24 # ExtractVolumeRefTask. 25 do_reschedule = allow_reschedule and request_spec and retry 26 volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, driver, 27 scheduler_rpcapi, do_reschedule)) 28 29 LOG.debug("Volume reschedule parameters: %(allow)s " 30 "retry: %(retry)s", {‘allow‘: allow_reschedule, ‘retry‘: retry}) 31 32 volume_flow.add(ExtractVolumeSpecTask(db), 33 NotifyVolumeActionTask(db, "create.start"), 34 CreateVolumeFromSpecTask(manager, 35 db, 36 driver, 37 image_volume_cache), 38 CreateVolumeOnFinishTask(db, "create.end")) 39 40 # Now load (but do not run) the flow using the provided initial data. 41 return taskflow.engines.load(volume_flow, store=create_what)
CreateVolumeFromSpecTask此处通过传入的create_type不同,调用不同的接口进行卷的创建,以裸磁盘为例(create_type为raw)
CreateVolumeOnFinishTask广播创建磁盘完成
1 if create_type == ‘raw‘: 2 model_update = self._create_raw_volume(volume, **volume_spec) 3 elif create_type == ‘snap‘: 4 model_update = self._create_from_snapshot(context, volume, 5 **volume_spec) 6 elif create_type == ‘source_vol‘: 7 model_update = self._create_from_source_volume( 8 context, volume, **volume_spec) 9 elif create_type == ‘image‘: 10 model_update = self._create_from_image(context, 11 volume, 12 **volume_spec) 13 elif create_type == ‘backup‘: 14 model_update, need_update_volume = self._create_from_backup( 15 context, volume, **volume_spec) 16 volume_spec.update({‘need_update_volume‘: need_update_volume}) 17 else: 18 raise exception.VolumeTypeNotFound(volume_type_id=create_type)
1 def _create_raw_volume(self, volume, **kwargs): 2 try: 3 ret = self.driver.create_volume(volume) 4 finally: 5 self._cleanup_cg_in_volume(volume) 6 return ret
此处self.driver为VolumeManager初始化时进行初始化的,可以看出driver是从配置文件中读取的
1 self.configuration = config.Configuration(volume_backend_opts, 2 config_group=service_name) 3 self._set_tpool_size( 4 self.configuration.backend_native_threads_pool_size) 5 self.stats = {} 6 self.service_uuid = None 7 8 if not volume_driver: 9 # Get from configuration, which will get the default 10 # if its not using the multi backend 11 volume_driver = self.configuration.volume_driver 12 if volume_driver in MAPPING: 13 LOG.warning("Driver path %s is deprecated, update your " 14 "configuration to the new path.", volume_driver) 15 volume_driver = MAPPING[volume_driver]
配置文件中有写
1 def create_volume(self, volume): 2 """Creates a logical volume.""" 3 mirror_count = 0 4 if self.configuration.lvm_mirrors: 5 mirror_count = self.configuration.lvm_mirrors 6 7 self._create_volume(volume[‘name‘], 8 self._sizestr(volume[‘size‘]), 9 self.configuration.lvm_type, 10 mirror_count)
def _create_volume(self, name, size, lvm_type, mirror_count, vg=None): vg_ref = self.vg if vg is not None: vg_ref = vg vg_ref.create_volume(name, size, lvm_type, mirror_count)
此处self.vg是什么?全局查找一下,具体初始化时间,可以查看上一篇,cinder服务启动中的cinder-volume启动部分
此时创建卷调用的底层代码就可以得知,调用的是lvcreate对卷进行创建。
1 def create_volume(self, name, size_str, lv_type=‘default‘, mirror_count=0): 2 """Creates a logical volume on the object‘s VG. 3 4 :param name: Name to use when creating Logical Volume 5 :param size_str: Size to use when creating Logical Volume 6 :param lv_type: Type of Volume (default or thin) 7 :param mirror_count: Use LVM mirroring with specified count 8 9 """ 10 11 if lv_type == ‘thin‘: 12 pool_path = ‘%s/%s‘ % (self.vg_name, self.vg_thin_pool) 13 cmd = LVM.LVM_CMD_PREFIX + [‘lvcreate‘, ‘-T‘, ‘-V‘, size_str, ‘-n‘, 14 name, pool_path] 15 else: 16 cmd = LVM.LVM_CMD_PREFIX + [‘lvcreate‘, ‘-n‘, name, self.vg_name, 17 ‘-L‘, size_str] 18 19 if mirror_count > 0: 20 cmd.extend([‘--type=mirror‘, ‘-m‘, mirror_count, ‘--nosync‘, 21 ‘--mirrorlog‘, ‘mirrored‘]) 22 terras = int(size_str[:-1]) / 1024.0 23 if terras >= 1.5: 24 rsize = int(2 ** math.ceil(math.log(terras) / math.log(2))) 25 # NOTE(vish): Next power of two for region size. See: 26 # http://red.ht/U2BPOD 27 cmd.extend([‘-R‘, str(rsize)]) 28 29 try: 30 self._execute(*cmd, 31 root_helper=self._root_helper, 32 run_as_root=True) 33 except putils.ProcessExecutionError as err: 34 LOG.exception(‘Error creating Volume‘) 35 LOG.error(‘Cmd :%s‘, err.cmd) 36 LOG.error(‘StdOut :%s‘, err.stdout) 37 LOG.error(‘StdErr :%s‘, err.stderr) 38 LOG.error(‘Current state: %s‘, 39 self.get_all_volume_groups(self._root_helper)) 40 raise
后续就是一系列回调和通知啦
openstack Rocky系列之Cinder:(二)Cinder 创建一个卷
标签:snapshot zone hang append exce you inject structs mirroring
原文地址:https://www.cnblogs.com/sumoning/p/12141622.html