标签:%s 组合 等于 from 执行器 wait 参数 api文档 exe
APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。
调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
对于不同的不场景,可以选择的调度器:
# BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。
from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
def job1():
print "%s: 执行任务" % time.asctime()
scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
while True:
pass
作业存储(job store)主要用来存储被调度的作业,默认的作业存储是简单地把作业保存在内存中。
jobstore提供对scheduler中job的增删改查接口,根据存储方式的不同,分以下几种:
执行器(executor)主要处理任务的运行,主要是把定时任务中的可调用对象(function)提交给一个一个线程或者进程来进行。当任务完成时,执行器将会通知调度器。
最常用的 执行器(executor) 有两种:
当调度一个任务时,需要为它设置一个触发器(triggers)。触发器决定在什么日期/时间、用什么样的形式来执行执行这个定时任务。
APScheduler 有三种内建的 trigger:
参数如下:
<!--# 2016-12-12运行一次job_function-->
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
<!--# 2016-12-12 12:00:00运行一次job_function-->
<!--args=[]中是传给job_function的参数-->
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None
<!--每隔2小时执行一次-->
scheduler.add_job(my_job, 'interval', hours=2)
<!--设置时间范围,在设置的时间范围内每隔2小时执行一次-->
scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00)
<!--使用装饰器的方式添加定时任务-->
@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
print("Hello World")
(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
除了week和 day_of_week,它们的默认值是 *
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
工作将在每个月的第一天以每小时20分钟的时间执行
表达式 | 参数类型 | 描述 |
---|---|---|
* | 所有 | 通配符。例: minutes=* 即每分钟触发 |
*/a | 所有 | 可被a整除的通配符。 |
a-b | 所有 | 范围a-b触发 |
a-b/c | 所有 | 范围a-b,且可被c整除时触发 |
xth y | 日 | 第几个星期几触发。x为第几个,y为星期几 |
last x | 日 | 一个月中,最后个星期几触发 |
last | 日 | 一个月最后一天触发 |
x,y,z | 所有 | 组合表达式,可以组合确定值或上方的表达式 |
sched.add_job(my_job, 'cron', hour=3, minute=30)
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30')
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")
pip install apscheduler
python setup.py install
# first.py
from apscheduler.schedulers.blocking import BlockingScheduler
import time
# 实例化一个调度器
scheduler = BlockingScheduler()
def job1():
print "%s: 执行任务" % time.asctime()
# 添加任务并设置触发方式为3s一次
scheduler.add_job(job1, 'interval', seconds=3)
# 开始运行调度器
scheduler.start()
> python first.py
Fri Sep 8 20:41:55 2017: 执行任务
Fri Sep 8 20:41:58 2017: 执行任务
...
方法一:调用add_job()方法
调用add_job()方法返回一个apscheduler.job.Job 的实例,可以用来改变或者移除 job
job = scheduler.add_job(myfunc, 'interval', minutes=2)
方法二:使用装饰器scheduled_job()
适用于应用运行期间不会改变的 job
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
# 装饰器
@sched.scheduled_job('interval', id='my_job_id', seconds=5)
def job_function():
print("Hello World")
# 开始
sched.start()
<!--方法一:通过作业ID或别名调用remove_job()删除作业-->
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
<!-- 方法二:通过add_job()返回的job实例调用remove()方法删除作业-->
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
可以通过Job实例或调度程序本身轻松暂停和恢复作业。 当作业暂停时,下一个运行时间将被清除,直到作业恢复,不会再计算运行时间。 要暂停作业,请使用以下任一方法:
<!--根据任务实例-->
job = scheduler.add_job(myfunc, 'interval', minutes=2)
<!--暂停-->
job.pause()
<!--继续-->
job.resume()
<!--根据任务id暂停-->
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
<!--修饰:-->
job.modify(max_instances=6, name='Alternate name')
<!--重设:-->
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
- 使用get_jobs()来获取所有的job实例。
- 使用print_jobs()来输出所有格式化的作业列表。
- 使用get_job(任务ID)获取指定任务的作业列表。
<!--获取所有的job实例-->
apscheduler.get_jobs()
scheduler.start() #开启
scheduler.shotdown(wait=True|False) #关闭 False 无论任务是否执行,强制关闭
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging
scheduler = BlockingScheduler() # 后台运行
# 设置为每日凌晨00:30:30时执行一次调度程序
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
print "schedule execute"
sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
try:
scheduler.start()
sys_logging.debug("statistic scheduler start success")
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
sys_logging.debug("statistic scheduler start-up fail")
# 每天晚上0点 - 早上8点期间,每5秒执行一次任务。
scheduler.add_job(my_job, 'cron',day_of_week='*',hour = '0-8',second = '*/5')
<!--# 可以被10整除的时间点执行任务,这个任务就表示在0、10、20、30、40、50分时都会执行任务-->
scheduler.add_job(my_job, 'cron',day_of_week='*',minute = '*/10')
<!--直到2020-05-30 00:00:00,每周星期从星期一到星期五的早上5:30都执行一次定时任务-->
sched.add_job(my_job(),'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2020-05-30')
<!--# 截止到2016-12-30 00:00:00,每周一到周五早上五点半运行job_function-->
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')
<!--job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行-->
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
<!--#表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5-->
sched.add_job(my_job,?'cron',second?=?'*/5')
标签:%s 组合 等于 from 执行器 wait 参数 api文档 exe
原文地址:https://www.cnblogs.com/jasontang369/p/12178774.html