标签:
APScheduler是一个小巧而强大的Python类库,通过它你可以实现类似Unix系统cronjob类似的定时任务系统。使用之余,阅读一下源码,一方面有助于更好的使用它,另一方面,个人认为aps的架构设计质量很高,阅读它对于提升软件开发的sense很有帮助。
APScheduler整个系统可以说由这五个概念组成:
这样的划分充分发挥了软件设计中抽象的威力,我们下面对每个模块进行描述
BaseScheduler类是所有scheduler的抽象基类,它的初始化代码是这样的:
def __init__(self, gconfig={}, **options): super(BaseScheduler, self).__init__() self._executors = {} self._executors_lock = self._create_lock() self._jobstores = {} self._jobstores_lock = self._create_lock() self._listeners = [] self._listeners_lock = self._create_lock() self._pending_jobs = [] self.configure(gconfig, **options)
可以看到一个scheduler维护了自己的executor和jobstore表,通过configure方法进行初始化。在configure中,scheduler读取传入的配置,对executors和jobstores进行初始化,一个典型的配置是这样的:
APS_SCHEDULER_CONFIG = { ‘jobstores‘: { ‘default‘: {‘type‘: ‘sqlalchemy‘, ‘url‘: ‘postgres://127.0.0.1:5432/optimus‘}, }, ‘executors‘: { ‘default‘: {‘type‘: ‘processpool‘, ‘max_workers‘: 10} }, ‘job_defaults‘: { ‘coalesce‘: True, ‘max_instances‘: 5, ‘misfire_grace_time‘: 30 }, ‘timezone‘: ‘Asia/Shanghai‘ }
如果我们把APS_SCHEDULER_CONFIG作为options传入给一个scheduler,会产生什么结果呢?首先,我们添加了一个默认(名叫default)的jobstore,它的具体实现类型是sqlalchemy,数据库连接url是指向一个本地postgresql数据库,也就是说添加到这个scheduler的job会默认使用这个jobstore进行存储。其次,我们添加了一个默认的executor,他是一个多进程实现,也就是说每个job在运行时,是通过一个进程池来作为worker实际执行的,这个进程池最大size是10。job_defaults参数定义了一些特殊行为:
这里还需要指出的一点是,为什么scheduler的配置可以写成这种json形式,而scheduler会正确地找到对应的实现类进行初始化?这里运用了两个技巧:
用python egg的机制把各个组件注册了成了entry point,如下所示
[apscheduler.executors] asyncio = apscheduler.executors.asyncio:AsyncIOExecutor debug = apscheduler.executors.debug:DebugExecutor gevent = apscheduler.executors.gevent:GeventExecutor processpool = apscheduler.executors.pool:ProcessPoolExecutor threadpool = apscheduler.executors.pool:ThreadPoolExecutor twisted = apscheduler.executors.twisted:TwistedExecutor [apscheduler.jobstores] memory = apscheduler.jobstores.memory:MemoryJobStore mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore redis = apscheduler.jobstores.redis:RedisJobStore sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore [apscheduler.triggers] cron = apscheduler.triggers.cron:CronTrigger date = apscheduler.triggers.date:DateTrigger interval = apscheduler.triggers.interval:IntervalTrigger
这样,在scheduler模块中就可以用entry point的名称反查出对应组件
_trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points(‘apscheduler.triggers‘)) _trigger_classes = {} _executor_plugins = dict((ep.name, ep) for ep in iter_entry_points(‘apscheduler.executors‘)) _executor_classes = {} _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points(‘apscheduler.jobstores‘)) _jobstore_classes = {} _stopped = True
从而实现了一个便利的插件机制
另外通过一个加载函数完成"apscheduler.executors.pool:ThreadPoolExecutor"字符串到ThreadPoolExecutor类对象的查询
def ref_to_obj(ref): """ Returns the object pointed to by ``ref``. :type ref: str """ if not isinstance(ref, six.string_types): raise TypeError(‘References must be strings‘) if ‘:‘ not in ref: raise ValueError(‘Invalid reference‘) modulename, rest = ref.split(‘:‘, 1) try: obj = __import__(modulename) except ImportError: raise LookupError(‘Error resolving reference %s: could not import module‘ % ref) try: for name in modulename.split(‘.‘)[1:] + rest.split(‘.‘): obj = getattr(obj, name) return obj except Exception: raise LookupError(‘Error resolving reference %s: error looking up object‘ % ref)
scheduler的主循环(main_loop),其实就是反复检查是不是有到时需要执行的任务,完成一次检查的函数是_process_jobs, 这个函数做这么几件事:
那么在这个_process_jobs的逻辑,什么时候调用合适呢?如果不间断地调用,而实际上没有要执行的job,是一种浪费。每次掉用_process_jobs后,其实可以预先判断一下,下一次要执行的job(离现在最近的)还要多长时间,作为返回值告诉main_loop, 这时主循环就可以去睡一觉,等大约这么长时间后再唤醒,执行下一次_process_jobs。这里唤醒的机制就会有IO模型的区别了
scheduler由于IO模型的不同,可以有多种实现,如
jobstore提供给scheduler一个序列化jobs的统一抽象,提供对scheduler中job的增删改查接口,根据存储backend的不同,分以下几种
除了MemoryJobStore外,其他几种都使用pickle做序列化工具,所以这里要指出一点,如果你不是在用内存做jobstore,那么必须确保你提供给job的可执行函数必须是可以被全局访问的,也就是可以通过ref_to_obj反查出来的,否则无法序列化。
使用数据库做jobstore,就会发现,其实创建了一张有三个域的的jobs表,分别是id, next_run_time, job_state,其中job_state是job对象pickle序列化后的二进制,而id和next_run_time则是支持job的两类查询(按id和按最近运行时间)
aps把任务最终的执行机制也抽象了出来,可以根据IO模型选配,不需要讲太多,最常用的是threadpool和processpoll两种(来自concurrent.futures的线程/进程池)。
不同类型的executor实现自己的_do_submit_job,完成一次实际的任务实例执行。以线程/进程池实现为例
def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, ‘exception_info‘) else (f.exception(), getattr(f.exception(), ‘__traceback__‘, None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback)
trigger是抽象出了“一个job是何时被触发”这个策略,每种trigger实现自己的get_next_fire_time函数
@abstractmethod def get_next_fire_time(self, previous_fire_time, now): """ Returns the next datetime to fire on, If no such datetime can be calculated, returns ``None``. :param datetime.datetime previous_fire_time: the previous time the trigger was fired :param datetime.datetime now: current datetime """
aps提供的trigger包括:
简要介绍了apscheduler类库的组成,强调抽象概念的理解
标签:
原文地址:http://www.cnblogs.com/quijote/p/4385774.html