运行错误:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed
代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Task master that shares two queues over the network through a BaseManager.

NOTE: this is the original, failing version. It registers lambdas as the
queue callables, which is what raises the PicklingError shown in this
article when the manager process is started on Windows.
"""

import random, time, queue
from multiprocessing.managers import BaseManager

# Queue used to send tasks out.
task_queue = queue.Queue()
# Queue used to receive results back.
result_queue = queue.Queue()


class QueueManager(BaseManager):
    """Manager subclass on which the two queue accessors are registered."""


# Register both queues on the manager; each callable returns its Queue object.
# The lambdas below are the objects that pickle cannot serialize.
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

# Bind to port 5000 with the auth key 'abc'.
manager = QueueManager(address=('', 5000), authkey=b'abc')

# Start the manager's server process.
manager.start()

# Fetch the queue objects that are accessed through the manager.
task = manager.get_task_queue()
result = manager.get_result_queue()

# Put a few tasks on the task queue.
for _ in range(10):
    number = random.randint(0, 10000)
    print('Put task %d...' % number)
    task.put(number)

# Read the results back from the result queue.
print('Try get results...')
for _ in range(10):
    outcome = result.get(timeout=10)
    print('Result: %s' % outcome)

# Shut the manager down.
manager.shutdown()
print('master exit.')
报错信息:
Traceback (most recent call last):
  File "task_master.py", line 22, in <module>
    manager.start()
  File "E:\Anaconda\Anaconda3\lib\multiprocessing\managers.py", line 513, in start
    self._process.start()
  File "E:\Anaconda\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "E:\Anaconda\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "E:\Anaconda\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "E:\Anaconda\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed
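错误原因:Windows 下 multiprocessing 以 spawn 方式启动子进程(见报错信息中的 popen_spawn_win32.py),启动时要用 pickle 把进程对象连同注册的 callable 一起序列化后传给子进程;pickle 是按名称查找函数的,匿名的 lambda 无法被查找到,所以不能序列化,需要改用模块级的自定义函数。另外,修改后的代码还把启动逻辑放进了 if __name__ == '__main__': 中(spawn 方式下子进程会重新导入主模块,必须加这个保护),并把绑定地址由 '' 改成了 '127.0.0.1'(Windows 下空地址通常无法连接)。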
修改代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Task master that shares two queues over the network through a BaseManager.

The queue accessors are plain module-level functions rather than lambdas,
so they can be pickled when the manager's server process is started with
the spawn method (the default on Windows).
"""

import queue
import random
import time
from multiprocessing.managers import BaseManager

# Queue used to send tasks out.
task_queue = queue.Queue()
# Queue used to receive results back.
result_queue = queue.Queue()


def re_task_queue():
    """Return the module-level task queue (picklable stand-in for a lambda)."""
    return task_queue


def re_result_queue():
    """Return the module-level result queue (picklable stand-in for a lambda)."""
    return result_queue


class QueueManager(BaseManager):
    """Manager subclass on which the two queue accessors are registered."""


def main():
    """Start the manager, publish ten random tasks, then print ten results.

    Raises:
        queue.Empty: if a result does not arrive within 10 seconds. The
            manager is still shut down before the exception propagates.
    """
    # Register both queues on the manager; each callable returns its Queue.
    QueueManager.register('get_task_queue', callable=re_task_queue)
    QueueManager.register('get_result_queue', callable=re_result_queue)

    # Bind to 127.0.0.1:5000 with the auth key 'abc'.
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

    # Start the manager's server process.
    manager.start()
    try:
        # Fetch the queue objects that are accessed through the manager.
        task = manager.get_task_queue()
        result = manager.get_result_queue()

        # Put a few tasks on the task queue.
        for _ in range(10):
            n = random.randint(0, 10000)
            print('Put task %d...' % n)
            task.put(n)

        # Read the results back from the result queue.
        print('Try get results...')
        for _ in range(10):
            r = result.get(timeout=10)
            print('Result: %s' % r)
    finally:
        # Always stop the server process, even if waiting for a result
        # timed out above.
        manager.shutdown()
    print('master exit.')


# The guard is required with the spawn start method: the child process
# re-imports this module and must not start another manager while doing so.
if __name__ == '__main__':
    main()
运行结果(不再报错,master 放入 10 个任务后开始等待结果队列中的返回值):
C:\Users\Lucky丶M\python>python task_master.py
Put task 4962...
Put task 3460...
Put task 4774...
Put task 4301...
Put task 9120...
Put task 7183...
Put task 4915...
Put task 3173...
Put task 9138...
Put task 5798...
Try get results...