标签:根据 stdout alt project imm err %s gif guide
tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app from celery.task import Task class MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print(‘task done==================>: {0}‘.format(retval)) return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print(‘task fail, reason=================>: {0}‘.format(exc)) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=MyTask) def add(x, y): return x + y
意思是绑定任务为实例方法,执行中的任务能获取到了自己执行任务的各种信息,可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等。
示例:
# tasks.py from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def add(self, x, y): logger.info(self.request.__dict__) return x + y
------------------目录结构-------------------
--start.py
--proj
--celery.py
--tasks.py
--------------------------------------------------
from __future__ import absolute_import, unicode_literals #导入安装的celery(所以上面要写绝对导入),而不是自己导自己(from .celery) from celery import Celery app = Celery(‘proj‘, broker=‘redis://‘, backend=‘redis://‘, include=[‘proj.tasks‘,]) # 这个celery管理了哪些task文件可以有多个,其中有个定时任务periodic_task # Optional configuration, see the application user guide. # update方法更新配置,也可以直接写在上面初始化Celery里面 app.conf.update( result_expires=3600, # 任务结果一小时内没人取就丢弃 ) if __name__ == ‘__main__‘: app.start()
from __future__ import absolute_import, unicode_literals from .celery import app #from celery import Celery import time @app.task(bind=True) def test_mes(self): for i in range(1, 11): time.sleep(1) self.update_state(state="PROGRESS", meta={‘p‘: i*10}) return ‘finish‘
from proj.tasks import test_mes import sys def pm(body): res = body.get(‘result‘) if body.get(‘status‘) == ‘PROGRESS‘: sys.stdout.write(‘\r任务进度: {0}%‘.format(res.get(‘p‘))) sys.stdout.flush() else: print(‘\r‘) print(res) r = test_mes.delay() r.get(on_message=pm, propagate=False) #‘FINISH‘
tasks.py
from .celery import app import time @app.task(bind=True) def excel_info_db(self): time.sleep(1) self.update_state(state="PROGRESS", meta={‘step‘:‘加载excel到内存‘, ‘progress‘:‘100%‘}) time.sleep(1) self.update_state(state="PROGRESS", meta={‘step‘: ‘在内存中校验excel‘, ‘progress‘: ‘100%‘}) time.sleep(1) self.update_state(state="PROGRESS", meta={‘step‘: ‘入库‘, ‘progress‘: ‘100%‘}) return ‘finish‘
有些任务可能需由几个子任务组成,此时调用各个子任务的方式就变的很重要,尽量不要以同步阻塞的方式调用子任务,而是用异步回调的方式进行链式任务的调用:
from .celery import app import time def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain() ‘‘‘ 也可以这样写: fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)]) ‘‘‘ @app.task() def fetch_page(url): time.sleep(3) return "抓取到了%s的内容" % url @app.task() def parse_page(page): # page是fentch_page的返回值 time.sleep(3) return "对%s做解析,得到数据" % page @app.task(ignore_result=True) def store_page_info(info, url): # info是parse_page的返回值,url是调用时传的参 time.sleep(3) print(info) print("%s的数据成功入库" % url) return(‘入库成功‘)
调用:
>>> from proj.tasks import update_page_info
>>> t = update_page_info(‘www.google.com‘)
链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。
这里的 s()
是方法 celery.signature()
的快捷调用方式,signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,个人感觉有点类似偏函数的概念:先不执行任务,而是把任务与任务参数存起来以供其他地方调用。
调用任务方式二:fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
前面讲了调用任务不能直接使用普通的调用方式,而是要用类似 add.delay(2, 2)
的方式调用,而链式任务中又用到了 apply_async
方法进行调用,实际上 delay
只是 apply_async
的快捷方式,二者作用相同,只是 apply_async
可以进行更多的任务属性设置,比如 callbacks/errbacks 正常回调与错误回调、执行超时、重试、重试时间等等,具体参数可以参考:https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async
标签:根据 stdout alt project imm err %s gif guide
原文地址:https://www.cnblogs.com/staff/p/13138581.html