标签:基于 为我 大量 app 不可 实现 技术 管道 def
一、概念
进程:进程,是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
重点:1. 是一次运行活动,比如qq是程序,pycharm是程序,只有运行起来才是进程。
2.是系统进行资源分配和调度的基本单位,每个进程运行时,系统都会为他分配各自的内存,数据。每个进程间的空间是各自独立的。
二、实例结构
import multiprocessing import time def worker_1(interval): print "worker_1" time.sleep(interval) print "end worker_1" def worker_2(interval): print "worker_2" time.sleep(interval) print "end worker_2" def worker_3(interval): print "worker_3" time.sleep(interval) print "end worker_3" if __name__ == "__main__": p1 = multiprocessing.Process(target = worker_1, args = (2,)) p2 = multiprocessing.Process(target = worker_2, args = (3,)) p3 = multiprocessing.Process(target = worker_3, args = (4,)) p1.start() p2.start() p3.start() print("The number of CPU is:" + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print("child p.name:" + p.name + "\tp.id" + str(p.pid)) print ("END!!!!!!!!!!!!!!!!!")
输出:
The number of CPU is:4 child p.name:Process-1 p.id5791 child p.name:Process-3 p.id5793 child p.name:Process-2 p.id5792 END!!!!!!!!!!!!!!!!! worker_1 worker_2 worker_3 end worker_1 end worker_2 end worker_3
三、进程池
python中,进程池内部会维护一个进程序列。当需要时,程序会去进程池中获取一个进程。如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
因为每个进程需要有自己的内存,数据,和cpu消耗,我们不可能开大量的进程,进程池就为我们提供了这样的便捷,在进程池中固定好可用进程的数量,实现多进程操作。
实例:
from multiprocessing import Pool import time def func(arg): time.sleep(1) print(‘arg is ‘,arg) if __name__ == ‘__main__‘: pl = Pool(2) # 同时可以开启2个进程 for i in range(10): pl.apply_async(func=func,args=(i,)) # apply_async是异步执行,apply是同步执行 pl.close() time.sleep(1) pl.terminate() # 关闭进程池 pl.join() # 阻塞进程池 print(‘done‘)
执行结果:
arg is 1 arg is 0 done
它只输出了两个,在程序执行过程中,关闭进程池,程序会立即停止,不会再向后执行。
去掉terminate
from multiprocessing import Pool import time def func(arg): time.sleep(1) print(‘arg is ‘,arg) if __name__ == ‘__main__‘: pl = Pool(2) for i in range(10): pl.apply_async(func=func,args=(i,)) pl.close() # close执行之后不会有新的进程加入到pool time.sleep(1) # pl.terminate() pl.join() # 在join之前调用close,否则会报错 print(‘done‘)
arg is 0 arg is 1 arg is 2 arg is 3 arg is 4 arg is 5 arg is 6 arg is 7 arg is 8 arg is 9 done
四、守护进程:不阻挡主程序退出,主进程结束守护进程立即结束
def worker(interval): print("work start:{0}".format(time.ctime())) time.sleep(interval) print("work end:{0}".format(time.ctime())) if __name__ == ‘__main__‘: p = multiprocessing.Process(target=worker,args=(3,)) p.daemon = True # 将p设置为守护进程 p.start() print(‘end‘)
输出结果:end
这里将子进程设置为了守护进程,他就不会阻挡主程序的退出,即主程序运行完结束运行。
def worker(interval): print("work start:{0}".format(time.ctime())) time.sleep(interval) print("work end:{0}".format(time.ctime())) if __name__ == ‘__main__‘: p = multiprocessing.Process(target=worker,args=(3,)) p.daemon = True p.start() p.join() # 设置阻塞,等待子进程结束 print(‘end‘)
work start:Mon May 13 14:45:50 2019 work end:Mon May 13 14:45:53 2019 end
五、继承Process类的形式开启进程
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self) -> None: print(os.getpid()) print(f‘我是{self.name}‘) p1 = MyProcess(‘mike‘) p2 = MyProcess(‘jack‘) p3 = MyProcess(‘allen‘) lis = [p1,p2,p3] p1.start() p2.start() p3.start() for i in lis: i.join() print(‘主线程‘)
6031 我是mike 6032 我是jack 6033 我是allen 主线程
六、互斥锁
多个任务可以在几个进程之间并发处理,但他们之间没有顺序,开启后无法控制,会引发数据安全和顺序混乱的问题。
一个小例子:
def task1(): print(‘这是 task1 任务‘.center(30, ‘-‘)) print(‘task1 进了洗手间‘) time.sleep(random.randint(1, 3)) print(‘task1 办事呢...‘) time.sleep(random.randint(1, 3)) print(‘task1 走出了洗手间‘) def task2(): print(‘这是 task2 任务‘.center(30, ‘-‘)) print(‘task2 进了洗手间‘) time.sleep(random.randint(1, 3)) print(‘task2 办事呢...‘) time.sleep(random.randint(1, 3)) print(‘task2 走出了洗手间‘) def task3(): print(‘这是 task3 任务‘.center(30, ‘-‘)) print(‘task3 进了洗手间‘) time.sleep(random.randint(1, 3)) print(‘task3 办事呢...‘) time.sleep(random.randint(1, 3)) print(‘task3 走出了洗手间‘) if __name__ == ‘__main__‘: p1 = Process(target=task1) p2 = Process(target=task2) p3 = Process(target=task3) p1.start() p2.start() p3.start()
输出结果:
---------这是 task1 任务---------- task1 进了洗手间 ---------这是 task2 任务---------- task2 进了洗手间 ---------这是 task3 任务---------- task3 进了洗手间 task3 办事呢... task1 办事呢... task2 办事呢... task1 走出了洗手间 task3 走出了洗手间 task2 走出了洗手间
引入锁后:
from multiprocessing import Process, Lock import time import random # 生成一个互斥锁 mutex_lock = Lock() def task1(lock): # 锁门 lock.acquire() print(‘这是 task1 任务‘.center(30, ‘-‘)) print(‘task1 进了洗手间‘) time.sleep(random.randint(1, 3)) print(‘task1 办事呢...‘) time.sleep(random.randint(1, 3)) print(‘task1 走出了洗手间‘) # 释放锁 lock.release() def task2(lock): # 锁门 lock.acquire() print(‘这是 task2 任务‘.center(30, ‘-‘)) print(‘task2 进了洗手间‘) time.sleep(random.randint(1, 3)) print(‘task2 办事呢...‘) time.sleep(random.randint(1, 3)) print(‘task2 走出了洗手间‘) # 释放锁 lock.release() def task3(lock): # 锁门 lock.acquire() print(‘这是 task3 任务‘.center(30, ‘-‘)) print(‘task3 进了洗手间‘) time.sleep(random.randint(1, 3)) print(‘task3 办事呢...‘) time.sleep(random.randint(1, 3)) print(‘task3 走出了洗手间‘) # 释放锁 lock.release() if __name__ == ‘__main__‘: p1 = Process(target=task1, args=(mutex_lock, )) p2 = Process(target=task2, args=(mutex_lock, )) p3 = Process(target=task3, args=(mutex_lock, )) # 释放新建进程的信号,具体谁先启动无法确定 p1.start() p2.start() p3.start()
---------这是 task1 任务---------- task1 进了洗手间 task1 办事呢... task1 走出了洗手间 ---------这是 task2 任务---------- task2 进了洗手间 task2 办事呢... task2 走出了洗手间 ---------这是 task3 任务---------- task3 进了洗手间 task3 办事呢... task3 走出了洗手间
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,在牺牲速度的前提下保证数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1. 效率低(共享数据基于文件,而文件是硬盘上的数据)
2. 需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1. 效率高(多个进程共享一块内存的数据)
2. 帮我们处理好锁问题。
mutiprocessing模块中为我们提供了一个IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。
七、队列
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
下面是一个队列使用的简单用法。
from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(2) q.put(1) # q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。 # 如果队列中的数据一直不被取走,程序就会永远停在这里。 try: q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。 print(‘队列已经满了‘) # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。 print(q.full()) #满了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。 print(‘队列已经空了‘) print(q.empty()) #空了
进程间的数据共享:
import os import time import multiprocessing # 向queue中输入数据的函数 def inputQ(queue): info = str(os.getpid()) + ‘(put):‘ + str(time.asctime()) queue.put(info) # 向queue中输出数据的函数 def outputQ(queue): info = queue.get() print (str(os.getpid()), ‘(get):‘,info) # Main if __name__ == ‘__main__‘: multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 输入进程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 输出进程 for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()
进程间的数据共享主要运用在生产者消费者模型之间,新建一个队列,多个生产者可以往队列中放数据,消费者直接从队列中取出数据进行处理,队列存在于内存之中,直接读取。
标签:基于 为我 大量 app 不可 实现 技术 管道 def
原文地址:https://www.cnblogs.com/jimmyhe/p/10857021.html