If we only need to create a small number of processes, we can use multiprocessing.Process directly:

```python
import multiprocessing
import time

def func(msg):
    for i in range(3):
        print(msg)
        time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=func, args=("hello",))
    p.start()
    p.join()
    print("Sub-process done.")
```
2. Pool methods in detail
2.1 The multiprocessing.Pool class
class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
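A minimal sketch of what each constructor parameter does (the names `init_worker`, `work`, and `run_pool` are illustrative, not part of the API):

```python
from multiprocessing import Pool, current_process

def init_worker(tag):
    # initializer runs once in each worker process when it starts
    # (and again in its replacement once maxtasksperchild is reached).
    print("%s initialized with tag=%r" % (current_process().name, tag))

def work(x):
    return x + 1

def run_pool():
    # processes: number of worker processes;
    # initializer/initargs: per-worker setup hook and its arguments;
    # maxtasksperchild: recycle a worker after it has completed this many tasks.
    with Pool(processes=2, initializer=init_worker, initargs=("demo",),
              maxtasksperchild=5) as pool:
        return pool.map(work, range(4))

if __name__ == "__main__":
    print(run_pool())
```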
2). map(func, iterable[, chunksize]) A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
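As a sketch (function names are illustrative), chunksize only controls how the input is batched into tasks; the result is identical either way:

```python
from multiprocessing import Pool

def square(x):
    return x * x

def run_map():
    with Pool(processes=4) as pool:
        # chunksize=3 submits the 10 inputs as ~4 chunks instead of 10
        # separate tasks, reducing inter-process communication overhead.
        return pool.map(square, range(10), chunksize=3)

if __name__ == "__main__":
    print(run_map())
```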
3). map_async(func, iterable[, chunksize[, callback]]) A variant of the map() method which returns a result object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.
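A minimal sketch of a map_async callback (the names `cube`, `on_done`, and `run_map_async` are made up for illustration):

```python
from multiprocessing import Pool

def cube(x):
    return x ** 3

results = []  # filled in by the callback, in the main process

def on_done(value):
    # Called in a result-handling thread of the main process with the
    # complete result list; it must return quickly to avoid blocking
    # delivery of other results.
    results.append(value)

def run_map_async():
    with Pool(processes=2) as pool:
        async_result = pool.map_async(cube, range(5), callback=on_done)
        async_result.wait()  # block until the result is ready
    return results

if __name__ == "__main__":
    print(run_map_async())
```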
3). imap(func, iterable[, chunksize]) An equivalent of itertools.imap() (in Python 3 terms, a lazy version of map() that returns an iterator rather than a list).
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional timeout parameter: next(timeout) will raise multiprocessing.TimeoutError if the result cannot be returned within timeout seconds.
5). imap_unordered(func, iterable[, chunksize]) The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be “correct”.)
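A small sketch contrasting imap() and imap_unordered() (`slow_identity` is an illustrative helper that makes earlier inputs finish later, so the two methods can actually diverge in delivery order):

```python
from multiprocessing import Pool
import time

def slow_identity(x):
    # Earlier inputs sleep longer, so they tend to finish later.
    time.sleep(0.05 * (5 - x))
    return x

def compare_orderings():
    with Pool(processes=3) as pool:
        # imap() always yields results in input order, even if later
        # inputs finish first.
        ordered = list(pool.imap(slow_identity, range(5)))
        # imap_unordered() yields results as they complete; we sort here
        # only to get a deterministic value to inspect.
        unordered = sorted(pool.imap_unordered(slow_identity, range(5)))
    return ordered, unordered

if __name__ == "__main__":
    print(compare_orderings())
```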
The following example demonstrates the use of a pool:
1. Example 1
```python
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)               # start 4 worker processes

    result = pool.apply_async(f, (10,))    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))           # prints "100" unless your computer is *very* slow

    print(pool.map(f, range(10)))          # prints "[0, 1, 4, ..., 81]"

    it = pool.imap(f, range(10))
    print(next(it))                        # prints "0"
    print(next(it))                        # prints "1"
    print(it.next(timeout=1))              # prints "4" unless your computer is *very* slow

    import time
    result = pool.apply_async(time.sleep, (10,))
    print(result.get(timeout=1))           # raises multiprocessing.TimeoutError
```
2. Example 2

```python
import multiprocessing
import time

def func(msg):
    for i in range(3):
        print(msg)
        time.sleep(1)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    for i in range(10):
        msg = "hello %d" % i
        pool.apply_async(func, (msg,))
    pool.close()
    pool.join()
    print("Sub-process(es) done.")
```
3. Using a Pool when we care about the results
More often, we not only want the processes to run, but also care about each process's return value. Code:

```python
import multiprocessing
import time

def func(msg):
    for i in range(3):
        print(msg)
        time.sleep(1)
    return "done " + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(10):
        msg = "hello %d" % i
        result.append(pool.apply_async(func, (msg,)))
    pool.close()
    pool.join()
    for res in result:
        print(res.get())
    print("Sub-process(es) done.")
```
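One point worth knowing when collecting results this way: res.get() re-raises any exception the worker function raised, so result collection is also where failures surface. A minimal sketch (`reciprocal` and `collect_results` are illustrative names):

```python
from multiprocessing import Pool

def reciprocal(x):
    return 1.0 / x  # raises ZeroDivisionError for x == 0

def collect_results():
    with Pool(processes=2) as pool:
        pending = [pool.apply_async(reciprocal, (x,)) for x in (1, 2, 0)]
        outcomes = []
        for p in pending:
            try:
                outcomes.append(p.get())
            except ZeroDivisionError:
                # get() re-raises the exception that occurred in the worker
                outcomes.append("failed")
    return outcomes

if __name__ == "__main__":
    print(collect_results())
```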
```python
#!/usr/bin/env python
# coding=utf-8
"""
Author: Squall
Last modified: 2011-10-18 16:50
Filename: pool.py
Description: a simple sample for pool class
"""
from multiprocessing import Pool
from time import sleep

def f(x):
    for i in range(10):
        print('%s --- %s' % (i, x))
        sleep(1)

def main():
    pool = Pool(processes=3)  # set the max number of worker processes to 3
    for i in range(11, 20):
        result = pool.apply_async(f, (i,))
    pool.close()
    pool.join()
    if result.successful():
        print('successful')

if __name__ == "__main__":
    main()
```
We first create a pool with a capacity of 3 worker processes and submit the f(i) calls to it one by one. Running the script and checking the processes with `ps aux | grep pool.py` shows that at most three of them execute at any one time. pool.apply_async() submits a task to the pool. pool.join() waits for the worker processes in the pool to finish, which prevents the main process from exiting before its workers do; note that pool.join() must be called after pool.close() or pool.terminate(). The difference between close() and terminate() is that close() stops accepting new tasks and lets the workers finish the tasks already submitted before the pool shuts down, while terminate() stops the workers immediately. result.successful() reports whether the call completed without raising an exception; if the result is not ready yet, it raises an error itself (AssertionError in Python 2, ValueError since Python 3.7). Note also that in this script result refers only to the last task submitted, so successful() says nothing about the earlier calls.
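The close()-then-join() shutdown sequence described above can be sketched like this (names are illustrative; terminate() appears only in a comment because it would discard the pending tasks):

```python
from multiprocessing import Pool
import time

def tag(x):
    time.sleep(0.1)
    return x

def drain_with_close():
    pool = Pool(processes=2)
    results = [pool.apply_async(tag, (i,)) for i in range(4)]
    pool.close()   # no new tasks accepted; already-queued tasks still run
    pool.join()    # wait for the workers to finish and exit
    # With pool.terminate() instead of close(), workers would be stopped
    # immediately and the pending results would never become ready.
    return [r.get() for r in results]

if __name__ == "__main__":
    print(drain_with_close())
```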