标签:读取 offset form flag 抢票 内容 无法 红绿灯 逻辑
抢票的例子:
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {‘count‘: 1} # 仅剩最后一张票 with open(‘db.txt‘, ‘w‘, encoding=‘utf-8‘) as f: json.dump(count, f) # 返回剩余票数 def search(): dic = json.load(open(‘db.txt‘)) print(‘剩余票数%s‘ % dic[‘count‘]) return dic def get_ticket(dic): time.sleep(0.1) # 模拟读数据的网络延迟 if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open(‘db.txt‘, ‘w‘)) print(‘购票成功,剩余:{}‘.format(dic[‘count‘])) else: print(‘抢票失败,去邀请好友助力!‘) def ticket_purchase(lock, i): print(‘第{}个用户‘.format(i)) # lock.acquire() get_ticket(search()) # lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): # 模拟并发10个客户端抢票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
结果:
第6个用户
剩余票数1
第4个用户
剩余票数1
第7个用户
剩余票数1
第1个用户
剩余票数1
第10个用户
剩余票数1
第3个用户
剩余票数1
第5个用户
剩余票数1
第8个用户
剩余票数1
第2个用户
剩余票数1
第9个用户
剩余票数1
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
购票成功,剩余:0
十个用户会同时把票抢走,因为每次search同一时间能查到只有一个票
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
如果抢票步骤没有加锁,那么可能会有几个人同时把票抢走,因为每次search都能查到有一个票,加了锁以后只能一个一个抢
加锁:
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {‘count‘: 1} # 仅剩最后一张票 with open(‘db.txt‘, ‘w‘, encoding=‘utf-8‘) as f: json.dump(count, f) # 返回剩余票数 def search(): dic = json.load(open(‘db.txt‘)) print(‘剩余票数%s‘ % dic[‘count‘]) return dic def get_ticket(dic): time.sleep(0.1) # 模拟读数据的网络延迟 if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open(‘db.txt‘, ‘w‘)) print(‘购票成功,剩余:{}‘.format(dic[‘count‘])) else: print(‘抢票失败,去邀请好友助力!‘) def ticket_purchase(lock, i): print(‘第{}个用户‘.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): # 模拟并发10个客户端抢票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
结果:
第2个用户
剩余票数1
第1个用户
第9个用户
第10个用户
第5个用户
第7个用户
第8个用户
第3个用户
第6个用户
第4个用户
购票成功,剩余:0
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
从结果可以看出,并不是手速最快的才能抢到
RLock
支持上下文管理器协议,因此可以在with
语句中使用。
AssertionError
。请注意,在这种情况下引发的异常类型与threading.rlock.release()中实现的行为不同 【后者引发RuntimeError
】。
若你的线程处理中会有一些比较复杂的代码逻辑过程,比如很多层的函数调用,而这些函数其实都需要进行加锁保护数据访问。这样就可能会反复的多次加锁,因而用RLock就可以进行多次加锁,解锁,直到最终锁被释放,而如果用普通的lock,当你一个函数A已经加锁,它内部调用另一个函数B,如果B内部也会对同一个锁加锁,那么这种情况就也会导致死锁。
threading.Semaphore
此类实现信号量对象。信号量管理表示release()调用数减去acquire()调用数再加上原子计数器的初始值。当计数器为0时,acquire()方法将一直阻塞,直到它可以返回而不使计数器为负为止。如果未给定,则值默认为1。acquire
方法的第一个参数被命名为block,和multiprocessing.Lock.acquire() 一致
如果内部计数器大于零,则将其递减1并立即返回true。
如果内部计数器为零,则阻塞直到调用 release()唤醒
。一旦唤醒(计数器大于0),将计数器递减1并返回true。每次调用release()都会唤醒一个进程。不应依赖进程被唤醒的顺序。
【acquire()和release()不一定一对一,是否阻塞要取决于计数器的值】
# -*- coding:utf-8 -*- from multiprocessing import Semaphore, Process import time import random def enter_room(smp, i): if smp.acquire(block=True, timeout=random.randint(1, 3)): # 超时还未获取,返回false,反之返回True print(‘用户%d进入了房间‘ % i) time.sleep(1) smp.release() print(‘用户%d离开了房间‘ % i) else: print(‘等太久,走人‘) if __name__ == ‘__main__‘: smp = Semaphore(2) for i in range(10): p = Process(target=enter_room, args=(smp, i)) p.start()
结果:
用户5进入了房间
用户8进入了房间
用户5离开了房间
用户0进入了房间
用户8离开了房间用户9进入了房间
等太久,走人
用户0离开了房间
用户2进入了房间
用户9离开了房间
用户4进入了房间
等太久,走人
等太久,走人
用户2离开了房间用户6进入了房间
用户4离开了房间
用户6离开了房间
threading.Event
set()
方法设置为true,并使用clear()方法重置为false 。flag = False,wait()
方法将阻塞,直到该flag为True。flag初始值是Flase。
将阻塞,直到set()被调用以再次将内部标志设置为true。
红绿灯:
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time # 红绿灯 def light(e): while 1: if e.is_set(): # 为True,flag为True print(‘红灯‘) e.clear() # 重置为False,调用wait()的进程阻塞 time.sleep(5) else: print(‘绿灯‘) e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先获取锁,确认下一辆通行的车 e.wait() # 红灯停,绿灯行 print(‘奔驰{}以两秒的时间飘过‘.format(i)) time.sleep(2) l.release() if __name__ == ‘__main__‘: e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5辆车 p = Process(target=car, args=(e, i, l)) p.start()
结果:
绿灯
奔驰4以两秒的时间飘过
奔驰2以两秒的时间飘过
奔驰1以两秒的时间飘过
红灯
绿灯
奔驰3以两秒的时间飘过
奔驰0以两秒的时间飘过
奔驰4以两秒的时间飘过
红灯
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time import random # 红绿灯 def light(e): while 1: if e.is_set(): # 为True,flag为True print(‘红灯‘) e.clear() # 重置为False,调用wait()的进程阻塞 time.sleep(5) else: print(‘绿灯‘) e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先获取锁,确认下一辆通行的车,如果没有锁那么就同时过红绿灯 if e.wait(random.randint(0, 3)): # 红灯停,绿灯行 print(‘奔驰{}以两秒的时间飘过‘.format(i)) else: print(‘奔驰{}闯红灯以两秒的时间飘过‘.format(i)) time.sleep(2) l.release() if __name__ == ‘__main__‘: e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5辆车 p = Process(target=car, args=(e, i, l)) p.start()
结果:
绿灯
奔驰4以两秒的时间飘过
奔驰2以两秒的时间飘过
奔驰1以两秒的时间飘过
红灯
奔驰3闯红灯以两秒的时间飘过
绿灯
奔驰0以两秒的时间飘过
奔驰4以两秒的时间飘过
红灯
奔驰2闯红灯以两秒的时间飘过
奔驰1闯红灯以两秒的时间飘过
(conn1, conn2)
Connection.
True(
默认值),则管道是双向的。如果duplex是False,
管道是单向的:conn1
只能用于接收消息,conn2
只能用于发送消息
recv()
。ValueError
异常。(大约32 MiB +,虽然它取决于操作系统)
send()
。阻塞直到有东西要收到。如果没有什么留下来接收,而另一端被关闭。抛出 EOFError
【若另一端已关闭,则触发BrokenPipeError异常】
ValueError
异常(大约32 MiB +,取决于操作系统)
传输字符串数据:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:436
生产者1生产第1包子
消费者1消费了一个包子
生产者1生产第2包子
消费者1消费了一个包子
生产者1生产第3包子
消费者1消费了一个包子
多个消费者:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c1 = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 c2 = Process(target=consume, args=(left, ‘消费者2‘)) # 消费 c3 = Process(target=consume, args=(left, ‘消费者3‘)) # 消费 p.start() c1.start() c2.start() c3.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:432
生产者1生产第1包子
消费者2消费了一个包子
生产者1生产第2包子
消费者2消费了一个包子
生产者1生产第3包子
消费者3消费了一个包子
请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。
传输字节:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send_bytes(‘包子‘.encode()) time.sleep(1) right.send_bytes(‘包子包子包子‘.encode()) right.close() def consume(left, name): while 1: try: byte_content = bytearray(10) bytes_size = left.recv_bytes_into(byte_content) print(‘{}消费了一个{}‘.format(name, byte_content.decode())) print(‘接收了{}个数据‘.format(bytes_size)) except EOFError: # 关闭另一端,由recv触发此异常 left.close() break except BufferTooShort as e: print(‘数据太长,完整数据为:{}‘.format(e.args[0].decode())) if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:476
生产者1生产第1包子
消费者1消费了一个包子
接收了6个数据
生产者1生产第2包子
消费者1消费了一个包子
接收了6个数据
生产者1生产第3包子
消费者1消费了一个包子
接收了6个数据
数据太长,完整数据为:包子包子包子
奇怪的poll(),分析下面两个代码结果:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) time.sleep(3) right.close() print(‘right已关闭‘) def consume(left, name): while 1: try: print(‘poll阻塞‘) print(‘是否有可供读取的数据:{}‘.format(left.poll(None))) goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 已关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:544
poll阻塞
生产者1生产第1包子
生产者1生产第2包子
生产者1生产第3包子
是否有可供读取的数据:True
消费者1消费了一个包子
poll阻塞
是否有可供读取的数据:True
消费者1消费了一个包子
poll阻塞
是否有可供读取的数据:True
消费者1消费了一个包子
poll阻塞
right已关闭
是否有可供读取的数据:True
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) # time.sleep(3) right.close() print(‘right已关闭‘) def consume(left, name): while 1: try: print(‘poll阻塞‘) print(‘是否有可供读取的数据:{}‘.format(left.poll(None))) goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 已关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:440
生产者1生产第1包子
生产者1生产第2包子
生产者1生产第3包子
right已关闭
poll阻塞
是否有可供读取的数据:True
消费者1消费了一个包子
poll阻塞
是否有可供读取的数据:True
消费者1消费了一个包子
poll阻塞
是否有可供读取的数据:True
消费者1消费了一个包子
poll阻塞
Process Process-2:
Traceback (most recent call last):
......
BrokenPipeError: [WinError 109] 管道已结束。
第四次循环poll(None)的执行若先于管道的right端关闭代码right.close()的执行,poll(None)返回True,并以recv引发的异常结束。反之,poll(None)引发BrokenPipeError异常
队列是线程和进程安全的。
队列进程安全
生产消费者模型,队列实现:
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue, JoinableQueue import os def consumer(q): while True: print(‘消费者进程{}等吃‘.format(os.getpid())) res = q.get() if res is None: print(‘消费者进程{}结束‘.format(os.getpid(), res)) break # 收到结束信号则结束 else: print(‘消费者进程{}吃了{}‘.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print(‘生产者进程{}生产了 第{}个{}‘.format(os.getpid(), i + 1, food)) print(‘生产者进程{}生产完成‘.format(os.getpid())) if __name__ == ‘__main__‘: q = Queue() # 生产者 p1 = Process(target=producer, args=(‘包子‘, q)) p2 = Process(target=producer, args=(‘水果‘, q)) p3 = Process(target=producer, args=(‘米饭‘, q)) # 消费者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) # 开始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 有几个消费者就put几个None q.put(None) # 必须保证生产者全部生产完毕,才应该发送结束信号 q.put(None) q.put(None)
结果:
消费者进程12108等吃
消费者进程3648等吃
生产者进程19544生产了 第1个包子
生产者进程19544生产了 第2个包子
生产者进程19544生产完成
消费者进程12108吃了包子
消费者进程12108等吃
消费者进程3648吃了包子
消费者进程3648等吃
生产者进程828生产了 第1个米饭
消费者进程12108吃了米饭生产者进程828生产了 第2个米饭
消费者进程12108等吃
生产者进程828生产完成
消费者进程3648吃了米饭
消费者进程3648等吃
生产者进程20244生产了 第1个水果
消费者进程12108吃了水果生产者进程20244生产了 第2个水果
消费者进程12108等吃
生产者进程20244生产完成
消费者进程3648吃了水果
消费者进程3648等吃
消费者进程12108结束
消费者进程3648结束
由于消费者收到None才能结束,因此要注意两个问题,None必须在队列尾部,几个消费者,尾部就应该有几个None
生产消费者模型,JoinableQueue实现
# -*- coding:utf-8 -*- from multiprocessing import Process,Queue, JoinableQueue import os def consumer(q): while 1: print(‘消费者进程{}等吃‘.format(os.getpid())) res = q.get() q.task_done() # Semaphore - 1 print(‘消费者进程{}吃了{}‘.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print(‘生产者进程{}生产了 第{}个{}‘.format(os.getpid(), i + 1, food)) print(‘生产者进程{}生产完成,等待消费者消费‘.format(os.getpid())) q.join() # 等待消费者进程 if __name__ == ‘__main__‘: q = JoinableQueue() # 生产者 p1 = Process(target=producer, args=(‘包子‘, q)) p2 = Process(target=producer, args=(‘水果‘, q)) p3 = Process(target=producer, args=(‘米饭‘, q)) # 消费者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True # 开始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join()
结果:
消费者进程9952等吃
消费者进程3840等吃
生产者进程10980生产了 第1个包子
生产者进程10980生产了 第2个包子
生产者进程10980生产完成,等待消费者消费
消费者进程9952吃了包子
消费者进程9952等吃
消费者进程3840吃了包子
消费者进程3840等吃
生产者进程7452生产了 第1个水果
生产者进程18556生产了 第1个米饭
消费者进程9952吃了水果
消费者进程9952等吃
生产者进程7452生产了 第2个水果
生产者进程7452生产完成,等待消费者消费
生产者进程18556生产了 第2个米饭
生产者进程18556生产完成,等待消费者消费
消费者进程3840吃了米饭
消费者进程3840等吃
消费者进程9952吃了水果
消费者进程9952等吃
消费者进程3840吃了米饭
消费者进程3840等吃
其思路就是put之后,有个信号量计数器+1 ,每get一下调用一下taskdone,计数器就会-1。如果生产者很快生产完后,调用join,进程会等待,等到计数器为0的时候,所有调用join()的生产者会被唤醒。因此,生产者唤醒了-->意味着消费者已经消费完,消费者由于死循环还在等吃的(get阻塞)。设置消费者线程为守护线程,让主进程随着生产者进程的结束而结束,主进程 结束后,中止守护线程(消费者)
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue def f(q): q.put(‘X‘ * 1000000) # q.cancel_join_thread() if __name__ == ‘__main__‘: queue = Queue() p = Process(target=f, args=(queue,)) p.start() print(‘join阻塞‘) p.join() # this deadlocks print(‘get阻塞‘) obj = queue.get() # q.cancel_join_thread()执行后,join()不阻塞,但是get()拿不到数据,数据丢失,导致阻塞 print(obj)
multiprocessing.Queue底层是基于Pipe构建的,但是数据传递时并不是直接写入Pipe,而是写入进程本地buffer,通过一个feeder线程写入底层Pipe,
因此一次put数据很大的时候,会一直等待get()取出。没有get()就join该进程,会导致死锁
你可以创建一个进程池,进程将使用Pool类执行提交给它的任务。
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def func(n): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n**2 if __name__ == ‘__main__‘: p = Pool(5) for i in range(10): # p.apply(func, (i,)) # 只在一个进程中执行,会阻塞主进程 p.apply_async(func, (i,)) # 适合并行,一下由五个进程处理五个任务,不阻塞主进程 print(‘主进程‘) p.close() p.join()
结果:
主进程
i=0, pid=6540
i=1, pid=1348
i=2, pid=17060
i=3, pid=7632
i=4, pid=7396
i=5, pid=6540
i=6, pid=1348
i=7, pid=7396
i=8, pid=7632
i=9, pid=17060
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n if __name__ == ‘__main__‘: p = Pool(4) # result = p.map(fun, [(1, 2), (1, 2)], chunksize=1) # map阻塞主进程,结果出来后,再解除阻塞 result = p.map_async(fun, [(1, 2), (1, 2)], chunksize=1) # 异步,不阻塞主线程,任务还在子进程进行; print(‘主进程‘) # print(result) # map返回列表,可直接打印 print(result.get()) # map_async返回结果对象 p.close() p.join()
结果:
主进程
i=(1, 2), pid=2004
i=(1, 2), pid=5328
[(1, 2), (1, 2)]
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n if __name__ == ‘__main__‘: p = Pool(4) # result = p.imap(fun, [(1, 2), (3, 4)], chunksize=1) # 异步 result = p.imap_unordered(fun, [(1, 2), (3, 4)], chunksize=1) # 异步,不阻塞主线程,任务还在子进程进行,结果无序; print(‘主进程‘) for i in result: # imap返回迭代器 print(i) p.close() p.join()
结果:
主进程
i=(1, 2), pid=17396
i=(3, 4), pid=12496
(1, 2)
(3, 4)
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n, k): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n, k if __name__ == ‘__main__‘: p = Pool(4) # result = p.starmap(fun, [(1, 2), (3, 4)], chunksize=1) # 阻塞,直到全部结果处理完 result = p.starmap_async(fun, [(1, 2), (3, 4)], chunksize=1) # 异步,不阻塞主线程,任务还在子进程进行; print(‘主进程‘) # print(result) # starmap返回列表,直接打印 print(result.get()) p.close() p.join()
结果:
主进程
i=1, pid=14660
i=3, pid=10564
[(1, 2), (3, 4)]
使用进程池实现抢票:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pool, Manager import time import json count = {‘count‘: 1} # 仅剩最后一张票 with open(‘db.txt‘, ‘w‘, encoding=‘utf-8‘) as f: json.dump(count, f) # 返回剩余票数 def search(): dic = json.load(open(‘db.txt‘)) print(‘剩余票数%s‘ % dic[‘count‘]) return dic def get_ticket(dic): time.sleep(0.1) # 模拟读数据的网络延迟 if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open(‘db.txt‘, ‘w‘)) print(‘购票成功,剩余:{}‘.format(dic[‘count‘])) else: print(‘抢票失败,去邀请好友助力!‘) def ticket_purchase(lock, i): print(‘第{}个用户‘.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == ‘__main__‘: lock = Manager().Lock() # 要使用Manager().Lock() p = Pool(5) for i in range(10): # 模拟并发10个客户端抢票 p.apply_async(ticket_purchase, (lock, i + 1)) p.close() p.join()
结果:
第1个用户
剩余票数1
第2个用户
第3个用户
第4个用户
第5个用户
购票成功,剩余:0
剩余票数0
第6个用户
抢票失败,去邀请好友助力!
剩余票数0
第7个用户
抢票失败,去邀请好友助力!
剩余票数0
第8个用户
抢票失败,去邀请好友助力!
剩余票数0
第9个用户
抢票失败,去邀请好友助力!
剩余票数0
第10个用户
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
剩余票数0
抢票失败,去邀请好友助力!
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import sys def func(x): print("pid: ", os.getpid(), " got: ", x) sys.stdout.flush() return [x, x+1] def got(r): print("got result: ", r) if __name__ == ‘__main__‘: pool = Pool(processes=1, maxtasksperchild=9) # 进程执行了九个任务就会退出,换新的进程执行 keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] result = pool.map_async(func, keys, chunksize=1, callback=got) # chunksize指定每chuncksize个元素为一个任务 # result = pool.map_async(func, keys, chunksize=2, callback=got) # chunksize为2说明此时只有五个任务,没有换新的进程执行 pool.close() pool.join()
结果:
pid: 8188 got: 1
pid: 8188 got: 2
pid: 8188 got: 3
pid: 8188 got: 4
pid: 8188 got: 5
pid: 8188 got: 6
pid: 8188 got: 7
pid: 8188 got: 8
pid: 8188 got: 9
pid: 10860 got: 10
got result: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10], [10, 11]]
参考:
标签:读取 offset form flag 抢票 内容 无法 红绿灯 逻辑
原文地址:https://www.cnblogs.com/Magic-Dev/p/11432588.html