码迷,mamicode.com
首页 > 编程语言 > 详细

python定时任务模块APScheduler

时间:2019-08-04 10:42:53      阅读:136      评论:0      收藏:0      [点我收藏+]

标签:listener   level   err   config   exe   str   函数   ted   now()   

一、简单任务

先定义一个函数,再创建一个scheduler(调度器)对象,向其中添加一个job,最后启动调度器即可。

下面的例子中,每当秒数是5的整数倍时,就执行一次这个函数。

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test():
    """Print the current local time followed by a fixed greeting.

    Used as the job callable for the scheduler in this example.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好')


# BlockingScheduler runs in the foreground: start() blocks the calling thread.
scheduler = BlockingScheduler()
# Cron trigger: fire whenever the seconds field is a multiple of 5.
scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
scheduler.start()

带参数的任务:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    """Print the current local time followed by ``x``.

    Args:
        x: Value printed after the timestamp; supplied by the scheduler
            through the job's ``args``.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
# args is a one-element tuple passed positionally to aps_test on every run;
# the cron trigger fires whenever the seconds field is a multiple of 5.
scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5')
scheduler.start()
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    """Print a timestamped line identifying which job ran.

    Args:
        x: Label printed after the current local time.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
# Scheduled (cron) job: fires whenever the seconds field is a multiple of 5.
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')
# One-off job: no trigger is given, only an explicit first run time 12 seconds from now.
scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
# Recurring (interval) job: fires every 3 seconds.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3)

scheduler.start()

二、日志

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

# Send INFO-and-above records from the root logger to log1.txt (append mode),
# tagging each record with timestamp, source file, line number and level.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Job callable that always fails, to show a job error in this logging example.

    Args:
        x: Label that would be printed after the timestamp.

    Raises:
        ZeroDivisionError: Always, from the deliberate ``1/0`` on the first line.
    """
    # Deliberate failure; written as a function call so it parses the same
    # way as the other print calls in this article.
    print(1/0)
    # Never reached because the line above always raises.
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
# Cron job fires whenever the seconds field is a multiple of 5.
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')
# NOTE(review): replaces the scheduler's private logger attribute with the
# logging module itself, presumably so scheduler messages use the
# basicConfig settings above; confirm this is needed.
scheduler._logger = logging
scheduler.start()

三、删除任务

要求执行一定阶段任务以后,删除某一个循环任务,其他任务照常进行。有如下代码:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

# Root logger: INFO and above, appended to log1.txt with timestamp,
# source file name, line number and level on every record.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Print the current local time and the job label ``x``.

    Args:
        x: Text printed after the timestamp.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_date(x):
    """Remove the job with id ``'interval_task'``, then print a timestamped label.

    Relies on the module-level ``scheduler`` object.

    Args:
        x: Text printed after the current local time.
    """
    # The job id must be a string matching the id= given to add_job.
    scheduler.remove_job('interval_task')
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    

scheduler = BlockingScheduler()
# Each job gets an explicit id so it can be referenced later (aps_date removes 'interval_task').
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task')
# One-off job, first run 12 seconds from now; its single argument is one string containing a comma.
scheduler.add_job(func=aps_date, args=('一次性任务,删除循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task')
# Recurring job, every 3 seconds, until aps_date removes it.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
# NOTE(review): overrides a private scheduler attribute with the logging module; confirm this is needed.
scheduler._logger = logging

scheduler.start()

四、停止任务,恢复任务

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

# Configure the root logger to append INFO-and-above records to log1.txt,
# each prefixed with timestamp, file name, line number and level.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Print the current local time followed by the label ``x``.

    Args:
        x: Text identifying the job that ran.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_pause(x):
    """Pause the job with id ``'interval_task'``, then print a timestamped label.

    Relies on the module-level ``scheduler`` object.

    Args:
        x: Text printed after the current local time.
    """
    # The job id must be a string matching the id= given to add_job.
    scheduler.pause_job('interval_task')
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_resume(x):
    """Resume the job with id ``'interval_task'``, then print a timestamped label.

    Relies on the module-level ``scheduler`` object.

    Args:
        x: Text printed after the current local time.
    """
    # The job id must be a string matching the id= given to add_job.
    scheduler.resume_job('interval_task')
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5', id='cron_task')
# One-off job 12 seconds from now: pauses 'interval_task'.
scheduler.add_job(func=aps_pause, args=('一次性任务,停止循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task')
# One-off job 24 seconds from now: resumes 'interval_task'.
scheduler.add_job(func=aps_resume, args=('一次性任务,恢复循环任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task')
# Recurring job, every 3 seconds; this is the job that gets paused and resumed.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
# NOTE(review): overrides a private scheduler attribute with the logging module; confirm this is needed.
scheduler._logger = logging

scheduler.start()

五、捕获错误

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging

# Root logger setup: INFO and above go to log1.txt in append mode, formatted
# as "<timestamp> <file>[line:<n>] <level> <message>".
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    """Print the current local time followed by ``x``; this job succeeds.

    Args:
        x: Label printed after the timestamp.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def date_test(x):
    """Print a timestamped label, then fail on purpose.

    Args:
        x: Label printed after the current local time.

    Raises:
        ZeroDivisionError: Always, from the deliberate ``1/0`` after the
            timestamp line has been printed.
    """
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    # Deliberate failure so this job ends with an exception.
    print(1/0)


def my_listener(event):
    """Print whether the job behind ``event`` failed or ran normally.

    Args:
        event: Object with an ``exception`` attribute; a truthy value is
            treated as a failed job.
    """
    if event.exception:
        print('任务出错了!!!!!!')
    else:
        print('任务照常运行...')

scheduler = BlockingScheduler()
# One-off job, first run 15 seconds from now; date_test raises on purpose.
scheduler.add_job(func=date_test, args=('一定性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
# Recurring job, every 3 seconds; runs normally.
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
# Call my_listener for both successful runs and runs that raised.
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# NOTE(review): overrides a private scheduler attribute with the logging module; confirm this is needed.
scheduler._logger = logging

scheduler.start()

 

python定时任务模块APScheduler

标签:listener   level   err   config   exe   str   函数   ted   now()   

原文地址:https://www.cnblogs.com/angelyan/p/11297252.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!