标签:current 独立 RKE upd while 目的 更新 其他 before
内容目录:
- multiprocessing.Queue()
- JoinableQueue
- 进程间的信号传递 Event
- 控制对资源的访问 Lock
- 同步操作 Condition
- 控制对资源的并发访问 Semaphore
- 管理共享状态 Manager
- 共享命名空间 mgr.Namespace()
- 进程池 multiprocessing.Pool
和线程一样,多进程的一个常见的使用模式是将一个任务划分为几个worker,以便并行运行。有效地使用多进程通常需要它们之间的一些通信,这样工作就可以被分割,结果可以被聚合。一种简单方法是使用队列multiprocessing.Queue()
来回传递消息。任何可以用pickle序列化的对象都可以通过队列。
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print(‘Doing something fancy in {} for {}!‘.format(
proc_name, self.name))
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == ‘__main__‘:
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass(‘Fancy Dan‘))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
结果:当q是空的时候,q.get()会等。
Doing something fancy in Process-1 for Fancy Da
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
使用None这个特殊值来判断是否结束Worker
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print(‘{}: Exiting‘.format(proc_name))
self.task_queue.task_done()
break
# next_task是Task()的一个实例,打印next_task会输出__str__
print(‘{}: {}‘.format(proc_name, next_task))
# 执行next_task()会执行__call__
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task:
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take time to do the work
return ‘{self.a} * {self.b} = {product}‘.format(
self=self, product=self.a * self.b)
def __str__(self):
return ‘{self.a} * {self.b}‘.format(self=self)
if __name__ == ‘__main__‘:
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print(‘Creating {} consumers‘.format(num_consumers))
consumers = [
Consumer(tasks, results)
for i in range(num_consumers)
]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in range(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print(‘Result:‘, result)
num_jobs -= 1
执行结果:
Creating 8 consumers
Consumer-4: 0 * 0
Consumer-1: 1 * 1
Consumer-2: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-1: 6 * 6
Consumer-6: 7 * 7
Consumer-4: 8 * 8
Consumer-2: 9 * 9
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-8: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 7 * 7 = 49
Result: 9 * 9 = 81
Event类提供了一种简单的方法来在进程之间传递状态信息。
当wait()超时时,它返回时不会出现错误。调用者负责使用is_set()检查事件的状态
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print(‘wait_for_event: starting‘)
e.wait()
print(‘wait_for_event: e.is_set()->‘, e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print(‘wait_for_event_timeout: starting‘)
e.wait(t)
print(‘wait_for_event_timeout: e.is_set()->‘, e.is_set())
if __name__ == ‘__main__‘:
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name=‘block‘,
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name=‘nonblock‘,
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print(‘main: waiting before calling Event.set()‘)
time.sleep(3)
e.set()
print(‘main: event is set‘)
执行结果:
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True
在需要在多个进程之间共享单个资源的情况下,可以使用锁来避免冲突的访问。
import multiprocessing
import sys
def worker_with(lock):
with lock:
sys.stdout.write(‘Lock acquired via with\n‘)
def worker_no_with(lock):
lock.acquire()
try:
sys.stdout.write(‘Lock acquired directly\n‘)
finally:
lock.release()
if __name__ == ‘__main__‘:
lock = multiprocessing.Lock()
w = multiprocessing.Process(
target=worker_with,
args=(lock,),
)
nw = multiprocessing.Process(
target=worker_no_with,
args=(lock,),
)
w.start()
nw.start()
w.join()
nw.join()
运行结果:
Lock acquired via with
Lock acquired directly
cond.wait()等着,cond.notify_all()通知可以往下运行了
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print(‘Starting‘, name)
with cond:
print(‘{} done and ready for stage 2‘.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print(‘Starting‘, name)
with cond:
cond.wait()
print(‘{} running‘.format(name))
if __name__ == ‘__main__‘:
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name=‘s1‘,
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name=‘stage_2[{}]‘.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
运行结果:在这个例子中,两个进程并行地运行第二阶段的工作,但是只有在第一个阶段完成之后。
Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running
有时,允许多个worker一次访问一个资源是很有用的,但要限制了数量。
import multiprocessing
import time
def worker(s, i):
s.acquire()
print(multiprocessing.current_process().name + "acquire");
time.sleep(i)
print(multiprocessing.current_process().name + "release\n");
s.release()
if __name__ == "__main__":
s = multiprocessing.Semaphore(2)
for i in range(5):
p = multiprocessing.Process(target = worker, args=(s, i*2))
p.start()
运行结果:
Process-2acquire
Process-3acquire
Process-2release
Process-4acquire
Process-3release
Process-1acquire
Process-1release
Process-5acquire
Process-4release
Process-5release
通过Manager共享信息,所有进程都能看得到。
import multiprocessing
import pprint
def worker(d, key, value):
d[key] = value
if __name__ == ‘__main__‘:
mgr = multiprocessing.Manager()
d = mgr.dict()
jobs = [
multiprocessing.Process(
target=worker,
args=(d, i, i * 2),
)
for i in range(10)
]
for j in jobs:
j.start()
for j in jobs:
j.join()
print(‘Results:‘, d
运行结果:通过Manager创建列表,它是共享的,并且在所有进程中都可以看到更新。字典也支持。
Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}
除了字典和列表之外,管理者还可以创建一个共享的名称空间。
import multiprocessing
def producer(ns, event):
ns.value = ‘This is the value‘
event.set()
def consumer(ns, event):
try:
print(‘Before event: {}‘.format(ns.value))
except Exception as err:
print(‘Before event, error:‘, str(err))
event.wait()
print(‘After event:‘, ns.value)
if __name__ == ‘__main__‘:
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
运行结果:可以看到在另一个进程中可以对mgr.Namespace()进行复制,其他进程可以访问。
Before event, error: ‘Namespace‘ object has no attribute ‘value‘
After event: This is the value
重要的是要知道mgr.Namespace()中可变值的内容的更新不会自动传播。
import multiprocessing
def producer(ns, event):
# DOES NOT UPDATE GLOBAL VALUE!
ns.my_list.append(‘This is the value‘)
event.set()
def consumer(ns, event):
print(‘Before event:‘, ns.my_list)
event.wait()
print(‘After event :‘, ns.my_list)
if __name__ == ‘__main__‘:
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
namespace.my_list = []
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
运行结果:
Before event: []
After event : []
池类可用于管理固定数量的worker,用于简单的工作,在这些情况下,可以将工作分解并独立地分配给worker。
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print(‘Starting‘, multiprocessing.current_process().name)
if __name__ == ‘__main__‘:
inputs = list(range(10))
print(‘Input :‘, inputs)
builtin_outputs = map(do_calculation, inputs)
print(‘Built-in:‘, builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
print(‘Pool :‘, pool_outputs)
运行结果:进程的返回值被收集并作为一个列表返回。
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x000000000256A080>
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-8
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
在默认情况下,池会创建固定数量的worker进程,并将作业传递给他们,直到没有更多的工作。设置maxtasksperchild参数告诉池在完成了几个任务后重新启动worker进程,防止长时间运行的worker消耗更多的系统资源。
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print(‘Starting‘, multiprocessing.current_process().name)
if __name__ == ‘__main__‘:
inputs = list(range(10))
print(‘Input :‘, inputs)
builtin_outputs = map(do_calculation, inputs)
print(‘Built-in:‘, builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
maxtasksperchild=2,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
print(‘Pool :‘, pool_outputs)
运行结果:当工人完成分配的任务时,即使没有更多的工作,他们也会重新开始工作。在这个输出中,有9个worker被创建,尽管只有10个任务,有的worker一次可以完成其中的两个任务。
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x00000000025CA080>
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-8
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-9
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
标签:current 独立 RKE upd while 目的 更新 其他 before
原文地址:https://www.cnblogs.com/zhang-anan/p/8996529.html