标签:结果 ssi 消费 task 直接 地址空间 pipe ESS 平衡
? 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的。而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。
from multiprocessing import Process,Lock
import os,time
def work(lock):
lock.acquire()
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
lock.release()
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,))
p.start()
? 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即将并行修改为串行,运行速度降低,但避免了竞争,保证了数据安全。
? 虽然可以用文件共享数据实现进程间通信,但是:1.效率低(共享数据基于文件,而文件是硬盘上的数据)2.需要自己加锁处理。
? mutiprocessing模块提供了基于消息的IPC通信机制(队列、管道),可以高效率(多个进程共享一块内存数据),改进锁的问题。
? ipc机制:进程通讯
? 管道:pipe基于共享内存空间
? 队列:pipe + 锁
? 队列和管道都是将数据存放于内存中, 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来。
? 方法:
1. q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2. q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3. q.get_nowait():同q.get(False)
4. q.put_nowait():同q.put(False)
5. q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
6. q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
7. q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同8. q.empty()和q.full()一样
? 正常情况:
from multiprocessing import Process,Queue
q = Queue()
q.put('123')
q.put([456])
q.put(000)
print(q.get())
print(q.get())
print(q.get())
print(q.get())
123
[456]
0
? 队列满了继续存值,会阻塞:
from multiprocessing import Process,Queue
q = Queue(4)
q.put('123')
q.put([456])
q.put(789)
q.put(000)
q.put('aaa')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
? q.put(block=True)
from multiprocessing import Process,Queue
q = Queue(3)
q.put('123',block=True,timeout=2)
q.put('123',block=True,timeout=2)
q.put('123',block=True,timeout=2)
q.put('456',block=True,timeout=3)
print(q.get())
print(q.get())
print(q.get())
print(q.get())
raise Full
queue.Full
? q.get(block=True)
from multiprocessing import Process,Queue
q = Queue()
q.put('333')
q.get()
q.get(block=True,timeout=4)
raise Empty
queue.Empty
? q.put(block=False)
from multiprocessing import Process,Queue
q = Queue(2)
q.put('123')
q.put('123')
q.put('123',block=False,)
raise Full
queue.Full
? q.get_nowait():
from multiprocessing import Process,Queue
q = Queue(1)
q.put('123')
q.put_nowait("666")
raise Full
queue.Full
? 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
? 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
? 生产者消费者模型1:
from multiprocessing import Process,Queue
def producer(q,name,food):
for i in range(1,10):
print(f'{name}生产了{food}{i}')
res = f'{food}{i}'
q.put(res)
q.put(None)
def consumer(q,name):
while True:
res = q.get(timeout=5)
if res is None:
break
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'狗不理','包子'))
c1 = Process(target=consumer,args=(q,'狗'))
p1.start()
c1.start()
狗不理生产了包子1
狗不理生产了包子2
狗不理生产了包子3
狗不理生产了包子4
狗不理生产了包子5
狗不理生产了包子6
狗不理生产了包子7
狗不理生产了包子8
狗不理生产了包子9
狗吃了包子1
狗吃了包子2
狗吃了包子3
狗吃了包子4
狗吃了包子5
狗吃了包子6
狗吃了包子7
狗吃了包子8
狗吃了包子9
? 生产者消费者模型2:
from multiprocessing import Process, Queue
import time,random
def producer(q,name,food):
for i in range(1,4):
print(f'{name}生产了{food}{i}')
time.sleep(random.randint(1,3))
res = f'{food}{i}'
q.put(res)
def consumer(q,name):
while True:
res = q.get(timeout= 5)
if res is None:
break
time.sleep(random.randint(1,3))
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'狗不理','包子'))
p2 = Process(target=producer, args=(q, '猫不理', '包子'))
p3 = Process(target=producer,args=(q,'猪不理','包子'))
c1 = Process(target=consumer,args=(q,'猪'))
c2 = Process(target=consumer,args=(q,'狗'))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
狗不理生产了包子1
猪不理生产了包子1
猫不理生产了包子1
猫不理生产了包子2
猪不理生产了包子2
狗不理生产了包子2
猫不理生产了包子3
猪吃了包子1
猪不理生产了包子3
猪吃了包子1
狗吃了包子1
狗不理生产了包子3
狗吃了包子2
猪吃了包子2
狗吃了包子3
猪吃了包子3
狗吃了包子2
猪吃了包子3
? 生产者消费者模型3:
from multiprocessing import Process, Queue,JoinableQueue
import time,random
def producer(q,name,food):
for i in range(1,4):
print(f'{name}生产了{food}{i}')
time.sleep(random.randint(1,3))
res = f'{food}{i}'
q.put(res)
def consumer(q,name):
while True:
res = q.get()
time.sleep(random.randint(1,3))
print(f'{name}吃了{res}')
q.task_done()
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer,args=(q,'狗不理','包子'))
p2 = Process(target=producer, args=(q, '猫不理', '包子'))
p3 = Process(target=producer,args=(q,'猪不理','包子'))
c1 = Process(target=consumer,args=(q,'猪'))
c2 = Process(target=consumer,args=(q,'狗'))
p1.start()
p2.start()
p3.start()
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.join()
狗不理生产了包子1
猫不理生产了包子1
猪不理生产了包子1
猫不理生产了包子2
狗不理生产了包子2
猫不理生产了包子3
猪不理生产了包子2
狗吃了包子1
猪吃了包子1
狗吃了包子2
狗不理生产了包子3
猪不理生产了包子3
狗吃了包子3
猪吃了包子1
猪吃了包子2
狗吃了包子2
狗吃了包子3
猪吃了包子3
? joinableQueue内有.task_done()方法,可以将.put()方法放入队列内的一个任务完成,可将队列想象成一个计数器 :put +1 task_done -1,计数器不为0的时候join()方法阻塞等待计数器为0后通过。
? 在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程
? 车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线
? 流水线的工作需要电源,电源就相当于cpu
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。
? 多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。
?
标签:结果 ssi 消费 task 直接 地址空间 pipe ESS 平衡
原文地址:https://www.cnblogs.com/tangceng/p/11529923.html