标签:queue类 rom 自己的 集合 from producer strong alt 情况
1、什么是进程
进程是一个程序在一个数据集上的一次动态执行过程。
进程一般由程序,数据集,进程控制块三部分组成。
2、什么是线程
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。线程没有自己的系统资源。
3、线程与进程的关系
线程不能独立存在,一个线程肯定有一个父进程。进程也不能独立存在,它一定也包含一个进程。可以理解为进程是线程的容器。进程可以包含多个线程,但是线程只能依托一个进程存在。
threading模块是建立在thread模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading模块是对thread的二次封装。
注意:全局解释器锁GIL,决定了python的多线程只能用到一个cpu。
1、下面是两种threading的调用方式:
(1)、直接使用方式例子
1 import threading 2 3 def func(num): 4 print("Hi %s threading" % num) 5 6 7 t1 = threading.Thread(target=func,args=(1,)) 8 t2 = threading.Thread(target=func,args=(2,)) 9 t1.start() 10 t2.start()
(2)、继承threading的方式
1 import threading 2 3 class Mythread(threading.Thread): 4 def __init__(self,num): 5 threading.Thread.__init__(self) 6 self.num = num 7 8 def run(self): 9 print("Hi %s threading" % self.num) 10 11 12 t1 = Mythread(1) 13 t2 = Mythread(2) 14 t1.start() 15 t2.start()
2、threading模块的join(),setDaemon()方法
join():在子线程完成运行之前,这个子线程的父线程将被阻塞。
setDaemon(True):将线程声明为守护线程,必须在start()方法之前进行设置。当线程设置为守护线程的话,不管线程任务有没有执行完,主线程执行完毕后都会随着主线程一起退出。
例子:
1 import threading,time 2 3 def to_sing(name): 4 print("Come to sing %s" % name) 5 time.sleep(5) 6 7 def to_dance(name): 8 print("Come to dance %s" % name) 9 time.sleep(3) 10 11 if __name__=="__main__": 12 sing = threading.Thread(target=to_sing,args=("Tom",)) 13 dance = threading.Thread(target=to_dance,args=("lisa",)) 14 15 sing.start() 16 dance.start() 17 sing.join() 18 dance.join() #join方法放在这里的话起到的效果就是后面的print会等到sing方法和dance方法都执行完以后才会执行。如果不放在这里的话效果又是不一样的。 19 print("party is end...")
1 import threading,time 2 3 def to_sing(name): 4 print("Come to sing %s" % name) 5 time.sleep(5) 6 7 def to_dance(name): 8 print("Come to dance %s" % name) 9 time.sleep(30) 10 11 if __name__=="__main__": 12 sing = threading.Thread(target=to_sing,args=("Tom",)) 13 dance = threading.Thread(target=to_dance,args=("lisa",)) 14 15 sing.start() 16 dance.setDaemon(True) #必须放在start前面,这个例子就会出现运行五秒后dance的线程就和主线程一起退出了,因为设置它被设置成了守护线程。 17 dance.start() 18 sing.join() 19 print("party is end...")
其他的方法:
# run(): 线程被cpu调度后自动执行线程对象的run方法 # start():启动线程活动。 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
3、同步锁,递归锁
(1)、不用同步锁的情况下,开启多线程
1 import time 2 import threading 3 4 def addNum(): 5 global num 6 7 temp = num 8 time.sleep(0.0001) 9 num = temp-1 10 11 num = 100 12 thread_list = [] 13 for i in range(100): 14 t = threading.Thread(target=addNum) 15 t.start() 16 thread_list.append(t) 17 18 for j in thread_list: 19 j.join() 20 21 print("final num",num) 22 23 #终端打印 24 final num 93
这个例子说明:当多线程操作同一个对象的时候很容易出现取到相同值的问题
使用同步锁解决问题
1 import time 2 import threading 3 4 def addNum(): 5 global num 6 r.acquire() #将运行过程锁住 7 temp = num 8 time.sleep(0.0001) 9 num = temp-1 10 r.release() #释放锁 11 12 num = 100 13 thread_list = [] 14 15 r = threading.Lock() #拿到多线程的锁 16 for i in range(100): 17 t = threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for j in thread_list: 22 j.join() 23 24 print("final num",num) 25 26 #打印 27 final num 0
这个例子也可以说明一个问题,用同步锁的情况下就是我拿到了数据,在我执行完逻辑之前,其他线程等待。是一个串行操作了。
(2)、使用多把同步锁造成死锁的例子和使用递归锁解决死锁的问题
1 import time 2 import threading 3 4 class MyThread(threading.Thread): 5 def task_A(self): 6 lock_A.acquire() 7 print(self.name,"gotLockA",time.ctime()) 8 time.sleep(2) 9 lock_B.acquire() 10 print(self.name,"gotLockB",time.ctime()) 11 lock_B.release() 12 lock_A.release() 13 14 15 def task_B(self): 16 lock_B.acquire() 17 print(self.name, "gotLockB", time.ctime()) 18 time.sleep(2) 19 lock_A.acquire() 20 print(self.name, "gotLockA", time.ctime()) 21 lock_A.release() 22 lock_B.release() 23 24 def run(self): 25 self.task_A() 26 self.task_B() 27 28 lock_A = threading.Lock() 29 lock_B = threading.Lock() 30 31 t = [] 32 for i in range(5): 33 t.append(MyThread()) 34 for i in t : 35 i.start() 36 for j in t: 37 j.join() 38 39 #例子执行的流程 40 #开起5个线程,当第一个线程执行A,同时用了a锁和b锁,这个时候其他线程都在等待,当A线程执行完task_A释放了a、b锁后,执行任务task_B,拿到了B锁。这时线程B拿到了a锁把任务锁住了,等待b锁释放,但是b锁已经被A线程拿到,所以造成了死锁。
使用递归锁来解决以上问题
1 import time 2 import threading 3 4 class MyThread(threading.Thread): 5 def task_A(self): 6 lock_R.acquire() 7 print(self.name,"gotLockA",time.ctime()) 8 time.sleep(2) 9 lock_R.acquire() 10 print(self.name,"gotLockB",time.ctime()) 11 lock_R.release() 12 lock_R.release() 13 14 15 def task_B(self): 16 lock_R.acquire() 17 print(self.name, "gotLockB", time.ctime()) 18 time.sleep(2) 19 lock_R.acquire() 20 print(self.name, "gotLockA", time.ctime()) 21 lock_R.release() 22 lock_R.release() 23 24 def run(self): 25 self.task_A() 26 self.task_B() 27 28 # lock_A = threading.Lock() 29 # lock_B = threading.Lock() 30 lock_R = threading.RLock() #用递归锁代替同步锁 31 32 t = [] 33 for i in range(5): 34 t.append(MyThread()) 35 for i in t : 36 i.start() 37 for j in t: 38 j.join()
从上面一个例子来看,说明当任务拿到了递归锁,那么其他线程任务就只能等待,拿到递归锁的任务线程所有任务执行完毕后再争抢这把锁来让自己执行任务了。
递归锁:为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
4、同步条件
1 import threading,time 2 3 class Boss(threading.Thread): 4 def run(self): 5 print("Boss:Tonight we mast working to 11:00") 6 event.set() 7 print(event.isSet()) 8 time.sleep(5) 9 print("Boss:ok! we can go home.") 10 print(event.isSet()) 11 event.set() 12 13 class Worker(threading.Thread): 14 def run(self): 15 event.wait() 16 print("Worker:Oh my god!") 17 time.sleep(1) 18 event.clear() 19 event.wait() 20 print("Worker:OhYeah!") 21 22 if __name__ == "__main__": 23 event = threading.Event() 24 threads = [] 25 for i in range(5): 26 threads.append(Worker()) 27 threads.append(Boss()) 28 for t in threads: 29 t.start() 30 for t in threads: 31 t.join()
例子讲解:5个工人线程启动的时候,会监听等待一个事件,当老板线程启动了,时间被设定为True,5个工人线程收到事件后做出反映,然后清楚本次事件,再监听新的事件。
5、信号量
信号量是用来控制线程并发数的,BoundedSemaphore或Semaphonre管理一个内置的计数器,每当调用acquired()时-1,调用released()时+1
计数器不能小于0,当计数器为0时,acquire()将阻塞线程至同步锁定状态,知道其他线程调用released().
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
1 import threading,time 2 3 class myThread(threading.Thread): 4 def run(self): 5 if semaphore.acquire(): 6 print(self.name) 7 time.sleep(3) 8 semaphore.release() 9 10 if __name__ == "__main__": 11 semaphore = threading.Semaphore(5) 12 t = [] 13 for i in range(100): 14 t.append(myThread()) 15 for j in t: 16 j.start()
6、队列
队列的方法
1 创建一个“队列”对象 2 import Queue 3 q = Queue.Queue(maxsize = 10) 4 Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 5 6 将一个值放入队列中 7 q.put(10) 8 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 9 1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。 10 11 将一个值从队列中取出 12 q.get() 13 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True, 14 get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。 15 16 Python Queue模块有三种队列及构造函数: 17 1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 18 2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize) 19 3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 20 21 此包中的常用方法(q = Queue.Queue()): 22 q.qsize() 返回队列的大小 23 q.empty() 如果队列为空,返回True,反之False 24 q.full() 如果队列满了,返回True,反之False 25 q.full 与 maxsize 大小对应 26 q.get([block[, timeout]]) 获取队列,timeout等待时间 27 q.get_nowait() 相当q.get(False) 28 非阻塞 q.put(item) 写入队列,timeout等待时间 29 q.put_nowait(item) 相当q.put(item, False) 30 q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 31 q.join() 实际上意味着等到队列为空,再执行别的操作
队列有两种工作方式,一个是先进先出,一个是先进后出。大多数使用场景都是先进先出场景。
用队列来实现生产者消费者模型
1 import time,random 2 import queue,threading 3 4 q = queue.Queue() 5 6 def Producer(name): 7 count = 0 8 while count<100: 9 print("making .....") 10 time.sleep(random.randrange(4)) 11 q.put(count) 12 print("Producer %s has produced %s baozi.." % (name,count)) 13 count += 1 14 print("ok....") 15 16 def Consumer(name): 17 count = 0 18 while count < 100: 19 time.sleep(random.randrange(3)) 20 if not q.empty(): 21 data = q.get() 22 # q.task_done() 23 # q.join() 24 print(data) 25 print(‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘ % (name, data)) 26 else: 27 print("-----no baozi anymore----") 28 count += 1 29 30 p1 = threading.Thread(target=Producer,args=("A",)) 31 c1 = threading.Thread(target=Consumer,args=("B",)) 32 33 p1.start() 34 c1.start()
1、进程的两种调用方式(和线程类似)
(1)、直接调用
1 from multiprocessing import Process 2 import time 3 4 def foo(name): 5 time.sleep(1) 6 print("hello",name,time.ctime()) 7 8 if __name__=="__main__": 9 p_list=[] 10 for i in range(3): 11 p = Process(target=foo,args=(‘aaa‘,)) 12 p_list.append(p) 13 p.start() 14 for i in p_list: 15 p.join() 16 print("end")
(2)、类的继承方式调用
1 from multiprocessing import Process 2 import time 3 4 class MyProcess(Process): 5 def __init__(self,num): 6 super(MyProcess,self).__init__() 7 self.num=num 8 9 def run(self): 10 time.sleep(1) 11 print("hello",self.num,time.ctime()) 12 13 if __name__=="__main__": 14 p_list = [] 15 for i in range(3): 16 p = MyProcess(i) 17 p.start() 18 p_list.append(p) 19 for p in p_list: 20 p.join() 21 22 print("end")
2、Process类的方法
构造方法: Process([group [, target [, name [, args [, kwargs]]]]]) group: 线程组,目前还没有实现,库引用中提示必须是None; target: 要执行的方法; name: 进程名; args/kwargs: 要传入方法的参数。 实例方法: is_alive():返回进程是否在运行。 join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程 属性: daemon:和线程的setDeamon功能一样 name:进程名字。 pid:进程号。
3、进程间通信
(1)进程队列Queue
注意:进程是大写的Queue模块,线程是小写的queue模块。
1 from multiprocessing import Process,Queue 2 3 def foo(q,n): 4 q.put(n*n+1) 5 print("son process",id(q)) 6 7 if __name__=="__main__": 8 q = Queue() 9 print("main process",id(q)) 10 11 for i in range(3): 12 p = Process(target=foo,args=(q,i)) 13 p.start() 14 print(q.get()) 15 print(q.get()) 16 print(q.get())
(2)管道Pipe()
1 from multiprocessing import Process,Pipe 2 3 def foo(conn): 4 conn.send([12,{"name":"ptq"},"hello"]) 5 response = conn.recv() 6 print("response:",response) 7 conn.close() 8 print("q_ID2:", id(conn)) 9 10 if __name__=="__main__": 11 parent_conn,child_conn = Pipe() 12 print("q_ID1:", id(child_conn)) 13 p1 = Process(target=foo,args=(child_conn,)) 14 p1.start() 15 print("parent_conn",parent_conn.recv()) 16 parent_conn.send("hello son process") 17 p1.join()
(3)、共享内存Managers
1 from multiprocessing import Process,Manager 2 3 def foo(d,l,i): 4 d[i]=1 5 d[‘2‘]=2 6 l.append(i) 7 print("son process:",id(d),id(l)) 8 9 if __name__=="__main__": 10 with Manager() as manager: 11 d = manager.dict() 12 l = manager.list(range(5)) 13 print("main process: ",id(d),id(l)) 14 p_list = [] 15 for i in range(10): 16 p = Process(target=foo,args=(d,l,i)) 17 p.start() 18 p_list.append(p) 19 20 for p in p_list: 21 p.join()
(4)、进程池
进程池内部维护一个进程序列,当使用时,择取进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池的两个方法:
apply 这个方法效果就是串行的效果,不管池子里面有多少个进程可用。都是一个进程一个进程运行。
apply_async
1 from multiprocessing import Process,Pool 2 import time,os 3 4 def Foo(i): 5 time.sleep(1) 6 print(i) 7 return i+100 8 9 def Bar(arg):#回调函数调用,参数是上一个函数的返回值 10 print(os.getpid()) 11 print(os.getppid()) 12 print("logger:",arg) 13 14 if __name__== "__main__": 15 pool = Pool(5) 16 # Bar(1) 17 print("-----------------") 18 for i in range(10): 19 pool.apply_async(func=Foo,args=(i,),callback=Bar)#这里的callback是回调函数。 20 21 #这里是固定写法,变了就会报错 22 pool.close() 23 pool.join() 24 print("end")
标签:queue类 rom 自己的 集合 from producer strong alt 情况
原文地址:http://www.cnblogs.com/xiaoqianghuihui/p/6848995.html