标签:
这里有几个概念,task、worker、broker。
顾名思义,task 就是老板交给你的各种任务,worker 就是你手下干活的人员。
那什么是 Broker 呢?
老板给你下发任务时,你需要 把它记下来, 这个它 可以是你随身携带的本子,也可以是 电脑里地记事本或者excel,或者是你的 任何时间管理工具。
Broker 则是 Celery 记录task的地方。
作为一个任务管理者的你,将老板(前端程序)发给你的 安排的工作(Task) 记录到你的本子(Broker)里。接下来,你就安排你手下的IT程序猿们(Worker),都到你的本子(Broker)里来取走工作(Task)
#tasks.py
from celery import Celery app = Celery(‘tasks‘, broker=‘amqp://admin:admin@localhost:5672‘) @app.task def add(x, y): return x + y
启动
celery -A tasks worker --loglevel=info
运行
>>> from tasks import add >>> add(1, 3) 4 >>> add.delay(1,3) <AsyncResult: 07614cef-f314-4c7b-a33f-92c080cadb83> >>>
注:delay是使用异步的方式,会压入到消息队列。否则,不会使用消息队列。
文件名为tasks.py,则其中代码app = Celery(‘tasks‘, broker=),Celery第一个参数为文件名,启动时也是celery -A tasks worker --loglevel=info
例1:
#test.py
from celery import Celery import time app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:admin@localhost:5672‘) @app.task def add(x, y): print "------>" time.sleep(5) print "<--------------" return x + y if __name__ == "__main__": app.start()
启动
python test.py worker
celery默认启动的worker数为内核个数,如果指定启动个数,用参数-c,例
python test.py worker -c 2
例2:
#test.py
from celery import Celery import time app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:admin@localhost:5672‘) @app.task def add(x, y): print "------>" time.sleep(2) print "<--------------" return x + y if __name__ == "__main__": app.start()
#eg.py
from test import * import time rev = [] for i in range(3): rev.append(add.delay(1,3)) print "len rev:", len(rev) while 1: tag = 1 for key in rev: if not key.ready(): tag = 0 time.sleep(1) print "sleep 1" if tag: break print "_____________________>"
#test_redis.py
from celery import Celery import time #app = Celery(‘test_redis‘, backend=‘amqp‘, broker=‘redis://100.69.201.116:7000‘) app = Celery(‘test_redis‘, backend=‘redis‘, broker=‘redis://100.69.201.116:7000‘) @app.task def add(x, y): print "------>" time.sleep(5) print "<--------------" return x + y if __name__ == "__main__": app.start()
启动
python test_redis.py worker -c 2
测试
from celery import group from test_redis import * g = group(add.s(2, 3)).apply_async() g = group(add.s(2, 3)).apply_async() g = group(add.s(2, 3)).apply_async() g = group(add.s(2, 3)).apply_async() g = group(add.s(2, 3)).apply_async() for ret in g.get(): print ret print "end-----------------------------------"
结果
5 end-----------------------------------
#test_redis.py
from celery import Celery import time #app = Celery(‘test_redis‘, backend=‘amqp‘, broker=‘redis://100.69.201.116:7000‘) app = Celery(‘test_redis‘, backend=‘redis‘, broker=‘redis://100.69.201.116:7000‘) @app.task def add(x, y): print "------>" time.sleep(5) print "<--------------" return x + y if __name__ == "__main__": app.start()
#test_redis_2.py
from celery import Celery import time #app = Celery(‘test_redis‘, backend=‘amqp‘, broker=‘redis://100.69.201.116:7000‘) app = Celery(‘test_redis_2‘, backend=‘redis‘, broker=‘redis://100.69.201.116:7001‘) @app.task def add_2(x, y): print "=======>" time.sleep(5) print "<=================" return x + y if __name__ == "__main__": app.start()
测试
from celery import group from test_redis import * from test_redis_2 import * ll = [(1,2), (3,4), (5,6)] g = group(add.s(key[0], key[1]) for key in ll).apply_async() for ret in g.get(): print ret print "end redis_1 -----------------------------------" ll = [(1,2), (3,4), (5,6)] g = group(add_2.s(key[0], key[1]) for key in ll).apply_async() for ret in g.get(): print ":", ret print "end redis_2 -----------------------------------"
结果
3 7 11 end redis_1 ----------------------------------- : 3 : 7 : 11 end redis_2 -----------------------------------
注释:需要提前设置下队列
##例1
#test.py
from celery import Celery import time app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:admin@localhost:5672//‘) @app.task def add(x, y): print "------>" time.sleep(5) print "<--------------" return x + y if __name__ == "__main__": app.start()
#test_2.py
from celery import Celery import time app = Celery(‘test_2‘, backend=‘amqp‘, broker=‘amqp://admin:admin@localhost:5672//hwzh‘) @app.task def add_2(x, y): print "=====>" time.sleep(5) print "<==========" return x + y if __name__ == "__main__": app.start()
测试
from celery import group from test import * from test_2 import * ll = [(1,2), (3,4), (7,8)] g = group(add.s(key[0], key[1]) for key in ll).apply_async() for ret in g.get(): print ret ll = [(1,2), (3,4), (7,8)] g = group(add_2.s(key[0], key[1]) for key in ll).apply_async() for ret in g.get(): print ret
结果
3 7 15 3 7 15
##例2
#test.py
from celery import Celery import time app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:admin@localhost:5672//mq4‘) @app.task def add(x, y): print "------>" time.sleep(2) print "<--------------" return x + y @app.task def sum(x, y): print "------>" time.sleep(2) print "<--------------" return x + y if __name__ == "__main__": app.start()
#eg2.py
from test import * import time rev = [] for i in range(3): rev.append(add.delay(1,3)) for i in range(3): rev.append(sum.delay(1,3)) print "len rev:", len(rev) while 1: tag = 1 for key in rev: if not key.ready(): tag = 0 time.sleep(1) print "sleep 1" if tag: break print "_____________________>"
from celery import Celery app = Celery(‘tasks‘, backend=‘amqp‘, broker=‘amqp://admin:admin@localhost‘) @app.task def add(x, y): return x + y
启动
celery -A tasks_1 worker --loglevel=info
与前例不同:
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f8057931810
- ** ---------- .> transport: amqp://admin:**@localhost:5672//
- ** ---------- .> results: amqp
运行
>>> from tasks_1 import add >>> result = add.delay(1, 3) >>> result.ready() True >>> result.get() 4
from celery import Celery from kombu import Exchange, Queue BROKER_URL = ‘amqp://admin:admin@localhost//‘ app = Celery(‘tasks‘, backend=‘amqp‘,broker=BROKER_URL) app.conf.update( CELERY_ROUTES={ "add1":{"queue":"queue_add1"}, "add2":{"queue":"queue_add2"}, "add3":{"queue":"queue_add3"}, "add4":{"queue":"queue_add4"}, }, ) @app.task def add1(x, y): return x + y @app.task def add2(x, y): return x + y @app.task def add3(x, y): return x + y @app.task def add4(x, y): return x + y
标签:
原文地址:http://www.cnblogs.com/zknublx/p/5943419.html