from multiprocessing import Queue

q = Queue()       # create the queue
q.put(data)       # put data into the queue
data = q.get()    # take data out of the queue
Example:
from multiprocessing import Queue, Process

# writer process
def write(q):
    for i in ['a', 'b', 'c', 'd']:
        q.put(i)  # put the message into the queue
        print('put {0} to queue'.format(i))

# reader process
def read(q):
    while 1:
        result = q.get()  # take a message out of the queue
        print("get {0} from queue".format(result))

def main():
    # the parent process creates the Queue and passes it to each child process
    q = Queue()
    pw = Process(target=write, args=(q,))  # the queue is passed in as an argument
    pr = Process(target=read, args=(q,))
    pw.start()  # start the child process that writes data
    pr.start()  # start the child process that reads data
    pw.join()   # wait for pw to finish
    pr.terminate()  # force pr to stop: read() is an endless loop that keeps listening for new data,
                    # so once the writer is done it has to be terminated from outside

if __name__ == '__main__':
    main()

Result:

put a to queue
get a from queue
put b to queue
get b from queue
put c to queue
get c from queue
put d to queue
get d from queue
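terminate() is a rather blunt way to stop the reader. A gentler variant, sketched below and not part of the original post, is to let the writer push a sentinel value (None is an arbitrary choice here) so the reader can leave its loop on its own and be join()ed like the writer:

from multiprocessing import Queue, Process

def write(q):
    for i in ['a', 'b', 'c', 'd']:
        q.put(i)
        print('put {0} to queue'.format(i))
    q.put(None)  # sentinel: tells the reader nothing more is coming

def read(q):
    while True:
        result = q.get()
        if result is None:  # sentinel received, exit the loop instead of being terminated
            break
        print('get {0} from queue'.format(result))

def main():
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()  # the reader now ends by itself, no terminate() needed

if __name__ == '__main__':
    main()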
import time
from multiprocessing import Pipe, Process

# process that sends messages
def proc1(pipe):
    for i in xrange(1, 10):
        pipe.send(i)
        print("send {0} to pipe".format(i))
        time.sleep(1)

# process that receives messages
def proc2(pipe):
    n = 9
    while n > 0:
        result = pipe.recv()
        print("recv {0} from pipe".format(result))
        n -= 1

def main():
    pipe = Pipe(duplex=False)  # half-duplex mode: proc1 only sends, proc2 only receives; Pipe() returns a tuple of two connections
    p1 = Process(target=proc1, args=(pipe[1],))  # pipe[1] is the write end
    p2 = Process(target=proc2, args=(pipe[0],))  # pipe[0] is the read end
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    pipe[0].close()
    pipe[1].close()

if __name__ == '__main__':
    main()

Result:

send 1 to pipe
recv 1 from pipe
recv 2 from pipe
send 2 to pipe
send 3 to pipe
recv 3 from pipe
recv 4 from pipe
send 4 to pipe
send 5 to pipe
recv 5 from pipe
recv 6 from pipe
send 6 to pipe
recv 7 from pipe
send 7 to pipe
recv 8 from pipe
send 8 to pipe
send 9 to pipe
recv 9 from pipe
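Pipe() defaults to duplex=True, in which case both connection objects can send and receive. A minimal request/reply sketch of that mode (names and values are my own, not from the original post):

from multiprocessing import Pipe, Process

def worker(conn):
    msg = conn.recv()    # read the request from the parent
    conn.send(msg * 2)   # reply over the same connection
    conn.close()

def main():
    parent_conn, child_conn = Pipe()  # duplex by default: both ends can send and recv
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send(21)
    print("reply from child: {0}".format(parent_conn.recv()))  # prints 42
    p.join()

if __name__ == '__main__':
    main()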
from multiprocessing import Queue
from threading import Thread
import time

"""
One producer and two consumers, implemented by subclassing Thread:
one consumer takes the even numbers, the other takes the odd numbers.
"""

class Producer(Thread):
    def __init__(self, queue):
        super(Producer, self).__init__()
        self.queue = queue

    def run(self):
        try:
            for i in xrange(1, 10):
                print("put {0} to queue".format(i))
                self.queue.put(i)
        except Exception as e:
            print("put data error")
            raise e

class Consumer_even(Thread):
    def __init__(self, queue):
        super(Consumer_even, self).__init__()
        self.queue = queue

    def run(self):
        try:
            while not self.queue.empty():  # stop once the queue is empty
                number = self.queue.get(block=True, timeout=3)  # fetch a message; block=True blocks, with a 3 s timeout
                if number % 2 == 0:  # this consumer only handles even numbers
                    print("get {0} from queue EVEN, thread name is {1}".format(number, self.getName()))
                else:
                    self.queue.put(number)  # not an even number, so put it back into the queue
                time.sleep(1)
        except Exception as e:
            raise e

class Consumer_odd(Thread):
    def __init__(self, queue):
        super(Consumer_odd, self).__init__()
        self.queue = queue

    def run(self):
        try:
            while not self.queue.empty():
                number = self.queue.get(block=True, timeout=3)
                if number % 2 != 0:  # this consumer only handles odd numbers
                    print("get {0} from queue ODD, thread name is {1}".format(number, self.getName()))
                else:
                    self.queue.put(number)
                time.sleep(1)
        except Exception as e:
            raise e

def main():
    # the parent creates the Queue and hands it to the producer and both consumers
    queue = Queue()
    p = Producer(queue=queue)
    # start producing messages
    print("start producing messages")
    p.start()
    p.join()        # wait for the producer thread to finish
    time.sleep(1)   # pause 1 s after all messages have been produced

    c1 = Consumer_even(queue=queue)
    c2 = Consumer_odd(queue=queue)
    # start consuming messages
    print("start consuming messages")
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    print("all messages consumed")

if __name__ == '__main__':
    main()

Result:

start producing messages
put 1 to queue
put 2 to queue
put 3 to queue
put 4 to queue
put 5 to queue
put 6 to queue
put 7 to queue
put 8 to queue
put 9 to queue
start consuming messages
get 1 from queue ODD, thread name is Thread-3
get 2 from queue EVEN, thread name is Thread-2
get 3 from queue ODD, thread name is Thread-3
get 4 from queue EVEN, thread name is Thread-2
get 5 from queue ODD, thread name is Thread-3
get 6 from queue EVEN, thread name is Thread-2
get 7 from queue ODD, thread name is Thread-3
get 8 from queue EVEN, thread name is Thread-2
get 9 from queue ODD, thread name is Thread-3
all messages consumed
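Since only threads are involved in this example, the standard-library thread queue (module Queue in Python 2, queue in Python 3) is usually the more natural choice than multiprocessing.Queue: it is lighter and offers task_done()/join(), so the producing side can wait until every item has been handled. A rough Python 2 sketch of that pattern (my own names, not from the original post):

from Queue import Queue        # "from queue import Queue" on Python 3
from threading import Thread, current_thread

def producer(q):
    for i in xrange(1, 10):
        q.put(i)
        print("put {0} to queue".format(i))

def consumer(q):
    while True:
        number = q.get()       # blocks until an item is available
        print("get {0} from queue, thread name is {1}".format(number, current_thread().name))
        q.task_done()          # mark this item as fully processed

def main():
    q = Queue()
    for _ in range(2):         # two consumer threads share the same queue
        t = Thread(target=consumer, args=(q,))
        t.setDaemon(True)      # daemon threads die together with the main thread
        t.start()
    producer(q)
    q.join()                   # block until every produced item has been task_done()'d

if __name__ == '__main__':
    main()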
redis://:password@hostname:port/db_number

For example:

BROKER_URL = 'redis://localhost:6379/0'
pip install celery
pip install redis
pip install redis-py-with-geo   # without this package the worker fails with the error below

  File "/usr/lib/python2.7/site-packages/kombu/transport/redis.py", line 671, in _receive
    while c.connection.can_read(timeout=0):
TypeError: can_read() got an unexpected keyword argument 'timeout'
vi tasks.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery

broker = "redis://110.106.106.220:5000/5"
backend = "redis://110.106.106.220:5000/6"

app = Celery("tasks", broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y
Now that the broker, the backend and the task are all in place, the next step is to start a worker to do the actual work. In the directory containing tasks.py, run:
celery -A tasks worker -l info
After the worker starts you should see output like this:
[root@izwz920j4zsv1q15yhii1qz scripts]# celery -A celery_test worker -l info
/usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@izwz920j4zsv1q15yhii1qz v4.1.1 (latentcall)
---- **** -----
--- * ***  * -- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2018-05-25 14:28:38
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celery_test:0x25a6450
- ** ---------- .> transport:   redis://110.106.106.220:5000/5
- ** ---------- .> results:     redis://110.106.106.220:5000/6
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching for neighbors
[2018-05-25 14:28:39,475: INFO/MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/MainProcess] celery@izwz920j4zsv1q15yhii1qz ready.
This starts a worker for the tasks application (at this point the broker holds no tasks yet, so the worker simply stands by). The last step is to trigger a task; the simplest way is to write another small script that calls the function decorated as a task.
vi trigger.py

import time
from tasks import add

result = add.delay(4, 4)   # do not call add(4, 4) directly; use the delay interface provided by celery
while not result.ready():  # poll until the task has been processed
    time.sleep(1)
print('task done: {0}'.format(result.get()))  # fetch the result
print(result.task_id)
[root@izwz920j4zsv1q15yhii1qz scripts]# python trigger.py
task done: 8
celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2
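Because the backend stores results under the task id, the result can also be fetched later, or from another process, via AsyncResult. A minimal sketch reusing the app defined in tasks.py (the id below is the one printed by the run above):

from celery.result import AsyncResult
from tasks import app

task_id = 'd64def11-6b77-443f-84c2-0cbd850972f2'  # id printed by trigger.py
res = AsyncResult(task_id, app=app)
print(res.status)            # SUCCESS
print(res.get(timeout=5))    # 8, read from the redis result backend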
In the window where the worker was started earlier, the following messages appear:
[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching for neighbors
[2018-05-25 14:28:39,475: INFO/MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/MainProcess] celery@izwz920j4zsv1q15yhii1qz ready.
[2018-05-25 14:33:30,340: INFO/MainProcess] Received task: tasks.add[d64def11-6b77-443f-84c2-0cbd850972f2]
[2018-05-25 14:33:30,373: INFO/ForkPoolWorker-1] Task tasks.add[d64def11-6b77-443f-84c2-0cbd850972f2] succeeded in 0.0313169739966s: 8
[2018-05-25 14:33:47,082: INFO/MainProcess] Received task: tasks.add[5ae26e89-5d91-496e-8e1c-e0504fbbd39a]
[2018-05-25 14:33:47,086: INFO/ForkPoolWorker-1] Task tasks.add[5ae26e89-5d91-496e-8e1c-e0504fbbd39a] succeeded in 0.00259069999447s: 8
Checking in redis:
110.106.106.220:5000[5]> select 5
OK
110.106.106.220:5000[5]> keys *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.celery.pidbox"
3) "_kombu.binding.celery"
110.106.106.220:5000[5]> select 6
OK
110.106.106.220:5000[6]> keys *
1) "celery-task-meta-5ae26e89-5d91-496e-8e1c-e0504fbbd39a"
2) "celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2"
110.106.106.220:5000[6]> get celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"d64def11-6b77-443f-84c2-0cbd850972f2\", \"children\": []}"
110.106.106.220:5000[6]> get celery-task-meta-5ae26e89-5d91-496e-8e1c-e0504fbbd39a
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"5ae26e89-5d91-496e-8e1c-e0504fbbd39a\", \"children\": []}"
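The same keys can also be read programmatically with redis-py; each value is a JSON blob describing the task. A quick sketch assuming the same host, port and result db as above:

import json
import redis

r = redis.StrictRedis(host='110.106.106.220', port=5000, db=6)
raw = r.get('celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2')
meta = json.loads(raw)
print(meta['status'])   # SUCCESS
print(meta['result'])   # 8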
Original article: https://www.cnblogs.com/yangjian319/p/9089167.html