@@@文章内容参照老男孩教育 Alex金角大王,武Sir银角大王@@@
什么是线程(thread)
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.(线程是一个执行上下文,它是一个CPU用来执行指令流的所有信息)
Suppose you‘re reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.(假设你正在读一本书,现在你想休息一下,但是你想要回来,从你停下来的地方重新开始阅读。实现这一目标的一种方法是记下页码、行号和字号。所以你阅读一本书的执行环境是这三个数字)
If you have a roommate, and she‘s using the same technique, she can take the book while you‘re not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.(如果你有一个室友,她使用的是同样的方法,她可以在你不使用这本书的时候拿这本书,然后从她停止的地方继续阅读。然后你可以把它拿回来,然后从你所在的地方重新开始)
Threads work in the same way. A CPU is giving you the illusion that it‘s doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.(线程以相同的方式工作。一个CPU给你一个错觉,它同时在做多个计算。它通过在每次计算上花费一点时间来完成。它可以这样做,因为它对每个计算都有一个执行上下文。就像你可以和你的朋友分享一本书一样,许多任务可以共享一个CPU)
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU‘s registers.(在更技术的层面上,一个执行上下文(因此是一个线程)由CPU的寄存器的值组成)
Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.(最后:线程与进程不同。线程是执行的上下文,而进程是与计算相关的一组资源。一个进程可以有一个或多个线程)
Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).(与进程相关的资源包括内存页面(进程中的所有线程都具有相同的内存视图)、文件描述符(例如,打开的套接字)和安全凭据(例如,启动该进程的用户的ID))
什么是进程(process)
An executing instance of a program is called a process.(一个程序的执行实例称为进程)
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.(每个进程都提供执行程序所需的资源。进程有一个虚拟地址空间、可执行代码、对系统对象的开放句柄、一个安全上下文、一个惟一的进程标识符、环境变量、一个优先级类、最小和最大工作集大小,以及至少一个执行线程。每个进程都由一个线程启动,这个线程通常被称为主线程,但是可以从它的任何线程中创建额外的线程)
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自已独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。
进程与线程的区别
- Threads share the address space of the process that created it; processes have their own address space.(线程共享创建它的进程的地址空间;进程有自己的地址空间)
- Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.(线程可以直接访问其进程的数据段;进程有它们自己的父进程数据段的副本)
- Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.(线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信)
- New threads are easily created; new processes require duplication of the parent process.(新线程很容易创建;新进程需要复制父进程)
- Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.(线程可以对同一进程的线程进行相当大的控制;进程只能对子进程执行控制)
- Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.(对主线程的更改(取消、优先级更改等)可能会影响该进程的其他线程的行为;对父进程的更改不会影响子进程)
Python GIL(Global Interpreter Lock)
CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.(CPython实现细节:在CPython中,由于全局解释器锁,只有一个线程可以同时执行Python代码(即使某些性能导向的库可能克服这个限制)。如果您希望您的应用程序更好地利用多核机器的计算资源,那么建议您使用多处理。但是,如果您想同时运行多个i/o绑定的任务,那么线程仍然是一个合适的模型)
Python threading模块
1、线程的2种调用方式
直接调用
1 import threading 2 import time 3 4 def run(num): # 定义每个线程要运行的函数 5 print(‘number:%s‘%num) 6 time.sleep(2) 7 8 if __name__ == ‘__main__‘: 9 t1 = threading.Thread(target=run ,args=(1,)) # 生成一个线程实例 10 t2 = threading.Thread(target=run ,args=(2,)) # 生成另一个线程实例 11 12 t1.start() # 启动线程 13 t2.start() # 启动另一个线程 14 15 print(t1.getName()) # 获取线程名 16 print(t2.getName())
继承式调用
1 import threading 2 import time 3 4 class MyThread(threading.Thread): 5 def __init__(self,num): 6 threading.Thread.__init__(self) 7 self.num = num 8 9 def run(self): 10 print(‘number:%s‘ %self.num) 11 time.sleep(2) 12 13 if __name__ == ‘__main__‘: 14 15 t1 = MyThread(1) 16 t2 = MyThread(2) 17 t1.start() 18 t2.start()
2、Join & Daemon
join()
在子线程完成运行之前,这个子线程的父线程将一直被阻塞
1 import threading 2 import time 3 4 def run(n): 5 time.sleep(2) 6 print(‘task done‘,n,threading.current_thread()) 7 8 start_time = time.time() 9 t_objs = [] # 存线程实例 10 for i in range(50): 11 t = threading.Thread(target=run ,args=(‘t-%s‘%i ,)) 12 t.start() 13 t_objs.append(t) # 为了不阻塞后面线程的启动,不在这里join,先放到一个列表里、 14 15 for t in t_objs: # 循环线程实例列表,等待所有线程执行完毕 16 t.join() 17 18 print(‘----- 所有线程完成 -----‘,threading.current_thread(),threading.active_count()) 19 print(‘cost:‘,time.time() - start_time)
setDaemon(True)
将线程声明为守护线程,必须在start()方法调用之前设置,如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们在程序运行中,执行一个主线程,如果方线程又创建一个子线程,主线程和子线程就会分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法了
1 import threading 2 import time 3 4 def run(n): 5 time.sleep(2) 6 print(‘task done‘,n,threading.current_thread()) 7 8 start_time = time.time() 9 for i in range(50): 10 t = threading.Thread(target=run ,args=(‘t-%s‘%i ,)) 11 t.setDaemon(True) # 把当前线程设置为守护线程 12 t.start() 13 14 time.sleep(2) 15 print(‘----- all threads has finished -----‘,threading.current_thread(),threading.active_count()) 16 print(‘cost:‘,time.time() - start_time)
threading模块提供的其他方法
1 # threading.currentThread(): 返回当前的线程变量。 2 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 3 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。 4 # 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法: 5 # run(): 用以表示线程活动的方法。 6 # start():启动线程活动。 7 # join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。 8 # isAlive(): 返回线程是否活动的。 9 # getName(): 返回线程名。 10 # setName(): 设置线程名。
3、同步锁(Lock)
1 import time 2 import threading 3 4 def addNum(): 5 global num # 在每个线程中都获取这个全局变量 6 # print(‘num:‘,num) 7 time.sleep(1) 8 num += 1 9 10 num = 0 # 设定一个共享变量 11 thread_list = [] 12 for i in range(100): 13 t = threading.Thread(target=addNum) 14 t.start() 15 thread_list.append(t) 16 17 for t in thread_list: # 等待所有线程执行完毕 18 t.join() 19 20 print(‘final num:‘,num)
多个线程都在同时操作同一个共享资源时,有可能造成资源破坏。为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁,这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
加锁版
1 import time 2 import threading 3 4 def addNum(): 5 global num # 在每个线程中都获取这个全局变量 6 # print(‘num:‘,num) 7 time.sleep(1) 8 lock.acquire() # 修改数据前加锁 9 num += 1 10 lock.release() # 修改后释放 11 12 num = 0 # 设定一个共享变量 13 thread_list = [] 14 lock = threading.Lock() # 生成全局锁 15 for i in range(100): 16 t = threading.Thread(target=addNum) 17 t.start() 18 thread_list.append(t) 19 20 for t in thread_list: # 等待所有线程执行完毕 21 t.join() 22 23 print(‘final num:‘,num)
GIL VS Lock
- Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。
- GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。
但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。
4、线程死锁和递归锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所以这两个线程在无外力作用下将一直等待下去。
1 import threading , time 2 3 def run1(): 4 print(‘gotlockA‘,time.ctime()) 5 lock.acquire() 6 global num 7 num += 1 8 lock.release() 9 return num 10 11 def run2(): 12 print(‘gotlockB‘,time.ctime()) 13 lock.acquire() 14 global num2 15 num2 += 1 16 lock.release() 17 return num2 18 19 def run3(): 20 lock.acquire() 21 res = run1() 22 print(‘--- run1 and run2 ---‘) 23 res2 = run2() 24 lock.release() 25 print(res,res2) 26 27 num ,num2 = 0,0 28 lock = threading.Lock() 29 30 for i in range(10): 31 t = threading.Thread(target=run3) 32 t.start() 33 34 while threading.active_count() != 1: 35 print(threading.active_count()) 36 else: 37 print(‘------- all --------‘) 38 print(‘num1:%s , num2:%s‘%(num,num2))
递归锁
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
1 import threading , time 2 3 def run1(): 4 print(‘gotlockA‘,time.ctime()) 5 lock.acquire() 6 global num 7 num += 1 8 lock.release() 9 return num 10 11 def run2(): 12 print(‘gotlockB‘,time.ctime()) 13 lock.acquire() 14 global num2 15 num2 += 1 16 lock.release() 17 return num2 18 19 def run3(): 20 lock.acquire() 21 res = run1() 22 print(‘--- run1 and run2 ---‘) 23 res2 = run2() 24 lock.release() 25 print(res,res2) 26 27 num ,num2 = 0,0 28 lock = threading.RLock() 29 30 for i in range(10): 31 t = threading.Thread(target=run3) 32 t.start() 33 34 while threading.active_count() != 1: 35 pass # print(threading.active_count()) 36 else: 37 print(‘------- all --------‘) 38 print(‘num1:%s , num2:%s‘%(num,num2))
5、信号量Semaphore
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时+1
计数器不能小于0,当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常
1 import threading,time 2 3 def run(n): 4 semaphore.acquire() 5 time.sleep(1) 6 print(‘run:%s\n‘%n) 7 semaphore.release() 8 9 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行 10 for i in range(20): 11 t = threading.Thread(target=run ,args=(i,)) 12 t.start() 13 14 while threading.active_count() != 1: 15 pass 16 else: 17 print(‘----- all -----‘)
6、Timer
定时器,指定n秒后执行某操作
1 import threading 2 3 def hello(): 4 print(‘hello world‘) 5 6 t = threading.Timer(3 ,hello) 7 t.start()
7、事件event
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear
事件处理的机制:全局定义了一个"Flag",如果"Flag"值为False,那么当程序执行event.wait方法时就会阻塞,如果"Flag"值为True,那么event.wait方法时便不再阻塞。
- clear:将"Flag"设置为False
- set:将"Flag"设置为True
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
1 import threading 2 3 def do(event): 4 print(‘start‘) 5 event.wait() 6 print(‘execute‘) 7 8 event_obj = threading.Event() 9 10 for i in range(5): 11 t = threading.Thread(target=do , args=(event_obj,)) 12 t.start() 13 14 event_obj.clear() 15 inp = input(‘input:‘) 16 if inp == ‘1‘: 17 event_obj.set()
8、queue队列
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.(队列在线程编程中尤其有用,因为必须在多个线程之间安全地交换信息)
Python Queue模块有三种队列及构造函数: 1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize) 3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 创建一个“队列”对象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。 此包中的常用方法(q = Queue.Queue()): q.qsize() 返回队列的大小 q.empty() 如果队列为空,返回True,反之False q.full() 如果队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 相当q.get(False) 非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 相当q.put(item, False) q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操作
1 import queue 2 3 q = queue.Queue() 4 # 存数据 5 q.put(‘hello‘) 6 q.put(‘world‘) 7 q.put(‘import‘) 8 9 # 取数据 10 print(q.get()) 11 print(q.get()) 12 print(q.get()) 13 14 q = queue.LifoQueue() 15 # 存数据 16 q.put(‘hello‘) 17 q.put(‘world‘) 18 q.put(‘import‘) 19 20 # 取数据 21 print(q.get()) 22 print(q.get()) 23 print(q.get()) 24 25 q = queue.PriorityQueue() 26 # 存数据 27 q.put((3,‘hello‘)) 28 q.put((-1,‘world‘)) 29 q.put((5,‘import‘)) 30 31 # 取数据 32 print(q.get()) 33 print(q.get()) 34 print(q.get())
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度
什么是生产者和消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
1 import threading ,time 2 import queue 3 4 q = queue.Queue(maxsize=10) 5 def producer(): 6 count = 1 7 while True: 8 q.put(‘包子 %s‘ %count) 9 print(‘生产了包子‘,count) 10 count += 1 11 time.sleep(0.1) 12 13 def consumer(n): 14 # while q.qsize() > 0: 15 while True: 16 print(‘%s 吃了 %s‘ %(n ,q.get())) 17 time.sleep(0.5) 18 19 p = threading.Thread(target=producer,) 20 c1 = threading.Thread(target=consumer, args=(‘tom‘,)) 21 c2 = threading.Thread(target=consumer, args=(‘jack‘,)) 22 p.start() 23 c1.start() 24 c2.start()
1 #实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块) 2 # 实现一个线程从上面的队列里面不断的取出奇数 3 # 实现另外一个线程从上面的队列里面不断取出偶数 4 5 import random,threading,time 6 from queue import Queue 7 8 # 生产者 9 class Producer(threading.Thread): 10 def __init__(self,t_name,queue): 11 threading.Thread.__init__(self,name=t_name) 12 self.data = queue 13 14 def run(self): 15 for i in range(10): # 随机产生10个数字,可以修改为任意大小 16 num = random.randint(1,99) 17 print(‘%s:%s 生产了 %d ‘%(time.ctime(),self.getName(),num)) 18 self.data.put(num) # 将数据依次存入队列 19 time.sleep(1) 20 print(‘%s:%s 完成!‘%(time.ctime(),self.getName())) 21 22 # 消费者 23 class Consumer_even(threading.Thread): 24 def __init__(self,t_name,queue): 25 threading.Thread.__init__(self,name=t_name) 26 self.data = queue 27 28 def run(self): 29 while True: 30 try: 31 val_even = self.data.get(1,5) # 1就是阻塞等待,5是超时5秒 32 if val_even % 2 == 0: 33 print(‘%s:%s 消费了 %d ‘ %(time.ctime(),self.getName(),val_even)) 34 time.sleep(2) 35 else: 36 self.data.put(val_even) 37 time.sleep(2) 38 except:break # 等待输入,超时退出 39 40 class Consumer_odd(threading.Thread): 41 def __init__(self,name,queue): 42 threading.Thread.__init__(self,name=name) 43 self.data = queue 44 45 def run(self): 46 while True: 47 try: 48 val_odd = self.data.get(1,5) 49 if val_odd % 2 != 0: 50 print(‘%s:%s 消费了 %d ‘ %(time.ctime(),self.getName(),val_odd)) 51 time.sleep(2) 52 else: 53 self.data.put(val_odd) 54 time.sleep(2) 55 except:break # 等待输入,超时退出 56 57 def main(): 58 queue = Queue() 59 producer = Producer(‘Pro‘,queue) 60 consumer_even = Consumer_even(‘Con_even‘,queue) 61 consumer_odd = Consumer_odd(‘Con_odd‘,queue) 62 producer.start() 63 consumer_even.start() 64 consumer_odd.start() 65 producer.join() 66 consumer_even.join() 67 consumer_odd.join() 68 print(‘所有线程终止!‘) 69 70 if __name__ == ‘__main__‘: 71 main()
Python multiprocessing 模块
multiprocessing 是似于线程模块的API支持生成进程的包.多处理包提供本地和远程并发性,通过使用子进程代替线程,有效地避免了全局解释器锁。由于这个原因,多处理模块允许程序员在给定的机器上充分利用多个处理器。它在Unix和Windows上运行。
1 from multiprocessing import Process 2 3 def foo(): 4 print(‘hello world!‘) 5 6 if __name__ == ‘__main__‘: 7 8 for i in range(5): 9 p =Process(target=foo,) 10 p.start()
注意:由于进程之间的数据需要各自持有一份,所以创建进程需要非常大的开销
1、进程间通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
Queues
使用方法跟threading里的queue差不多
1 from multiprocessing import Process,Queue 2 3 def f(q): 4 q.put([42, None, ‘hello‘]) 5 6 if __name__ == ‘__main__‘: 7 q = Queue() 8 p = Process(target=f, args=(q,)) 9 p.start() 10 print(q.get()) 11 p.join()
Pipes
管道函数返回由管道连接的一对连接对象,该管道默认是双向的(双向的)
1 from multiprocessing import Process,Pipe 2 3 def f(conn): 4 conn.send([42, None, ‘hello‘]) 5 print(‘child:‘,conn.recv()) 6 conn.close() 7 8 if __name__ == ‘__main__‘: 9 parent_conn, child_conn = Pipe() 10 p = Process(target=f, args=(child_conn,)) 11 p.start() 12 print(‘parent:‘,parent_conn.recv()) 13 parent_conn.send(‘world‘) 14 p.join()
管道函数返回的两个连接对象表示管道的两端。每个连接对象都有send()和recv()方法(包括其他)。注意,如果两个进程(或线程)试图同时读取或写入管道的同一端,那么管道中的数据可能会被损坏。当然,在同一时间使用不同管道的过程中不会出现损坏的风险
Managers
manager函数返回的manager对象控制一个保存Python对象的服务器进程,并允许其他进程使用代理来操作它们
manager返回的管理器()将支持类型list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value和Array.
1 from multiprocessing import Process ,Manager 2 3 def foo(d,l): 4 d[1] = ‘1‘ 5 d[‘2‘] = 2 6 l.append(1) 7 print(l) 8 9 if __name__ == ‘__main__‘: 10 with Manager() as manager: 11 d = manager.dict() 12 l = manager.list(range(5)) 13 p_list = [] 14 for i in range(10): 15 p = Process(target=foo,args=(d, l)) 16 p.start() 17 p_list.append(p) 18 for res in p_list: 19 res.join() 20 21 print(d) 22 print(l)
进程同步
1 from multiprocessing import Process,Lock 2 3 def foo(l, i): 4 l.acquire() 5 print(‘hello world‘,i) 6 l.release() 7 8 if __name__ == ‘__main__‘: 9 lock = Lock() 10 11 for num in range(10): 12 Process(target=foo, args=(lock, num)).start()
2、进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止
进程池中有两个方法:
- apply
- apply_async
1 from multiprocessing import Process ,Pool 2 import time 3 import os 4 5 def Foo(i): 6 time.sleep(2) 7 print(‘process‘,os.getpid()) 8 return i + 100 9 10 def Bar(arg): 11 print(‘-->‘,arg,os.getpid()) 12 13 if __name__ == ‘__main__‘: 14 pool = Pool(5) # 允许进程池同时放入5个进程 15 for i in range(10): 16 pool.apply_async(func=Foo, args=(i,),callback=Bar) # callback回调 17 # pool.apply(func=Foo, args=(i,)) # 串行 18 print(‘end‘) 19 pool.close() 20 pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭