首页
Web开发
Windows程序
编程语言
数据库
移动开发
系统相关
微信
其他好文
会员
首页
>
其他好文
> 详细
Celery的实践指南
时间:
2016-05-02 22:55:27
阅读:
1143
评论:
0
收藏:
0
[点我收藏+]
标签:
Celery的实践指南
celery原理:
celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信。
典型的场景为:
客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker。
后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行。
实践中的典型场景:
简单的定时任务:
替换crontab的celery写法:
from celery import Celery
from celery.schedules import crontab
app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")
app.conf.update(CELERYBEAT_SCHEDULE = {
"add": {
"task": "celery_demo.add",
"schedule": crontab(minute="*"),
"args": (16, 16)
},
})
@app.task
def add(x, y):
return x + y
运行celery的worker,让他作为consumer运行,自动从broker上获得任务并执行。
`celery -A celery_demo worker`
运行celery的client,让其根据schedule,自动生产出task msg,并发布到broker上。
`celery -A celery_demo beat`
安装并运行flower,方便监控task的运行状态
`celery flower -A celery_demo`
或者设置登录密码 `
celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
多同步任务-链式任务-
失败自动重试的task
失败重试方法: 将task代码函数参数增加self,同时绑定bind。
demo代码:
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A(self):
try:
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)
自动重试后,是否将任务重新入queue后排队,还是等待指定的时间?可以通过self.retry()参数来指定。
派发到不同Queue队列的task
一个task自动映射到多个queue中的方法, 通过配置task和queue的routing_key命名模式。
比如:把queue的exchange和routing_key配置成通用模式:
再定义task的routing_key的名称:
可用的不同exchange策略:
direct:直接根据定义routing_key
topic:exchange会根据通配符来将一个消息推送到多个queue。
fanout:将消息拆分,分别推送到不同queue,通常用于超大任务,耗时任务。
参考:
http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
高级配置
result是否保存
失败邮件通知:
关闭rate limit:
auto_reload方法(*nix系统):
celery通过监控源代码目录的改动,自动地进行reload
使用方法:1.依赖inotify(Linux) 2. kqueue(OS X / BSD)
安装依赖:
$ pip install pyinotify
(可选) 指定fsNotify的依赖:
$ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
启动: celery -A appname worker --autoreload
auto-scale方法:
启用auto-scale
临时增加worker进程数量(增加consumer):
$ celery -A proj control add_consumer foo -d worker1.local
临时减少worker进程数量(减少consumer):
将scheduled task的配置从app.conf变成DB的方法:
需要在启动时指定custom schedule 类名,比如默认的是: celery.beat.PersistentScheduler 。
celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
启动停止worker的方法:
启动 as daemon :
http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
root用户可以使用celeryd
非特权用户:celery multi start worker1 -A appName —autoreload --pidfile="$HOME/run/celery/%n.pid" --logfile="$HOME/log/celery/%n.log"
或者 celery worker —detach
停止
ps auxww | grep ‘celery worker‘ | awk ‘{print $2}‘ | xargs kill -9
与Flask集成的方法
集成后flask将充当producer来创建并发送task给broker,在celery启动的独立worker进程将从broker中获得task并执行,同时将结果返回。
flask中异步地获得task结果的方法:add.delay(x,y),有时需要对参数进行命名后传递 或者 add.apply_async(args=(x,y), countdown=30)
flask获得
与flask集成后的启动问题
由于celery的默认routing_key是根据生产者在代码中的import级别来设定的,所以worker端在启动时应该注意其启动目录应该在项目顶级目录上,否者会出现KeyError。
性能提升: eventlet 和 greenlet
官方参考:
http://docs.celeryproject.org/en/latest/userguide/index.html
Celery的实践指南
标签:
原文地址:http://www.cnblogs.com/ToDoToTry/p/5453149.html
踩
(
0
)
赞
(
0
)
举报
评论
一句话评论(
0
)
登录后才能评论!
分享档案
更多>
2021年07月29日 (22)
2021年07月28日 (40)
2021年07月27日 (32)
2021年07月26日 (79)
2021年07月23日 (29)
2021年07月22日 (30)
2021年07月21日 (42)
2021年07月20日 (16)
2021年07月19日 (90)
2021年07月16日 (35)
周排行
更多
分布式事务
2021-07-29
OpenStack云平台命令行登录账户
2021-07-29
getLastRowNum()与getLastCellNum()/getPhysicalNumberOfRows()与getPhysicalNumberOfCells()
2021-07-29
【K8s概念】CSI 卷克隆
2021-07-29
vue3.0使用ant-design-vue进行按需加载原来这么简单
2021-07-29
stack栈
2021-07-29
抽奖动画 - 大转盘抽奖
2021-07-29
PPT写作技巧
2021-07-29
003-核心技术-IO模型-NIO-基于NIO群聊示例
2021-07-29
Bootstrap组件2
2021-07-29
友情链接
兰亭集智
国之画
百度统计
站长统计
阿里云
chrome插件
新版天听网
关于我们
-
联系我们
-
留言反馈
© 2014
mamicode.com
版权所有 联系我们:gaon5@hotmail.com
迷上了代码!