标签:import multi iter one 自动 chunk 子进程 sel app
python的multiprocessing模块是跨平台的多进程模块,multiprocessing具有创建子进程,进程间通信,队列,事件,锁等功能,multiprocessing模块包含Process,Queue,Pipe,Lock等多个组件。创建进程的类
Process([group [, target [, name [, args [, kwargs]]]]])
参数介绍:
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=()
kwargs表示调用对象的字典,kwargs={‘key‘:‘value‘}
name为子进程的名称
Note:需要使用关键字方式指定参数
from multiprocessing import Process
def func():
print("first process")
if __name__ == ‘__main__‘:
# 创建进程对象,主进程和子进程是异步执行的
p = Process(target=func)
# 开启进程
p.start()
from multiprocessing import Process
def func(*args,**kwargs):
print("IPADDR:%s PORT:%d"%args)
for k in kwargs:
print("%s --> %s"%(k,kwargs[k]))
if __name__ == ‘__main__‘:
# 创建进程对象,并传递参数
p = Process(target=func,args=(‘127.0.0.1‘,8080),kwargs={‘key‘:‘value‘})
# 如果主进程中的代码已经结束了,子进程还没结束,主进程会等待子进程
# 开启进程
p.start()
import os
from multiprocessing import Process
def func():
# os模块的getpid方法可以获取当前进程的pid,getppid方法可以获取当前进程的父进程的pid
print("子进程pid:%s,父进程pid:%s"%(os.getpid(),os.getppid()))
if __name__ == ‘__main__‘:
p_l = []
# 创建多个进程
for i in range(10):
p = Process(target=func)
p.start()
p_l.append(p)
# 异步执行子进程,最后执行主进程中的代码
for p in p_l:
p.join() # 阻塞,使主进程等待子进程结束
print("------主进程------")
结果:
子进程pid:9944,父进程pid:1484
子进程pid:8932,父进程pid:1484
子进程pid:8504,父进程pid:1484
子进程pid:14884,父进程pid:1484
子进程pid:4828,父进程pid:1484
子进程pid:14644,父进程pid:1484
子进程pid:14908,父进程pid:1484
子进程pid:1980,父进程pid:1484
子进程pid:14604,父进程pid:1484
子进程pid:10008,父进程pid:1484
------主进程------
Note :因为在windows操作系统中,没有fork(),在创建子进程的时候会自动运行启动它的文件中的所有代码,因此必须将创建子进程的语句写在ifname==‘main‘:条件语句下。
import os
from multiprocessing import Process
class MyProcess(Process): # 必须继承Process类
def __init__(self,arg1,arg2,arg3):
‘‘‘
继承父类的初始化方法,加上自己需要的参数
:param arg1:
:param arg2:
:param arg3:
‘‘‘
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
self.arg3 = arg3
def run(self):
‘‘‘
必须要有run方法的实现
:return:
‘‘‘
print(‘子进程:%d ,父进程:%s ‘%(os.getpid(),os.getppid()),self.arg1,self.arg2,self.arg3)
self.walk() # walk方法在子进程中执行
def walk(self):
print(‘子进程:%d‘%os.getpid())
if __name__ == ‘__main__‘:
p = MyProcess(1,2,3)
p.start() # 会默认调用run方法
p.walk() # walk方法直接在主进程中调用,并没有在子进程中执行
print(‘父进程:%d ‘%os.getpid())
结果:
子进程:1220
父进程:1220
子进程:2164 ,父进程:1220 1 2 3
子进程:2164
在为开启daemon前,主进程会等待子进程结束在结束;
开启daemon后,程序会在主进程结束时结束子进程
import time
from multiprocessing import Process
def cal_time(second):
while True:
print("current time:%s"%time.ctime())
time.sleep(second)
if __name__ == ‘__main__‘:
p = Process(target=cal_time,args=(1,))
‘‘‘
守护进程的作用:会随着主进程代码执行结束而结束
守护进程要在start前设置
守护进程中不能再开启子进程
‘‘‘
p.daemon = True
p.start()
for i in range(10):
time.sleep(0.2)
print(‘*‘*i)
未开启daemon结果:子进程一直在运行
current time:Tue Feb 12 17:48:44 2019
*
**
***
****
current time:Tue Feb 12 17:48:45 2019
*****
******
*******
********
*********
current time:Tue Feb 12 17:48:46 2019
current time:Tue Feb 12 17:48:47 2019
current time:Tue Feb 12 17:48:48 2019
current time:Tue Feb 12 17:48:49 2019
开启daemon后结果:主进程结束程序就结束了
current time:Tue Feb 12 17:49:14 2019
*
**
***
****
current time:Tue Feb 12 17:49:15 2019
*****
******
*******
********
*********
name:查看进程名
pid:查看进程id
is_alive:查看进程是否正在运行
terminate:结束进程
import time
from multiprocessing import Process
def func():
print("start")
time.sleep(3)
print("end")
if __name__ == ‘__main__‘:
p = Process(target=func)
p.start()
time.sleep(3)
print("进程名:%s,进程id:%s"%(p.name,p.pid))
# is_alive方法查看进程是否正在运行
print(p.is_alive())
# terminate方法结束进程
p.terminate()
time.sleep(3)
print(p.is_alive())
结果:
start
进程名:Process-1,进程id:17564
True
False
进程锁:当多个进程访问共享资源时,进程锁保证同一时间只能有一个任务可以进行修改,程序的运行方法有并发改为串行,这样速度慢了,但是保证了数据的安全
import os
import time
import random
from multiprocessing import Process,Lock
def func(lock,n):
lock.acquire() #加锁
print(‘%s: %s is running‘ % (n, os.getpid()))
time.sleep(random.random())
print(‘%s: %s is done‘ % (n, os.getpid()))
lock.release() #释放
if __name__ == ‘__main__‘:
lock=Lock()
for i in range(3):
p=Process(target=func,args=(lock,i))
p.start()
信号量:Lock(锁)可以保证同一时间只能有一个任务对共享数据进行操作,而Semaphore(信号量)可以在同一时间让指定数量的进程操作共享数据。
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
‘‘‘
迷你唱吧,20个人,同一时间只能有4个人进去
‘‘‘
def sing(i,sem):
sem.acquire() # 加锁
print(‘%s enter the ktv‘%i)
time.sleep(random.randint(1,10))
print(‘%s leave the ktv‘%i)
sem.release() # 释放
if __name__ == ‘__main__‘:
sem = Semaphore(4)
for i in range(20):
p = Process(target=sing,args=(i,sem))
p.start()
事件:Event是进程之间的状态标记通信,因为进程不共享数据,所以事件对象需要以参数形式传递到函数中使用。
e = Event() # 实例化一个事件对象
e.set() # 标记变为非阻塞
e.wait() # 默认标记为阻塞,在等待的过程中,遇到非阻塞信号就继续执行
e.clear() # 标记变为阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞
import time
import random
from multiprocessing import Event
from multiprocessing import Process
def traffic_light(e):
while True:
if e.is_set(): # True为绿灯
time.sleep(3) # 等3秒后变为红灯
print("红灯亮")
e.clear()
else: # False为红灯,等3秒后变为绿灯
time.sleep(3)
print("绿灯亮")
e.set()
def car(i,e):
e.wait() # 默认是红灯
print("%s 车通过"%i)
if __name__ == ‘__main__‘:
e = Event()
# 控制红绿灯的进程
tra = Process(target=traffic_light,args=(e,))
tra.start()
for i in range(100):
if i%6 == 0:
time.sleep(random.randint(1,3))
p = Process(target=car,args=(i,e))
p.start()
管道是进程间通信(IPC)的一种,管道是双向通信的,但它不保证数据安全
创建管道:p1,p2=Pipe()
send():发送数据
recv():接收数据
close():关闭
def func(p):
foo,son = p
foo.close() # 不使用主进程的管道一端,先行关闭
while True:
try:
print(son.recv())
# 子进程在结束数据时,如果管道无数据,且对端没有close,就会报EOFError;如果管道无数据,对端没close,进程会阻塞
except EOFError:
break
if __name__ == ‘__main__‘:
foo,son = Pipe()
p = Process(target=func,args=((foo,son),))
p.start()
son.close() # 不使用子进程的管道一端,先行关闭
foo.send("hello")
foo.send("hello")
foo.close()
队列:进程之间是独立的,要实现进程间通信(IPC);multiprocessing模块支持两种形式:队列(queue)和管道(pipe),这两种方式都是使用消息传递的,且都是双向通信的,Queue = Pipe+Lock。
q = Queue() # 创建队列对象,无长度限制
q1 = Queue(3) # 传参数,创建一个有最大长度限制的队列
q.put(1) # 放入一个数据,对于无长度限制的队列来说,永不阻塞;对于有长度限制的队列来说,放满就阻塞
q.get() # 队列中有数据就取出一个数据,队列中无数据就会阻塞;遵循先进先出原则
q.qsize() # 查看队列的数据大小,不一定准确
from multiprocessing import Process
from multiprocessing import Queue
def queue_put(q):
q.put("123") # 子进程队列中放入一个变量
if __name__ == ‘__main__‘:
q = Queue()
p = Process(target=queue_put,args=(q,))
p.start()
print(q.get()) # 主进程获取到变量
示例2:子进程与子进程之间的通信
from multiprocessing import Process
from multiprocessing import Queue
def queue_put(q):
q.put("123") # 子进程队列中放入一个变量
def queue_get(q):
print(q.get()) # 另一个子进程获取到队列中的数据
if __name__ == ‘__main__‘:
q = Queue()
p = Process(target=queue_put,args=(q,))
p.start()
p1 = Process(target=queue_get,args=(q,))
p1.start()
JoinableQueue也是multiprocessing模块的一种队列的实现,但它与Queue不同的是JoinableQueue允许项目的使用者通知生成者项目已经被成功处理。创建方式同Queue。
主要方法:put与get与Queue一致
? ? q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
? ? q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process
‘‘‘
程序执行流程
1、生产者生产的数据全部被消费 --> 2、生产者进程结束 --> 3、主进程代码执行结束 --> 4、消费者守护进程结束
‘‘‘
def producer(q,food):
for i in range(5):
q.put("%s -- %s"%(i,food))
print("生产了 %s"%(food))
time.sleep(random.random())
q.join() # 2、等待消费者消费完所有数据
def consumer(q,name):
while True:
food = q.get()
if food == None:break
print("%s 吃了 %s"%(name,food))
q.task_done() # 1、消费者每消费一个数据就返回一个task_done给生产者
if __name__ == ‘__main__‘:
q = JoinableQueue()
p1 = Process(target=producer,args=(q,‘youtiao‘))
p1.start()
p2 = Process(target=producer,args=(q,‘baozi‘))
p2.start()
c1 = Process(target=consumer,args=(q,‘daxiong‘))
c1.daemon = True # 4、消费者守护进程结束
c1.start()
c2 = Process(target=consumer,args=(q,‘chenglei‘))
c2.daemon = True
c2.start()
c3 = Process(target=consumer,args=(q,‘niu‘))
c3.daemon = True
c3.start()
p1.join() # 3、等待p1执行完毕
p2.join() # 3、等待p2执行完毕
Manager也是multiprocessing模块的一个类,这个类主要提供了进程间通信(IPC)的一个机制,它支持Python所有的数据类型,但不提供数据安全的机制。
from multiprocessing import Manager
from multiprocessing import Process
def func(d):
print(d)
d[‘num‘] -= 10
if __name__ == ‘__main__‘:
m = Manager()
d = m.dict({‘num‘:100})
l = []
for i in range(10):
p = Process(target=func,args=(d,))
p.start()
# p.join() # 同步
l.append(p)
for j in l:
j.join() # 异步
结果:
{‘num‘: 100}
{‘num‘: 90}
{‘num‘: 80}
{‘num‘: 70}
{‘num‘: 60}
{‘num‘: 50}
{‘num‘: 40}
{‘num‘: 30}
{‘num‘: 20}
{‘num‘: 10}
在执行大量并发任务时,多进程是行之有效的手段之一,但是多进程需要注意几个问题,一是操作系统不可能无限开启进程,一般是有几个核开启几个进程,二是开启进程过多,系统资源占用过多,会导致系统运行速度变慢;那么遇到这种情况时pool(进程池)便是最好的解决方案。
Pool可以指定开启一定数量的进程(一般为CPU核数+1个)等待用户使用,当有新的请求进入时,如果池中有空闲进程,便直接开启;如果池中的进程都在使用,那么该请求就会等待,直到池中有进程结束,重用该进程。
import time
from multiprocessing import Process
from multiprocessing import Pool
def func(i):
i -= 1
if __name__ == ‘__main__‘:
# 计算进程池所需事件
start1_time = time.time() # 开始时间
p = Pool(5) # 进程池中创建5个进程
p.map(func,range(100)) # 调用进程执行任务,target = func args = (1,2,3...),第二个参数要是可迭代对象
p.close() # 不允许再向进程池中添加任务
p.join() # 等待进程池中所有进程执行结束
stop1_time = time.time() - start1_time # 结束时间
print("进程池所需时间: %s "%stop1_time)
# 计算多进程所需时间
start2_time = time.time() # 开始时间
l = []
for i in range(100):
p1 = Process(target=func,args=(i,))
p1.start()
l.append(p)
for j in l:
j.join()
stop2_time = time.time() - start2_time
print("多进程所需时间: %s"%stop2_time)
结果:
进程池所需时间: 0.19990277290344238
多进程所需时间: 1.7190303802490234
由上可知,进程池在执行大量并发任务时的效率。
map(self, func, iterable, chunksize=None):将func
应用于iterable
中的每个元素,收集结果在返回的列表中。
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):异步的map
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):异步提交任务的机制
apply(self, func, args=(), kwds={}):同步提交任务的机制
close():不允许再提交新的任务
join():等待进程池中的进程执行结束在往下执行,此方法只能在close()或teminate()之后调用
执行apply或apply_async方法时,会返回ApplyResult类的实例对象
ApplyResult类有以下方法:
obj.get():获取进程的返回值
obj.ready():调用完成时,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发ValueError异常
obj.wait([timeout]):等待结果变为可用
import time
from multiprocessing import Pool
‘‘‘
apply:同步提交任务的机制
apply_async:异步提交任务机制
‘‘‘
def func(i):
time.sleep(1)
i += 1
print(i)
if __name__ == ‘__main__‘:
p = Pool(5)
res_l = []
for i in range(20):
# p.apply(func,args=(i,)) # 同步,执行完毕立即获取到返回值
res = p.apply_async(func,args=(i,)) # 异步,通过get获取返回值
res_l.append(res)
p.close() # 不允许再提交新的任务
p.join() # 等待进程池中的进程执行结束在往下执行
for res in res_l:
print(res.get()) # 使用get来获取apply_aync的结果
在进程池中,一个进程任务结束就会返回一个结果,主进程则调用一个函数去处理这个结果,这就是回调函数。回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值;
在爬虫中,使用回调比较多,爬虫将访问网页、下载网页的过程放到子进程中去做,分析数据,处理数据让回调函数去做,因为访问网页与下载网页有网络延时,而处理数据只占用很小的时间
import requests
from multiprocessing import Pool
def get(url):
ret = requests.get(url)
return {‘url‘:url,
‘status_code‘:ret.status_code,
‘content‘:ret.text}
def parser(dic):
print(dic[‘url‘],len(dic[‘content‘]))
parse_url = "URL:%s Size:%s"%(dic[‘url‘],len(dic[‘content‘]))
with open(‘url.txt‘,‘a‘) as f:
f.write(parse_url+‘\n‘)
if __name__ == ‘__main__‘:
url_l = [
‘http://www.baidu.com‘,
‘http://www.google.com‘,
‘https://zh.wikipedia.org/wiki/Wikipedia:%E9%A6%96%E9%A1%B5‘,
‘https://www.youtube.com/?app=desktop‘,
‘https://www.facebook.com/‘
]
p = Pool(5)
for i in url_l:
p.apply_async(get,args=(i,),callback=parser)
p.close()
p.join()
标签:import multi iter one 自动 chunk 子进程 sel app
原文地址:http://blog.51cto.com/jiayimeng/2350624