标签:一起 中间 异步 资源 empty 生产者消费者模型 实体 注意 也有
一.进程:
1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程
2.组成:进程一般由程序,数据集,进程控制三部分组成:
(1)程序:用来描述进程要完成哪些功能以及如何完成
(2)数据集:是程序在执行过程中所需要使用的一切资源
(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的
二.线程:
1.定义:最小的执行单位,线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,使到进2.程内并发成为可能
2.组成:线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID,程序,计数器,寄存器集合和堆栈共同组成。
3.作用:线程的引入减小了程序并发执行的开销,提高了操作系统并发性,线程没有自己的系统资源
三.并发和并行的关系
1.并发:指系统具有处理多个任务(动作)的能力,一个CPU可以实现并发,因为它能实现多个任务的处理
2.并行:指系统具有同时处理多个任务(动作)的能力,一个程序只能对应一个CPU核数,一核不可能有俩个程序同时进行,只能是并发不是并行
3.并发和并行的关系:并行是并发的一个子集
四.进程和线程的关系:
1.一个程序至少有一个进程,一个进程至少有一个线程(进程可以理解成线程的容器)
2.进程在执行过程中拥有独立的内存单元,而多个线程共享内存(进程的内存),从而极大地提高了程序的运行效率
3.线程在执行过程中与进程的区别是,每个独立的线程有一个程序运行的入口,顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
4.进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行,一个进程里边可以有多个线程,CPU运行的是线程,进程是做资源管理的,它代表一个过程,管理这些线程,一个进程最少有一个线程,这个线程叫主线程,可以在这个线程里开多个子线程
五.python的线程与threading模块
1.threading模块建立在thread模块之上,thread模块以低级,原始的方式来处理和控制线程,而threading模块通过对thread进行二次封装,提供了更方便的api来处理线程。
(1)直接调用实现并发
import threading #引入线程模块 import time #从上到下按照主线程执行的 def Hi(num): print("hello %d"%num) time.sleep(5) #模拟时间消耗 if __name__ == ‘__main__‘: #创建第一个子线程 t1=threading.Thread(target=Hi,args=(10,)) #threading.给Thread这个类创建了一个子线程对象t1----target等于要处理的函数名字 t1.start() #Thread这个类实例出对象t1通过start启动第一个子线程 #创建第二个子线程 t2 = threading.Thread(target=Hi, args=(5,)) #threading.给Thread这个类创建了一个子线程对象t2----target等于要处理的函数名字 t2.start() #Thread这个类实例出对象t2通过start启动第二个子线程 print("结束..........") #主线程
打印:同时打印出三条后等5秒程序结束
hello 10
hello 5
结束..........
(2)直接调用实现并发:
import threading from time import ctime,sleep import time def ListenMusic(name): #定义函数一 print ("我是开始 %s. %s" %(name,ctime())) sleep(3) print("t1结束 %s"%ctime()) def RecordBlog(title): #定义函数二 print ("我是开始 %s! %s" %(title,ctime())) sleep(5) print(‘t2结束 %s‘%ctime()) threads = [] #定义一个空列表 t1 = threading.Thread(target=ListenMusic,args=(‘t1‘,)) #创建t1把t1加到列表threads里面去 t2 = threading.Thread(target=RecordBlog,args=(‘t2‘,)) #创建t2把t2加到列表threads里面去 threads.append(t1) threads.append(t2) if __name__ == ‘__main__‘: for t in threads: #对列表执行遍历 t.start() #把所有线程都开启 print ("所有都结束 %s" %ctime()) #主线程
打印结果:首先打印出我是开始 t1,我是开始 t2和所有都结束,三秒后打印t1结束,俩秒钟后打印t2结束,总共耗时5秒实现并发
我是开始 t1. Mon Nov 26 11:42:02 2018
我是开始 t2! Mon Nov 26 11:42:02 2018
所有都结束 Mon Nov 26 11:42:02 2018
t1结束 Mon Nov 26 11:42:05 2018
t2结束 Mon Nov 26 11:42:07 2018
(3)通过继承式调用
import threading import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): #定义每个线程要运行的函数,必须是run print("running on number:%s" % self.num) time.sleep(3) if __name__ == ‘__main__‘: t1 = MyThread(1) #实例类是自己定制的继承threading.Thread,t1就是线程对象 t2 = MyThread(2) t1.start() #激活run方法 t2.start() print("结束......") #主线程
返回结果:
running on number:1
running on number:2
结束......
2.join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞(join方法线程对象实例的方法)
(1)模拟并发效果之join方法1
import threading import time def music(): print("music开始时间 %s"%time.ctime()) time.sleep(3) print("music结束时间 %s" % time.ctime()) def game(): print("game开始时间 %s"%time.ctime()) time.sleep(5) print("game结束时间 %s" % time.ctime()) if __name__ == ‘__main__‘: #创建第一个子线程 t1= threading.Thread(target=music) #threading.给Thread这个类创建了一个子线程对象t1 #创建第二个子线程 t2 = threading.Thread(target=game) #threading.给Thread这个类创建了一个子线程对象t2 t1.start() #Thread这个类实例出对象t1通过start启动 t2.start() #Thread这个类实例出对象t2通过start启动 #join是t1,t2不执行完主线程不往下执行 t1.join() t2.join() #主线程 print("结束")
打印流程:首先打印出music开始时间和game开始时间三秒后打印music结束时间俩秒后打印game结束时间和结束,总共耗时5秒实现并发
music开始时间 Sat Nov 24 14:47:40 2018
game开始时间 Sat Nov 24 14:47:40 2018
music结束时间 Sat Nov 24 14:47:43 2018
game结束时间 Sat Nov 24 14:47:45 2018
结束
(2)模拟并发效果之join方法2把t2给join起来
import threading from time import ctime,sleep import time def ListenMusic(name): #定义函数一 print ("我是开始 %s. %s" %(name,ctime())) sleep(3) print("t1结束 %s"%ctime()) def RecordBlog(title): #定义函数二 print ("我是开始 %s! %s" %(title,ctime())) sleep(5) print(‘t2结束 %s‘%ctime()) threads = [] #定义一个空列表 t1 = threading.Thread(target=ListenMusic,args=(‘t1‘,)) #创建t1把t1加到列表threads里面去 t2 = threading.Thread(target=RecordBlog,args=(‘t2‘,)) #创建t2把t2加到列表threads里面去 threads.append(t1) threads.append(t2) if __name__ == ‘__main__‘: for t in threads: #对列表执行遍历 #t.setDaemon(True) #注意:一定在start之前设 t.start() #把所有线程都开启 t.join() #把t2给join起来 print ("所有都结束 %s" %ctime()) #主线程
打印输出:首先打印我是开始 t1和我是开始 t2,三秒钟后打印t1结束,俩秒钟后打印t2结束和所有都结束
我是开始 t1. Mon Nov 26 11:49:35 2018
我是开始 t2! Mon Nov 26 11:49:35 2018
t1结束 Mon Nov 26 11:49:38 2018
t2结束 Mon Nov 26 11:49:40 2018
所有都结束 Mon Nov 26 11:49:40 2018
(3)模拟并发效果之join无用方法串行
import threading from time import ctime,sleep import time def ListenMusic(name): #定义函数一 print ("我是开始 %s. %s" %(name,ctime())) sleep(3) print("t1结束 %s"%ctime()) def RecordBlog(title): #定义函数二 print ("我是开始 %s! %s" %(title,ctime())) sleep(5) print(‘t2结束 %s‘%ctime()) threads = [] #定义一个空列表 t1 = threading.Thread(target=ListenMusic,args=(‘t1‘,)) #创建t1把t1加到列表threads里面去 t2 = threading.Thread(target=RecordBlog,args=(‘t2‘,)) #创建t2把t2加到列表threads里面去 threads.append(t1) threads.append(t2) if __name__ == ‘__main__‘: for t in threads: #对列表执行遍历 t.start() #把所有线程都开启 t.join() #串行 print ("所有都结束 %s" %ctime())
打印输出:首先打印我是开始 t1,三秒钟后打印t1结束和我是开始 t2,5秒钟打印t2结束和所有都结束总共用时8秒没有实现并发
我是开始 t1. Mon Nov 26 11:45:10 2018
t1结束 Mon Nov 26 11:45:13 2018
我是开始 t2! Mon Nov 26 11:45:13 2018
t2结束 Mon Nov 26 11:45:18 2018
所有都结束 Mon Nov 26 11:45:18 2018
3.setDaemon(True)将线程声明为守护线程,必须在start()方法调用之前设置,如果不设置为守护线程程序会被无线挂起。这个方法基本和join是相反的(setDaemon(True)方法线程对象实例的方法)。
当我们在程序运行中,执行一个主线程,如果主线程又创建了一个子线程,主线程和子线程就分兵俩路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后在退出。但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退出
(1)模拟并发效果值setDaemon守护线程把t1和t2设置成守护线程跟着主线程一起退
import threading from time import ctime,sleep import time def xiancheng1(name): #定义函数一(线程1) print ("我是开始 %s %s" %(name,ctime())) sleep(3) print("t1结束 %s"%ctime()) def xiancheng2(title): #定义函数二(线程2) print ("我是开始 %s %s" %(title,ctime())) sleep(5) print(‘t2结束 %s‘%ctime()) threads = [] #定义一个空列表 t1 = threading.Thread(target=xiancheng1,args=(‘t1‘,)) #创建线程对象t1 t2 = threading.Thread(target=xiancheng2,args=(‘t2‘,)) #创建线程对象t2 threads.append(t1) #把t1加到列表threads里面 threads.append(t2) #把t2加到列表threads里面 if __name__ == ‘__main__‘: for t in threads: #通过for循环对列表执t.start t.setDaemon(True) #把t1和t2都设置成守护线程,子线程守护主线程(一定在start之前设) t.start() #把所有线程都开启 print ("我是主线程 %s" %ctime())
打印结果:同时输出我是开始 t1和我是开始 t2和我是主线程结束
我是开始 t1 Tue Nov 27 11:59:49 2018
我是开始 t2 Tue Nov 27 11:59:49 2018
我是主线程 Tue Nov 27 11:59:49 2018
(2)模拟并发效果值setDaemon守护线程把t2设置成守护线程跟着主线程一起退
import threading from time import ctime,sleep import time def xiancheng1(name): #定义函数一(线程1) print ("我是开始 %s %s" %(name,ctime())) sleep(3) print("t1结束 %s"%ctime()) def xiancheng2(title): #定义函数二(线程2) print ("我是开始 %s %s" %(title,ctime())) sleep(5) print(‘t2结束 %s‘%ctime()) threads = [] #定义一个空列表 t1 = threading.Thread(target=xiancheng1,args=(‘t1‘,)) #创建线程对象t1 t2 = threading.Thread(target=xiancheng2,args=(‘t2‘,)) #创建线程对象t2 threads.append(t1) #把t1加到列表threads里面 threads.append(t2) #把t2加到列表threads里面 if __name__ == ‘__main__‘: for t in threads: #通过for循环对列表执t.start t2.setDaemon(True) #把t2设置成守护线程,t2子线程要守护主线程(一定在start之前设) t.start() #把所有线程都开启 print ("我是主线程 %s" %ctime())
打印输出:首先打印输出我是开始 t1和我是开始 t2和我是主线程,3秒钟后打印t1结束,因为t1不是主线程守护,要等待t1子线程结束后退出程序
我是开始 t1 Tue Nov 27 12:02:40 2018
我是开始 t2 Tue Nov 27 12:02:40 2018
我是主线程 Tue Nov 27 12:02:40 2018
t1结束 Tue Nov 27 12:02:43 2018
4.其它方法
(1)run():用以表示线程活动的方法
(2)start():启动线程活动(让线程处于就绪的状态,等待操作系统调cpu去执行)
(3)isAlive():返回线程是否活动的返回布尔值
(4)getName():查看当前线程名,(5)setName():设置线程名
import threading from time import ctime,sleep import time def ListenMusic(name): #定义函数一(线程1) print ("我是开始 %s. %s" %(name,ctime())) sleep(3) print("我是结束t1 %s"%ctime()) def RecordBlog(title): #定义函数二(线程2) print ("我是开始 %s! %s" %(title,ctime())) sleep(5) print(‘我是结束t2 %s‘%ctime()) threads = [] #定义一个空列表 t1 = threading.Thread(target=ListenMusic,args=(‘t1‘,)) #创建t1把t1加到列表threads里面去 t2 = threading.Thread(target=RecordBlog,args=(‘t2‘,)) #创建t2把t2加到列表threads里面去 threads.append(t1) threads.append(t2) if __name__ == ‘__main__‘: t2.setDaemon(True) #把t1设置成守护线程(一定在start之前设) for t in threads: #对列表执行遍历 t.start() #把所有线程都开启 print(t.getName()) #默认线程名字 print("count:", threading.active_count()) #修改线程名字 print ("我是主线程 %s" %ctime())
返回:
我是开始 t1. Mon Nov 26 14:09:12 2018
Thread-1
count: 2
我是开始 t2! Mon Nov 26 14:09:12 2018
Thread-2
count: 3
我是主线程 Mon Nov 26 14:09:12 2018
我是结束t1 Mon Nov 26 14:09:15 2018
5.threading模块提供的一些方法:
(1)threading.currentThread():返回当前的线程变量
(2)threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后,结束前,不包括启动前和终止后的线程
(3)threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate)有相同结果
import threading from time import ctime,sleep import time def ListenMusic(name): #定义函数一(线程1) print ("我是开始 %s. %s" %(name,ctime())) sleep(3) print("我是结束t1 %s"%ctime()) def RecordBlog(title): #定义函数二(线程2) print ("我是开始 %s! %s" %(title,ctime())) sleep(5) print(‘我是结束t2 %s‘%ctime()) threads = [] #定义一个空列表 t1 = threading.Thread(target=ListenMusic,args=(‘t1‘,)) #创建t1把t1加到列表threads里面去 t2 = threading.Thread(target=RecordBlog,args=(‘t2‘,)) #创建t2把t2加到列表threads里面去 threads.append(t1) threads.append(t2) if __name__ == ‘__main__‘: for t in threads: #对列表执行遍历 t.setDaemon(True) # 把t1和t2都设置成守护线程(一定在start之前设) t.start() #把所有线程都开启 while threading.active_count() == 1: #表示此时此刻就一个主线程 print ("我是主线程 %s" %ctime())
输出结果:
我是开始 t1. Mon Nov 26 14:22:07 2018
我是开始 t2! Mon Nov 26 14:22:07 2018
六.python的GIL(全局解释锁)
1.GIL:全局解释锁:无论你启动多少个线程,你有多少CPU,python在执行的时候在同一时刻只允许一个线程被CPU执行
2.任务:分IO密集型和计算密集型,对于IO密集型的任务python的多线程不会被GIL影响,对于计算密集型的任务python的多线程会被GIL影响
3.任务解决方法:多进程+协程解决GIL
4.同步锁
5.需求开100个线程对数字100做累减100的操作
(1)未使用同步锁
import threading import time def sub(): #累减函数sub global num #在每个线程中都获取这个全局变量 temp=num #把num赋值给temp time.sleep(0.001) #等待0.001秒 num=temp-1 #对此公共变量进行-1操作赋值给num num=100 #设定一个共享变量 l=[] #定义空列表 for i in range(100): t=threading.Thread(target=sub) #创建线程执行sub赋值给t t.start() #执行t l.append(t) #创建t的时候加到列表l里 for t in l: t.join() #等着主线程,让主线程在这里不要去执行 print(num)
打印输出:多个线程都在同时操作同一个共享资源,所以造成资源损坏导致输入结果不是0
87
(2)使用同步锁
import threading import time def sub(): #累减函数sub global num #在每个线程中都获取这个全局变量 #使用同步锁把这部分改为串行 lock.acquire() #获得一把锁,在锁下面的代码谁都不可以有CPU的切换,只能有一个线程被执行 temp=num time.sleep(0.001) num=temp-1 #对此公共变量进行-1操作赋值给num lock.release() #释放这把锁,获取锁中间的代码不可以有CPU的切换 num=100 #设定一个共享变量 l=[] #定义空列表 lock=threading.Lock() #创建同步锁lock for i in range(100): t=threading.Thread(target=sub) #创建线程执行sub赋值给t t.start() #执行t l.append(t) #创建t的时候加到列表l里 for t in l: #等待所有子线程执行完毕 t.join() print(num)
打印输出:
0
5.线程死锁和递归锁
在线程间共享多个资源的时候,如果俩个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这俩个线程在无外力作用下将一直等待下去。
(1)线程死锁
import threading import time class MyThread(threading.Thread): #创建一个类MyThread def actionA(self): #实例方法1 #五个线程同时抢占 A.acquire() #创建A锁 print(self.name,"A锁",time.ctime()) #self.name是线程的名字打印A错名字 time.sleep(2) B.acquire() #创建B锁 print(self.name, "B锁", time.ctime()) time.sleep(1) B.release() #结束B锁 A.release() #结束A锁 def actionB(self): #实例方法2 B.acquire() #创建B锁 print(self.name, "B锁", time.ctime()) time.sleep(2) A.acquire() #创建A锁 print(self.name, "A锁", time.ctime()) time.sleep(1) A.release() B.release() def run(self): #先执行run方法(开了5分线程同时去执行下面俩函数) self.actionA() #run方法执行actionA() self.actionB() #run方法执行actionB() if __name__ == ‘__main__‘: #类的继承方式来启动多线程 #创建同步锁A A=threading.Lock() #创建同步锁B B=threading.Lock() L=[] #定义空列表L for i in range(5): #创建5个线程对象同时执行run方法 t=MyThread() #继承方式创建线程对象实例自己的类 t.start() #执行t L.append(t) #创建t的时候加到列表L里 for i in L: #等待所有子线程执行完毕 i.join() print("结束....")
打印结果:
Thread-1 A锁 Tue Nov 27 17:09:54 2018
Thread-1 B锁 Tue Nov 27 17:09:56 2018
Thread-1 B锁 Tue Nov 27 17:09:57 2018
Thread-2 A锁 Tue Nov 27 17:09:57 2018
造成死锁:第一个线程在执行actionB(self):的A.acquire()这位置想要一把A锁的时候,第二个线程已经把A锁拿走了,需要等待A锁什么时候被释放什么时候才可以拿来,第二个线程在执行actionA(self)的B.acquire()这位置想获得B锁但B锁已经被第一个线程拿着,俩边都不释放造成了死锁
(2)递归锁解决死锁
import threading import time class MyThread(threading.Thread): #创建一个类MyThread def actionA(self): #实例方法1 #count大于1其它线程是无法进来不会造成死锁 r_lcok.acquire() #acquire一次加一个1 内部count计数器=1 print(self.name,"A锁",time.ctime()) #self.name是线程的名字打印A错名字 time.sleep(2) r_lcok.acquire() #acquire一次加一个1 内部count计数器=2 print(self.name, "B锁", time.ctime()) time.sleep(1) r_lcok.release() #release减一个1 内部count计数器=1 r_lcok.release() #count计数器=0的时候第二个线程就可以再用这把锁 def actionB(self): #实例方法2 r_lcok.acquire() print(self.name, "B锁", time.ctime()) time.sleep(2) r_lcok.acquire() print(self.name, "A锁", time.ctime()) time.sleep(1) r_lcok.release() r_lcok.release() def run(self): #先执行run方法(开了5分线程同时去执行下面俩函数) self.actionA() #run方法执行actionA() self.actionB() #run方法执行actionB() if __name__ == ‘__main__‘: #类的继承方式来启动多线程 r_lcok = threading.RLock() #创建递归锁 L=[] #定义空列表L for i in range(5): #创建5个线程对象同时执行run方法 t=MyThread() #继承方式创建线程对象实例自己的类 t.start() #执行t L.append(t) #创建t的时候加到列表L里 for i in L: #等待所有子线程执行完毕 i.join() print("结束....")
打印结果:
Thread-1 A锁 Tue Nov 27 17:47:27 2018
Thread-1 B锁 Tue Nov 27 17:47:29 2018
Thread-2 A锁 Tue Nov 27 17:47:30 2018
Thread-2 B锁 Tue Nov 27 17:47:32 2018
Thread-3 A锁 Tue Nov 27 17:47:33 2018
Thread-3 B锁 Tue Nov 27 17:47:35 2018
Thread-4 A锁 Tue Nov 27 17:47:36 2018
Thread-4 B锁 Tue Nov 27 17:47:38 2018
Thread-5 A锁 Tue Nov 27 17:47:39 2018
Thread-5 B锁 Tue Nov 27 17:47:41 2018
Thread-1 B锁 Tue Nov 27 17:47:42 2018
Thread-1 A锁 Tue Nov 27 17:47:44 2018
Thread-2 B锁 Tue Nov 27 17:47:45 2018
Thread-2 A锁 Tue Nov 27 17:47:47 2018
Thread-3 B锁 Tue Nov 27 17:47:48 2018
Thread-3 A锁 Tue Nov 27 17:47:50 2018
Thread-4 B锁 Tue Nov 27 17:47:51 2018
Thread-4 A锁 Tue Nov 27 17:47:53 2018
Thread-5 B锁 Tue Nov 27 17:47:54 2018
Thread-5 A锁 Tue Nov 27 17:47:56 2018
结束....
七.同步条件对象(Event)
1.同步:当你一个进程执行到一个IO操作(等待外部数据)的时候,这个等的过程就是同步
2.异步:当你一个进程执行到一个IO操作(等待外部数据)的时候,不等待,一直等到数据接收成功,再回来处理
3.让两个线程之间处于同步操作
(1)event.wati():如果flag被设定,就不等待了继续往下执行
(2)event.set():设定Event
(3)event.clear():清除Event
(4)如果flag被设定了,wait方法就不会做任何事情,如果flag被清除(没有被设定)了,wait会阻塞,一直等待被设定为止,一个Event对象可以用在多个线程里去
import threading,time class Teacher(threading.Thread): def run(self): print("Teacher:今天放学都不要走啊。") #print(event.isSet()) #打印当前是否被设定event,返回False event.set() #event.set()设定event,第一个event.wait()以下的代码就可以继续执行 time.sleep(3) print("Teacher:可以下课了。") #print(event.isSet()) #打印当前是否被设定event,返回False event.set() #event.set()设定event class Student(threading.Thread): def run(self): event.wait() #遇到wait卡住返回到threads.append(Teacher())继续执行。一旦event被设定,等同于pass,下面可以继续执行 print("Student:命苦啊!") time.sleep(1) event.clear() #event.clear()清空event event.wait() #遇到wait卡住返回到time.sleep(3)继续执行。一旦event被设定,等同于pass,下面可以继续执行 print("Student:OhYeah!") if __name__=="__main__": event=threading.Event() #第一步:创建event对象 threads=[] #第二步:创建列表 for i in range(3): #创建3个Worker线程对象加到列表里 threads.append(Student()) #Student类继承了threading.Thread,里面有run方法,一旦有这个线程会执行run方法 threads.append(Teacher()) #第三步:创建一个Teacher对象,也是继承是调用 for t in threads: #第四步: t.start() for t in threads: #第五步: t.join() print("结束.....")
返回结果:
Teacher:今天放学都不要走啊。
Student:命苦啊!
Student:命苦啊!
Student:命苦啊!
Teacher:可以下课了。
Student:OhYeah!
Student:OhYeah!
Student:OhYeah!
结束.....
八.信号量(Semaphore)
1.信号量:用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,没当调用acquire()时-1,调用release()时+1。计数器不能小于0,当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。
2.BoundedSemaphore或Semaphore的唯一区别在于前者将在调用release()时检查计数器是否超过了计数器的初始值,如果超过了将抛出一个异常。
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): #同时有5个线程进去 print(self.name) time.sleep(3) semaphore.release() #把semaphore释放又可以进来5个线程 if __name__=="__main__": semaphore=threading.Semaphore(5) #semaphore也是锁的一种,threading.Semaphore()创建锁对象,5是每次只能进去5个线程 thrs=[] #空列表 for i in range(15): #50线程对象分别去启动执行run方法 thrs.append(myThread()) for t in thrs: t.start()
返回结果:每隔3秒打印5个线程
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-7
Thread-8
Thread-9
Thread-6
Thread-10
Thread-11
Thread-12
Thread-13
Thread-14
Thread-15
九.多线程利器--队列(queue)
1.先进先出模式
import queue #线程队列 q=queue.Queue(3) #创建queue对象,3代表最多放3个数据(默认按照先进先出) q.put(18) #将18放入队列中 q.put("hello") #将hello放入队列中 q.put({"name":"xi"}) #将{"name":"xi"}放入队列中 #q.put(34,False) #加上参数False参数如果队列满了报错 while 1: data=q.get() #通过q.get()将值从队列中取出赋值给data #data = q.get(block=False) #加上参数False参数如果队列满了报错 print(data) print("----------")
打印结果:
18
----------
hello
----------
{‘name‘: ‘xi‘}
----------
2.先进后出模式
import queue #线程队列 q=queue.LifoQueue(3) #创建queue对象,3代表最多放3个数据(LifoQueue先进后出) q.put(12) #创建格子放数据12 q.put("hello") #创建格子放数据hello q.put({"name":"xi"}) #创建格子放数据{"name":"xi"} while 1: data=q.get() #通过q.get()取值q的值赋值给data print(data) print("----------")
打印结果:
{‘name‘: ‘xi‘}
----------
hello
----------
12
----------
3.优先级模式
import queue #线程队列 q=queue.PriorityQueue(3) #创建queue对象,3代表最多放3个数据(PriorityQueue优先级) q.put([2,12]) #创建格子放数据12,优先级2 q.put([3,"hello"]) #创建格子放数据hello,优先级3 q.put([1,{"name":"xi"}]) #创建格子放数据{"name":"xi"},优先级1 while 1: data=q.get() #通过q.get()取值q的值赋值给data print(data[1]) print("----------")
打印输出:
{‘name‘: ‘xi‘}
----------
12
----------
hello
----------
4.队列中常用的方法
(1)q.qsize():返回队列的大小
(2)q.empty():如果队列为空,返回True,反之False
(3)q.full():如果队列满了,返回True,反之False
(4)q.full与maxsize大小对应
(5)q.get([block[,timeout]])获取队列,timeout等待时间
(6)q.get_nowait()相当q.get(False)
(7)非阻塞q.put(item)写入队列,timeout等待时间
(8)q.task_done():在完成一项工作之后,q.task_done()函数向任务已经完成的队列发送一个信号
(9)q.join():等着列队为空在执行别的操作
import queue #线程队列 q=queue.Queue(3) #创建queue对象,3代表最多放3个数据(默认按照先进先出) q.put(18) #将18放入队列中 q.put("hello") #将hello放入队列中 q.put({"name":"xi"}) #将{"name":"xi"}放入队列中 #q.put_nowait(11) 相当于q.get(False) print(q.qsize()) #按照具体有多少值来的 print(q.empty()) #是否为空 print(q.full()) #是否是满的 while 1: data=q.get() #通过q.get()将值从队列中取出赋值给data print(data) print("----------")
返回:
3
False
True
18
----------
hello
----------
{‘name‘: ‘xi‘}
----------
5.生产者消费者模型
(1)为什么要使用生产者和消费者
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者消费者模式。
(2)什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强行解除耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
(3)通过队列完成生产者消费者模型
import time,random import queue,threading q = queue.Queue() #首先创建队列对象q def Producer(name): #生产者函数Producer count = 0 #生产者定义count = 0 while count < 5: #当0小于5的时候 print("开始生产........") #执行 time.sleep(3) q.put(count) #把当前count的值放到队列q里去 print(‘生产者 %s 已经生产 %s 号包子..‘ %(name, count)) count +=1 #给count加一个1 q.join() #遇到q.join生产者线程等待队列为,开始执行消费者线程,当包子被吃了(队列为空)的时候再往下执行 print("结束......") def Consumer(name): #消费者函数Consumer count = 0 while count <5: time.sleep(random.randrange(4)) data = q.get() #q.get()拿到包子 print("开始吃包子....") time.sleep(2) q.task_done() #俩秒钟后把吃完包子的消息告诉生产者里的q.join(), print(‘\033[32;1m消费者 %s 已吃 %s 号包子...\033[0m‘ %(name, data)) count +=1 p1 = threading.Thread(target=Producer, args=(‘王大厨‘,)) #p1线程作为生产者执行的Producer函数 c1 = threading.Thread(target=Consumer, args=(‘西西‘,)) #c1线程作为消费者执行的Consumer函数 c2 = threading.Thread(target=Consumer, args=(‘一一‘,)) #c2线程作为消费者执行的Consumer函数 #来了3个线程去执行p1,c1,c2 p1.start() c1.start() c2.start()
打印输出:
开始生产........
生产者 王大厨 已经生产 0 号包子..
开始吃....
消费者 西西 已吃 0 号包子...
结束......
开始生产........
生产者 王大厨 已经生产 1 号包子..
开始吃....
消费者 一一 已吃 1 号包子...
结束......
开始生产........
生产者 王大厨 已经生产 2 号包子..
开始吃....
消费者 西西 已吃 2 号包子...
结束......
开始生产........
生产者 王大厨 已经生产 3 号包子..
开始吃....
消费者 一一 已吃 3 号包子...
结束......
开始生产........
生产者 王大厨 已经生产 4 号包子..
开始吃....
消费者 西西 已吃 4 号包子...
结束......
十.多进程模块(multiprocessing)
由于GIL的存在,python中的多线程其实并不是真正的多线程,如果要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程
multiprocessing包是python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在python程序内部编写的函数。该Process对象与Thread对象的用法相同。也有start(),run(),join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类(这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部分与threading使用同一套API,只不过换到了多进程的情境。
1.多进程调用方式
(1)调用方式一(直接调用)
from multiprocessing import Process import time def f(name): #定义函数f time.sleep(1) print(‘hello‘, name,time.ctime()) #1秒钟后打印参数和时间 if __name__ == ‘__main__‘: p_list=[] for i in range(3): #循环创建3个子进程 p = Process(target=f, args=(‘xixi‘,)) #Process实例化进程对象 p_list.append(p) #加到列表p_list里 p.start() #启动 for i in p_list: i.join() print(‘结束‘) #主进程
打印输出:并行
hello xixi Thu Nov 29 11:05:08 2018
hello xixi Thu Nov 29 11:05:08 2018
hello xixi Thu Nov 29 11:05:08 2018
结束
(2)调用方式二(继承的方式)
from multiprocessing import Process import time class MyProcess(Process): def run(self): time.sleep(1) print (‘hello‘, self.name,time.ctime()) #self.name是进程的名字 if __name__ == ‘__main__‘: p_list=[] for i in range(3): p = MyProcess() p.start() #启动执行def run(self):里的run方法 p_list.append(p) for p in p_list: p.join() #等待子进程执行完主进程执行 print(‘结束‘) #主进程
打印输出:并行
hello MyProcess-1 Thu Nov 29 11:17:29 2018
hello MyProcess-3 Thu Nov 29 11:17:29 2018
hello MyProcess-2 Thu Nov 29 11:17:29 2018
结束
(3)daemon守护进程
from multiprocessing import Process import time class MyProcess(Process): def run(self): time.sleep(1) print (‘hello‘, self.name,time.ctime()) #self.name是进程的名字 if __name__ == ‘__main__‘: p_list=[] for i in range(3): p = MyProcess() p.daemon=True #加上守护进程,主进程完了就结束了,不管子进程是否执行完 p.start() p_list.append(p) print(‘结束‘) #主进程
打印结果:
结束
(4)显示涉及的各个进程ID
from multiprocessing import Process import os import time def info(title): #定义函数info print("title:", title) print(‘当前运行程序父进程的进程号:‘, os.getppid()) print(‘当前程序运行的进程号:‘, os.getpid()) def f(name): #定义函数f,f函数也调用了info info(‘function f‘) #调用了info函数 print(‘hello‘, name) if __name__ == ‘__main__‘: info(‘主进程‘) #执行给info传一个字符串 time.sleep(1) print("------------------") p = Process(target=info, args=(‘子进程‘,)) #创建一个子进程p p.start() #开启 p.join()
返回结果:父进程号:37320,子进程号:109704,子子进程号:111536
title: 主进程
当前运行程序父进程的进程号: 37320
当前程序运行的进程号: 109704
------------------
title: 子进程
当前运行程序父进程的进程号: 109704
当前程序运行的进程号: 111536
3.Process类
(1)构造方法:
process(group[,target[,name[,args[,kwargs]]]])
group:线程组,目前还没有实现,库引用中提示必须是None
target:要执行的方法
name:进程名字
args/kwargs:要传入方法的参数
(2)实例方法:
is_alive():返回进程是否在运行
join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或打到指定的timeout,
start():进程准备就绪,等待CPU调度
run():start()调用run方法,如果实例进程未指定传入target,这 start执行+默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
(3)属性:
daemon:和线程的setDeamon功能一样
name:进程名字
pid:进程号
4.进程间通讯
标签:一起 中间 异步 资源 empty 生产者消费者模型 实体 注意 也有
原文地址:https://www.cnblogs.com/xixi18/p/10037523.html