标签:win play bit required 管理 密码 image 场景 store
1、什么是celery
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
celery架构:
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
版本支持情况
Celery version 4.0 runs on Python ?2.7, 3.4, 3.5? PyPy ?5.4, 5.5? This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.? If you’re running an older version of Python, you need to be running an older version of Celery:?
Python 2.6: Celery series 3.1 or earlier. Python 2.5: Celery series 3.0 or earlier. Python 2.4 was Celery series 2.2 or earlier.? Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform. 对windows系统的支持不太友好;
2、使用场景
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
3、celery的安装配置
pip3 install celery
消息中间件:RabbitMQ/Redis
app=Celery(‘任务名‘,backend=‘xxx‘,broker=‘xxx‘)
4、celery的基本使用
创建celery_task_s1.py
from celery import Celery import time # redis不加密码 broker=‘redis://127.0.0.1:6379/0‘ backend=‘redis://127.0.0.1:6379/1‘ # redis加密码 # broker=‘redis://:123456@127.0.0.1:6379/0‘ # backend=‘redis://:123456@127.0.0.1:6379/1‘ # 一定要指定一个名字,这里的名字是test app=Celery(‘test‘,backend=backend,broker=broker) # 任务其实就是一个函数 # 需要用一个装饰器,表示该任务是被celery管理的,并可以被其执行 @app.task # @app名字.task def add(x,y): time.sleep(2) return x+y
创建用于提交任务的py文件add_task.py:
# 用于提交任务的py文件 import celery_task_s1 # 正常情况下的同步执行任务 # ret = celery_task_s1.add(3, 4) # print(ret) # 提交任务到消息队列中 # 只是把任务提交到消息队列中(redis),并没有执行 ret = celery_task_s1.add.delay(3, 4) print(ret) # 0c5b8d12-dbf6-46ab-a425-875a98d2b5a5 任务号id
任务提交完成后,需要启动worker,可以用命令启动:celery worker -A celery_task_s1 -l info
但是在windows中需要用的命令是:celery worker -A celery_task_s1 -l info -P eventlet
eventlet是一个模块,需要安装: pip3 install eventlet
#参数解释 -A celery_task_s1 #要启动任务的文件名 -l info #日志级别,可以指定
启动:
可以在pycharm的命令行中启动,也可以在cmd中启动;
任务已经执行了,并且结果已经保存进了redis
可以创建py文件:celery_result.py,运行这个py文件,可以查看任务执行结果(这里的结果是7):
from celery.result import AsyncResult from celery_task_s1 import app async = AsyncResult(id="0c5b8d12-dbf6-46ab-a425-875a98d2b5a5", app=app) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print(‘执行失败‘) elif async.status == ‘PENDING‘: print(‘任务等待中被执行‘) elif async.status == ‘RETRY‘: print(‘任务异常后正在重试‘) elif async.status == ‘STARTED‘: print(‘任务已经开始被执行‘)
5、celery多任务结构
结构:
pro_cel ├── celery_task # celery相关文件夹,名字可以变 │ ├── celery.py # celery连接和配置相关文件,必须叫这个名字 │ └── user_task.py #所有任务函数 │ └── order_tasks.py #所有任务函数 ├── celery_result.py # 检查结果 └── add_task.py # 提交任务
案例:
celery.py
from celery import Celery backend=‘redis://127.0.0.1:6379/0‘ broker=‘redis://127.0.0.1:6379/1‘ app=Celery(‘test‘,broker=broker, backend=backend, # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类 include=[‘celery_task.order_task‘, ‘celery_task.user_task‘ ]) # 时区 # app.conf.timezone = ‘Asia/Shanghai‘ # 是否使用UTC # app.conf.enable_utc = False
user_task.py
import time from celery_task.celery import app @app.task def user_add(x, y): time.sleep(1) return x + y
order_task.py
import time from celery_task.celery import app @app.task def order_add(x, y): time.sleep(1) return x + y
add_task.py
from celery_task.order_task import order_add from celery_task.user_task import user_add order_add.delay(5, 6) user_add.delay(10, 6)
celery_result.py
from celery.result import AsyncResult from celery_task.celery import app # id需要变换 async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print(‘执行失败‘) elif async.status == ‘PENDING‘: print(‘任务等待中被执行‘) elif async.status == ‘RETRY‘: print(‘任务异常后正在重试‘) elif async.status == ‘STARTED‘: print(‘任务已经开始被执行‘)
启动:
可以先启动work,当没有任务时,它就会等着,有任务时就会执行;
在多任务结构中启动work时,直接指定包名即可:
celery worker -A celery_task -l info -P eventlet
启动完work,可以再去启动broker(add_task.py)
此时work发现有任务提交时,直接就会执行,并得到结果;
标签:win play bit required 管理 密码 image 场景 store
原文地址:https://www.cnblogs.com/weiyiming007/p/12543556.html