码迷,mamicode.com
首页 > 其他好文 > 详细

AB逻辑

时间:2020-06-23 21:05:00      阅读:51      评论:0      收藏:0      [点我收藏+]

标签:read   bug   sync   sel   处理   条件   发送数据   import   data   

# AB机新逻辑

"""
1. 查看是否存在有效的远程灾备计划
2. 先给计划创建一个空的任务。
3. 到A端中填充任务,构造成真正可以执行的任务。(在这个过程需要到A端释放以前的以前锁定的资源,锁定现在的资源)(原子性)
这个过程容易失败(失败后我们怎么处理较好呢??)
   1. 需要释放上一次任务的资源停止之前的任务并等待结束
   1. 重试到一定次数。
   2. 重试完成后继续失败该怎么办??无限重试?(要不无限重试吧,这样也就不用释放资源了,但是之前到A端锁定的资源该怎么办呢????对了,
   可以直接用特定的标志去释放,成功过去了就会释放,也就不用每一次失败都需要去释放了,并且不能保证一定能释放成功)这儿就这么处理
4. 执行远程容灾任务,该怎么去执行呢??首先需要明确远程容灾任务是要做什么吧?
   1. 远程容灾任务要做什么?:
      1. 是不是我在前面每一个任务对应的磁盘级别的任务的参数都准备好,这个时候之前去执行就好了

"""
import threading
import time
import json
import copy
import uuid
from apiv1 import models
from box_dashboard import xlogging
from django.db import transaction

_logger = xlogging.getLogger(__name__)


class RemoteScheduleThreading(threading.Thread):
    def __init__(self):
        super(RemoteScheduleThreading, self).__init__(name=‘RemoteScheduleThreading‘)
        self.is_running = list()

    def run(self):
        while True:
            try:
                self.work()
            except Exception as e:
                _logger.debug(‘RemoteScheduleThreading Exception, because of {}‘.format(e))

    def work(self):
        for remote_schedule in models.RemoteBackupSchedule.objects.filter(enabled=True, deleted=False):
            if not self.check_remote_schedule_can_exc(remote_schedule):
                continue
            self.create_remote_task_empty(remote_schedule)

    def check_remote_schedule_can_exc(self, remote_schedule):
        """
        检测远程灾备计划此时是否可以执行
        :param remote_schedule:
        :return:

        1. 需要判断计划是否被禁用或者删除
        2. 判断计划类型,间隔类型查看是否到达执行时间,持续类型无条件继续执行,也就是不需要休眠
        3. 需要判断是否已经存在执行中的任务(通过计划去找任务)
        """
        # type: models.RemoteBackupSchedule, models.RemoteBackupTask
        if not remote_schedule.enabled or remote_schedule.deleted:
            return False
        now = time.time()
        if remote_schedule.get_remote_schedule_type == ‘interval‘:
            if now < remote_schedule.next_run_date:
                return False
        if self.is_have_running_task(remote_schedule):
            return False
        return True

    def create_remote_task_empty(self, remote_schedule):
        """
        创建远程容灾空任务,这个过程只在B端这边组装数据
        :param remote_schedule:
        :return:

        创建远程空任务我们需要什么??
        1. 需要上一次任务的信息。因为我们需要根据上一次任务的信息找到本次需要同步快照,那么找到本次需要同步的快照,需要什么去找呢??
           1. 需要上一次同步的主机快照,
           2. 需要上一次同步的时间点,
           3. 需要同步的主机
           4. 需要上一次任务的uuid(需要释放资源)
        """
        # type: models.RemoteBackupTask, models.RemoteBackupSchedule
        prev_info = self.fetch_prev_sync_info(remote_schedule)
        remote_host_ident = remote_schedule.host.ident
        remote_task = models.RemoteBackupTask.objects.create(schedule=remote_schedule)
        current_info = self.generate_current_task_info(remote_task)
        exc_config = {‘prev_info‘: prev_info, ‘remote_host_ident‘: remote_host_ident, ‘current_info‘: current_info}
        remote_task.ext_config = json.dumps(exc_config)
        remote_task.save(update_fields=[‘ext_config‘, ])

    @staticmethod
    def generate_current_task_info(remote_task):
        """
        生成本次任务的一些信息(task_uuid)
        :param remote_task:
        :return:
        """
        task_uuid = uuid.uuid4().hex
        lock_type = ‘remote_schedule_{}‘.format(remote_task.schedule.id)
        return {‘task_uuid‘: task_uuid, ‘remote_task_id‘: remote_task.id, ‘lock_type‘: lock_type}

    @staticmethod
    def fetch_prev_sync_info(remote_schedule):
        """
        获取上一次同步的信息
        :param remote_schedule:
        :return:
        """
        ext_config_dict = json.loads(remote_schedule.ext_config)
        prev_info = ext_config_dict.get(‘current_info‘, dict())
        return prev_info

    @staticmethod
    def is_have_running_task(remote_schedule):
        """
        检测计划是否存在执行中的任务
        :param remote_schedule:
        :return:
        """
        remote_task = remote_schedule.remote_backup_tasks.filter(finish_datetime__isnull=True)
        if remote_task:
            return True
        return False


class RemoteBackupTaskMonitorThreading(threading.Thread):
    def __init__(self):
        super(RemoteBackupTaskMonitorThreading, self).__init__(name=‘RemoteBackupTaskMonitorThreading‘)
        self.running_list = list()

    def run(self):
        while True:
            try:
                self.task_work()
            except Exception as e:
                _logger.debug(‘RemoteBackupTaskMonitorThreading Exception, because of {}‘.format(e))

    def task_work(self):
        for remote_backup_task in models.RemoteBackupTask.objects.filter(finish_datetime__isnull=True):
            self.clear_finish_task()
            if self.is_running(remote_backup_task):
                continue
            remote_task_thread = RemoteBackupTaskThreading(remote_backup_task)
            remote_task_thread.setDaemon(True)
            remote_task_thread.start()
            self.running_list.append(remote_task_thread)

    def is_running(self, remote_backup_task):
        for running_task_thread in self.running_list:
            if running_task_thread.remote_backup_task == remote_backup_task:
                if running_task_thread.is_alive():
                    return True
        return False

    def clear_finish_task(self):
        _running_task = copy.copy(self.running_list)
        for running_task_thread in _running_task:
            if running_task_thread.is_alive():
                continue
            self.running_list.remove(running_task_thread)


class RemoteBackupTaskThreading(threading.Thread):
    def __init__(self, remote_backup_task):
        super(RemoteBackupTaskThreading, self).__init__(name=‘RemoteBackupTaskThreading‘)
        self.remote_backup_task = remote_backup_task
        self.running_sub_task_thread = list()

    def run(self):
        with transaction.atomic():
            self.stop_all_sub_task_and_wait(self.remote_backup_task)
            update_type, sync_info = self.query_need_sync_info_from_A()  # 到A端去找需要同步的信息
            self.check_prev_and_release_other_to_A()
            self.update_local_database_info(update_type, sync_info)
        # 此时子任务已经创建完毕,需要放到线程中去执行
        self.exc_remote_sub_task()
        self.finish_current_task()

    def exc_remote_sub_task(self):
        """
        通过remote_task找到对应的子任务,然后拿到队列中去执行?
        :return:
        """
        # type: models.RemoteBackupTask
        for remote_sub_task in self.remote_backup_task.remote_backup_sub_tasks.all():
            remote_sub_task_thread = RemoteSubTaskThreading(remote_sub_task)
            remote_sub_task_thread.setDaemon(True)
            remote_sub_task_thread.start()
            self.running_sub_task_thread.append(remote_sub_task_thread)

        for remote_sub_task_thread in self.running_sub_task_thread:
            remote_sub_task_thread.join()

    def finish_current_task(self):
        """
        结束本次任务,更新任务,计划等信息
        :return:
        """

    def check_prev_and_release_other_to_A(self):
        """
        到A端去释放资源和检测上一次的资源是否被意外释放
        :return:
        1. 需要上一次任务的信息和这次任务的信息
        """
        ext_config_dict = json.loads(self.remote_backup_task.ext_config)
        prev_info = ext_config_dict.get(‘prev_info‘, dict())
        current_info = ext_config_dict[‘current_info‘]
        return {‘prev_info‘: prev_info, current_info: current_info}

    def update_local_database_info(self, update_type, sync_info):
        """
        通过A端查到的同步信息来更新本地的信息
        :param update_type:
        :param sync_info:
        :return:
        """
        self.update_local_hostsnapshot(update_type, sync_info)
        self.update_local_disksnapshot(update_type, sync_info)
        self.update_local_snapshot_storage(update_type, sync_info)
        self.update_current_info_to_schedule_and_task(sync_info)
        self.create_remote_sub_task(sync_info)

    def update_local_hostsnapshot(self, update_type, sync_info):
        """
        更新本地的主机快照级别的信息,可能是新建,可能是更新
        :param update_type:
        :param sync_info:
        :return:
        """

    def update_local_disksnapshot(self, update_type, sync_info):
        """
         更新本地的磁盘快照级别的信息,可能是新建,可能是更新
        :param update_type:
        :param sync_info:
        :return:
        """

    def update_local_snapshot_storage(self, update_type, sync_info):
        """
        更新本地的快照存储级别的信息,可能是新建,可能是更新
        :param update_type:
        :param sync_info:
        :return:
        """

    def update_current_info_to_schedule_and_task(self, sync_info):
        """
        需要更新当前任务的一些信息
        :param sync_info:
        :return:
        如 hostsnapshot,timestmap
        """

    def create_remote_sub_task(self, sync_info):
        """
        创建远程灾备子任务(一个磁盘一个任务)
        :param sync_info:
        :return:
        1. 子任务执行需要(上一次任务的快照点,上一次同步到的timestamp,本次的快照点,本次的timestamp,本次任务对应的task_uuid)
        因此我们从remote_task拿到上一次的信息prev_info和current_info, r
        还需要生成一个task_uuid保存到子任务数据库记录中
        """

    def stop_all_sub_task_and_wait(self, remote_task):  #
        """
        停止计划上一次任务对应的所有子任务,并等待结束,但是这个过程需要到A端去停止,是不是就有机会失败?
        :param remote_task:
        :return:
        去A端停止子任务,我需要什么?其实这个时候只需要每一个子任务的task_uuid。
        通过当前的任务找到上一次的RemotebackupTask, 然后找到其对应的所有子任务,最后将子任务的所有的task_uuid从数据库读取出来到A端停止并等待任务结束

        """
        # type: models.RemoteBackupSubTask

    def query_need_sync_info_from_A(self):
        """
        到A端去找需要同步的信息
        :return:
        1. 需要知道上一次任务的信息(上一次的主机快照和时间点)
        2. 需要知道远程主机
        3. 需要知道本次同步的类型(间隔还是持续)
        """
        # type: models.RemoteBackupTask
        ext_config_dict = json.loads(self.remote_backup_task.ext_config)
        prev_info = ext_config_dict.get(‘prev_info‘, dict())
        current_task_uuid = ext_config_dict[‘current_info‘][‘task_uuid‘]
        current_lock_type = ext_config_dict[‘current_info‘][‘lock_type‘]
        remote_host_ident = ext_config_dict[‘remote_host_ident‘]
        sync_type = self.get_sync_type()
        return {‘sync_type‘: sync_type, ‘prev_info‘: prev_info, ‘remote_host_ident‘: remote_host_ident,
                ‘task_uuid‘: current_task_uuid, ‘lock_type‘: current_lock_type}

    def get_sync_type(self):
        """
        根据任务找到计划,就可以知道同步的类型
        :return:
        """
        remote_schedule = self.remote_backup_task.schedule
        return remote_schedule.get_remote_schedule_type


class RemoteSubTaskThreading(threading.Thread):
    def __init__(self, remote_sub_task):
        super(RemoteSubTaskThreading, self).__init__(name=‘RemoteSubTaskThreading‘)
        self.remote_sub_task = remote_sub_task

    def run(self):
        self.check_schedule_valid()
        self.start_remote_logic()
        self.start_local_rs_module()
        self.check_remote_logic()
        self.stop_and_wait_task()
        self.finish_sub_task()

    def check_schedule_valid(self):
        """
        检测该任务对应的计划是否还有效
        :return:
        """

    def start_remote_logic(self):
        """
        开始远端的发送数据的任务, 把之前的和现在的任务的主机快照点和时刻,task_uuid传输到A端开始执行任务
        :return:
        """

    def start_local_rs_module(self):
        """
        开始本地的接收数据的任务,
        :return:
        """

    def check_remote_logic(self):
        """
        检测远端任务的状态,查看传输任务是否结束 task_uuid
        :return:
        """
        task = {‘status‘: False}
        while not task[‘status‘]:
            time.sleep(5)

    def stop_and_wait_task(self):
        """
        停止A端的传输任务并等待停止成功,不成功就一直循环,直到停止成功,通过task_uuid
        :return:
        """
        while True:
            try:
                stop_and_wait_task(task_uuid)  # 到A端停止任务
                return
            except Exception as e:
                time.sleep(5)
                _logger.info(‘stop_and_wait_task exception, because of {}‘.format(e))
                continue

    def finish_sub_task(self):
        """
        结束子任务,更新子任务数据库信息
        :return:
        """


# A端

def query_need_sync_info(host_ident, prev_info, sync_type, lock_uuid, lock_type):
    """
    找到本次需要同步的快照
    :param host_ident:
    :param prev_info:
    :param sync_type:
    :param lock_uuid
    :return: 需要返回快照点之间的关系?和本次同步的信息,而至于是否为cdp类型可以在同步信息中得到
    """
    prev_hostsnapshot = prev_info.get(‘hostsnapshot‘)
    host_obj = models.Host.objects.filter(ident=host_ident)
    if not host_obj:
        return ‘update‘, {}
    if prev_hostsnapshot is None:
        # 代表是第一次来,
        # 不管什么同步类型,都直接通过主机找到最后的一个主机快照点?需要区分A端主机快照的类型吗?
        pass
    else:
        if sync_type == ‘interval‘:
            # 间隔备份,
            # 1. 通过主机找到最新的快照点
            # 2. 和前一个任务所同步的信息作比较,(hostsnapshot 和 timestamp)一样,就返回空的同步信息回去如果比以前同步的新,
            #    就需要使用lock_uuid锁住当前的主机快照点
            pass
        else:
            # 持续备份
            # 获取上一次同步到的快照点,通过上一次同步的快照点,找到离他最近的一个快照点,然后分别打开两个快照点,
            # 1. 对于qcow类型算出他们的位图?然后同步,return create, 同步信息
            # 2. 对于cdp类型需要找到两个快照点的关系
            #    1. 追加 (在同一个链上,也在同一个主机快照上) return update ,同步信息
            #    2. 增量 (在同一个链上,但不在同一个主机快照上) return create ,同步信息
            #    3. 无关 (不在同一个链上) return create ,同步信息
            pass
        pass


def check_prev_and_release_other(prev_info, current_info):
    """
    检测上一次任务和释放资源
    :param prev_info:
    :param current_info:
    :return:
    1. 通过lock_type找到所有的主机快照锁
    2. 通过prev的task_uuid是否从主机快照锁找出数据,如果可以找到,就说明还被锁着的,没有找到呢?没有上一次任务不管
    3. 释放除current_info和prev_info中通过task_uuid生成的锁
    """
    lock_type = current_info[‘lock_type‘]
    _ = prev_info
    _ = lock_type

 

AB逻辑

标签:read   bug   sync   sel   处理   条件   发送数据   import   data   

原文地址:https://www.cnblogs.com/bao9687426/p/13184277.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!