标签:限制 llb 实例 join() 阻塞 str RoCE process 总数
Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;
但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。
Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
(1)apply(func [,args [,kwds ] ] )
使用参数args和关键字参数kwds调用func。它会阻塞,直到结果准备就绪。鉴于此块,更适合并行执行工作。此外,func 仅在池中的一个工作程序中执行。
from multiprocessing import Pool import time def test(p): print(p) time.sleep(3) if __name__=="__main__": pool = Pool(processes=10) for i in range(500): ‘‘‘ (‘\n‘ ‘ (1)遍历500个可迭代对象,往进程池放一个子进程\n‘ ‘ (2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)\n‘ ‘ for循环执行完毕,再执行print函数。\n‘ ‘ ‘) ‘‘‘ pool.apply(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程. print(‘test‘) pool.close() pool.join() ‘‘‘ 1 2 3 4 5 6 7 8 Process finished with exit code -1 ‘‘‘
for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程……等500个子进程都执行完了,再执行print。(从结果来看,并没有多进程并发)
(2)apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ] )
异步进程池(非阻塞),返回结果对象的方法的变体。如果指定了回调,则它应该是可调用的,它接受单个参数。当结果变为就绪时,将对其应用回调,即除非调用失败,在这种情况下将应用error_callback。如果指定了error_callback,那么它应该是一个可调用的,它接受一个参数。如果目标函数失败,则使用异常实例调用error_callback。回调应立即完成,否则处理结果的线程将被阻止。
from multiprocessing import Pool import time def test(p): print(p) time.sleep(3) if __name__=="__main__": pool = Pool(processes=2) for i in range(500): ‘‘‘ (1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)\n‘ (2)每次执行2个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)\n‘ ‘‘‘ pool.apply_async(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程. print(‘test‘) pool.close() pool.join() ‘‘‘ test 0 1 2 3 4 5 6 7 Process finished with exit code -1 ‘‘‘
调用join之前,先调用close或者terminate方法,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。
(3)map(func,iterable [,chunksize ] )
map()内置函数的并行等价物(尽管它只支持一个可迭代的参数)。它会阻塞,直到结果准备就绪。此方法将iterable内的每一个对象作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。
from multiprocessing import Pool def test(i): print(i) if __name__ == "__main__": lists = [1, 2, 3] pool = Pool(processes=2) #定义最大的进程数 pool.map(test, lists) #p必须是一个可迭代变量。 pool.close() pool.join() ‘‘‘ 1 2 3 ‘‘‘
map()返回结果对象的方法的变体。需要传入可迭代对象iterable
from multiprocessing import Pool import time def test(p): print(p) time.sleep(3) if __name__=="__main__": pool = Pool(processes=2) # for i in range(500): # ‘‘‘ # (1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)\n‘ # (2)每次执行2个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)\n‘ # ‘‘‘ # pool.apply_async(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程. pool.map_async(test, range(500)) print(‘test‘) pool.close() pool.join() ‘‘‘ test 0 63 1 64 2 65 3 66 Process finished with exit code -1 ‘‘‘
(5)imap(func,iterable [,chunksize ] )
返回迭代器,next()调用返回的迭代器的方法得到结果,imap()方法有一个可选的超时参数: next(timeout)将提高multiprocessing.TimeoutError如果结果不能内退回超时秒。
(6)close()
防止任何更多的任务被提交到池中。 一旦完成所有任务,工作进程将退出。
(7)terminate()
立即停止工作进程而不完成未完成的工作。当池对象被垃圾收集时,terminate()将立即调用。
(8)join()
等待工作进程退出。必须打电话close()或 terminate()使用之前join()。
from multiprocessing import Pool import time def f(x): return x*x if __name__ == ‘__main__‘: with Pool(processes=4) as pool: # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) # raises multiprocessing.TimeoutError ‘‘‘ 100 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 0 1 4 Traceback (most recent call last): File "C:/Users/BruceWong/Desktop/develop/multiprocessingpool.py", line 19, in <module> print(next(res)) TypeError: ‘MapResult‘ object is not an iterator Process finished with exit code 1 ‘‘‘
python3的multiprocessing多进程-Pool进程池模块
标签:限制 llb 实例 join() 阻塞 str RoCE process 总数
原文地址:https://www.cnblogs.com/lizm166/p/14658337.html