标签:pool 允许 start sel 管理 ict sem cti server进程
之前实现的数据共享的方式只有两种结构Value和Array。Python中提供了强大的Manager专门用来做数据共享的,Manager是进程间数据共享的高级接口。 Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager支持的类型有list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和Array。
下面看一个用于多进程共享dict和list数据:
from multiprocessing import Manager, Process def worker(dt, lt): for i in range(10): dt[i] = i*i # 访问共享数据 lt += [x for x in range(11, 16)] if __name__ == ‘__main__‘: manager = Manager() dt = manager.dict() lt = manager.list() p = Process(target=worker, args=(dt, lt)) p.start() p.join(timeout=3) print(dt) print(lt)
结果: {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81} [11, 12, 13, 14, 15]
当使用Process类管理非常多(几十上百个)的进程时,就会显得比较繁琐,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
import time import multiprocessing def fun(msg): print("#########start#### {0}".format(msg)) time.sleep(3) print("#########end###### {0}".format(msg)) if __name__ == ‘__main__‘: print("start main") pool = multiprocessing.Pool(processes=3) #在进程池中创建3个进程 for i in range(1, 7): msg = "hello {0}".format(i) pool.apply_async(fun, (msg,))# 执行时间6s+ # pool.apply(fun, (msg,)) #执行时间 6*3=18s+ pool.close() #在调用join之前,要先调用close,否则会报错,close执行完不会有新的进程加入到pool pool.join() #join 是等待所有的子进程结束,必须在close之后调用 print("end main") 结果: start main start hello 1 start hello 2 start hello 3 end hello 1 start hello 4 end hello 2 start hello 5 end hello 3 start hello 6 end hello 4 end hello 5 end hello 6 end main 6.18000006676 # 使用pool.apply_async()所花费时间 18.2129998207 # 使用pool.apply()所花费时间
pool.apply_async 非阻塞,定义的进程池最大数的同时执行
pool.apply 一个进程结束,释放回进程池,开始下一个进程
从上面的结果中我们可以看到,使用pool.apply_async所花费的时间较少。
Python中提供了threading模块来对多线程进行操作,线程是应用程序中工作的最小单元,多线程适用于密集型io。
多线程的实现方式有两种:
方法一:将要执行的方法作为参数传给Thread的构造方法(和多进程类似)
t = threading.Thread(target=action, args=(i,))
方法二:从Thread类继承,并重写run()方法。
方法一示例:
import threading def worker(args): print("开始子进程 {0}".format(args)) print("结束子进程 {0}".format(args)) if __name__ == ‘__main__‘: print("start main") t1 = threading.Thread(target=worker, args=(1,)) # 和多进程类似 t2 = threading.Thread(target=worker, args=(2,)) t1.start() t2.start() print("end main") 结果: start main 开始子进程 1 结束子进程 1 开始子进程 2 end main 结束子进程 2
方法二示例:
import threading import time class Hello(threading.Thread):
# 对run()方法进行重写 def __init__(self, args): super(Hello, self).__init__() self.args = args def run(self): print("开始子进程 {0}".format(self.args)) time.sleep(1) print("结束子进程 {0}".format(self.args)) if __name__ == ‘__main__‘: a = 1 print("start main") t1 = Hello(1) t2 = Hello(2) t1.start() t2.start() print("end main") 结果: start main 开始子进程 1 开始子进程 2 end main 结束子进程 2 结束子进程 1
当多线程争夺锁时,允许第一个获得锁的线程进入临街区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程执行结束,退出临街区,并释放锁。
需要注意,那些阻塞的线程是没有顺序的。
import time import threading def sub(): global count lock.acquire() #上锁,第一个线程如果申请到锁,会在执行公共数据的过程中持续阻塞后续线程 #即后续第二个或其他线程依次来了发现已经被上锁,只能等待第一个线程释放锁 #当第一个线程将锁释放,后续的线程会进行争抢 ‘‘‘线程的公共数据‘‘‘ temp=count print("temp is {0}".format(temp)) time.sleep(0.001) count=temp+1 print("count is {0}".format(count)) ‘‘‘线程的公共数据‘‘‘ lock.release() # 释放锁 time.sleep(2) count=0 l=[] lock=threading.Lock() for i in range(100): t=threading.Thread(target=sub,args=()) t.start() l.append(t) for t in l: t.join() print(count) 结果: 100
采用线程池,花费的时间更少。
语法结构示例:
import threadpool def ThreadFun(arg1,arg2): pass def main(): device_list=[object1,object2,object3......,objectn]#需要处理的设备个数 task_pool=threadpool.ThreadPool(8)#8是线程池中线程的个数 request_list=[]#存放任务列表 #首先构造任务列表 for device in device_list: request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})])) #将每个任务放到线程池中,等待线程池中线程各自读取任务,然后进行处理,使用了map函数,不了解的可以去了解一下。 map(task_pool.putRequest,request_list) #等待所有任务处理完成,则返回,如果没有处理完,则一直阻塞 task_pool.poll() if __name__=="__main__": main()
pip install threadpool # 安装threadpool
from threadpool import * #导入模块
pool = ThreadPool(size) # 定义一个线程池,最多能创建size个线程
requests = makeRequests() # 调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数,其中回调函数可以不写,default是无,也就是说makeRequests只需要2个参数就可以运行;
[pool.putRequest(req) for req in requests] # 将所有要运行多线程的请求扔进线程池
pool.wait() # 等待所有的线程完成工作后退出。
[pool.putRequest(req) for req in requests]等同于:
for req in requests:
pool.putRequest(req)
例子:
import threadpool def hello(m, n, o): print("m = {0} n={1} o={2}".format(m, n, o)) if __name__ == ‘__main__‘: # 方法1 lst_vars_1 = [‘1‘, ‘2‘, ‘3‘] # 需要处理的线程个数 lst_vars_2 = [‘4‘, ‘5‘, ‘6‘] func_var = [(lst_vars_1, None), (lst_vars_2, None)] # 方法2 # dict_vars_1 = {‘m‘: ‘1‘, ‘n‘: ‘2‘, ‘o‘: ‘3‘} # dict_vars_2 = {‘m‘: ‘4‘, ‘n‘: ‘5‘, ‘o‘: ‘6‘} # func_var = [(None, dict_vars_1), (None, dict_vars_2)] pool = threadpool.ThreadPool(2) # 线程池中的线程个数 requests = threadpool.makeRequests(hello, func_var) # 创建了要开启多线程的函数,创建需要线程池处理的任务,只需要两个参数 [pool.putRequest(req) for req in requests] # 将每个任务放入到线程池中 pool.wait() 结果: m = 1 n=2 o=3 m = 4 n=5 o=6
标签:pool 允许 start sel 管理 ict sem cti server进程
原文地址:https://www.cnblogs.com/yangjian319/p/9079325.html