标签:
上篇博客简单介绍了多进程和多线程分别是什么,及分别使用于那种场景。
这里再稍微聊聊线程和进程相关的东西以及协程
一、队列
import queue import threading # queue.Queue,先进先出队列 # queue.LifoQueue,后进先出队列 # queue.PriorityQueue,优先级队列 # queue.deque,双向对队 # queue.Queue(2) 先进先出队列 # put放数据,是否阻塞,阻塞时的超时时间 # get取数据(默认阻塞),是否阻塞,阻塞时的超时时间 # qsize()真实个数 # maxsize 最大支持的个数 # join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞 q = queue.Queue(2) #先进先出队列(括号中的参数为maxsize,代表队列中最多能有几个值,默认为0,代表可以有无限个值) print(q.qsize()) #查看目前队列中有几个值 q.put(11) #put方法为给队列中放一个值 q.put(22) q.put(block=False,timeout=2) #block代表是否阻塞,默认为阻塞,为非阻塞时,当进入队列需要等待时直接报错。timeout代表最多等待时间,超过等待时间直接报错 print(q.get()) #get方法为取一个值 q.task_done() #task_done方法为告诉队列我取完值了 print(q.get()) q.task_done() q.join() #阻塞进程,当队列中任务执行完时,不再阻塞 print(q.empty()) #若当前队列中有任务,返回False,否则返回True #生产者消费者模型 #解决并发问题 q = queue.Queue(20) def productor(arg): q.put(‘包子‘) def customer(arg): q.get() for i in range(10): t1 = threading.Thread(target=productor,args=(i,)) t1.start()
二、及其及其简易版线程池
#!/usr/bin/env python # _*_ coding:utf-8_*_ import threading import queue import time class threadpool: def __init__(self,maxsize=5): self.maxsize = maxsize #默认最大线程为5 self._q = queue.Queue(maxsize) #创建队列,并设置队列中最多可以有5个值,既最大线程数为5 for i in range(maxsize): self._q.put(threading.Thread) #创建线程对象,将其放入到队列中 #self._q为[threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread] def get_thread(self): """ 获取线程函数 :return: 队列中实际存放的为一个一个的线程对象,故return的一个线程对象,即threading.Thread """ return self._q.get() def add_thread(self): self._q.put(threading.Thread) #线程池中要保持有5个线程,当使用掉一个线程时,再重新put到队列中一个线程对象 pool = threadpool() def task(arg): print(arg) time.sleep(1) pool.add_thread() for i in range(100): t = pool.get_thread() #这里t为threading.Thread obj = t(target=task, args=(i,)) obj.start()
三、线程锁
import threading import time #放行单个线程 NUM = 10 def func(l): l.acquire() #上锁 global NUM NUM -= 1 time.sleep(1) print(NUM) l.release() #解锁 lock = threading.RLock() #创建线程锁 #threading.LOCK 和 threading.RLOCK的区别:RLOCK支持多层锁,LOCK不支持,所以通常情况下使用RLOCK即可 for i in range(10): t1 = threading.Thread(target=func,args=(lock,)) t1.start() #放行指定个数的线程------信号量 NUM = 20 def func(l): l.acquire() #上锁 --- 一次放行五个线程 global NUM NUM -= 1 time.sleep(1) print(NUM) l.release() #解锁 lock = threading.BoundedSemaphore(5) #5代表依次放行几个线程 for i in range(20): t1 = threading.Thread(target=func,args=(lock,)) t1.start() #t1.start()仅代表线程创建完毕,等待CPU来调度执行,但是CPU调度是随机的,so.. 输出的结果没按照递减的顺序输出 #放行全部线程 NUM = 10 def func(e): global NUM NUM -= 1 e.wait() #检测是什么灯,(默认即为红灯),红灯停,绿灯行 print(NUM) print(NUM+100) event = threading.Event() #事务,可比喻为红绿灯,红灯全部锁住,绿灯全部放行 for i in range(10): t1 = threading.Thread(target=func,args=(event,)) t1.start() event.clear() #设置为红灯(默认即为红灯) inp = input(">>: ") if inp == ‘1‘: event.set() #设置为绿灯 #条件锁 condition #当某个条件成立,放行指定个数的锁 def func(i,conn): print(i) conn.acquire() conn.wait() print(i+100) conn.release() c = threading.Condition() for i in range(10): t = threading.Thread(target=func,args=(i,c)) t.start() while True: inp = input(">>: ") if inp == ‘q‘: break c.acquire() c.notify(int(inp)) #这三行为固定用法,notify的作用为告诉func里的函数acquire放行几个锁 c.release() #条件锁的另外一种方式 def condition(): ret = False r = input(">>: ") if r == ‘true‘: ret = True return ret def func(i,conn): print(i) conn.acquire() conn.wait_for(condition) #condithon会return两种结果,一种为True,一种为False,返回值为True时,放行一个锁,为False时,不放行 print(i+100) conn.release() c = threading.Condition() for i in range(10): t = threading.Thread(target=func,args=(i,c,)) t.start() #Timer from threading import Timer def hello(): print(‘hello world‘) t = Timer(1,hello) t.start() #一秒后,执行hello函数
四、进程池
from multiprocessing import Pool import time def foo(arg): time.sleep(0.1) print(arg) if __name__ == ‘__main__‘: pool = Pool(5) for i in range(20): pool.apply_async(func=foo,args=(i,)) pool.close() #所有任务执行完毕后关闭进程池 time.sleep(0.2) pool.terminate() #立即终止正在执行的任务,关闭进程池 pool.join() print(‘end‘)
五、实现进程间数据共享
""" 默认情况下进程之间数据是不共享的 这里分别使用queues,Array,Manager来实现进程间数据共享 """ from multiprocessing import Process from multiprocessing import queues import multiprocessing def foo(i,arg): arg.put(i) print(‘say hi‘,i,arg.qsize()) if __name__ == "__main__": li = queues.Queue(20,ctx=multiprocessing) #类似queue队列 for i in range(10): p = Process(target=foo,args=(i,li)) p.start() # p.join() from multiprocessing import Process from multiprocessing import Array #Array 数组,类似列表,但必须在创建时指定数组中存放的数据类型及数据的个数 def foo(i,arg): arg[i] = i + 100 for item in arg: print(item) print(‘===============‘) if __name__ == ‘__main__‘: li = Array(‘i‘,10) for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() # p.join() from multiprocessing import Process from multiprocessing import Manager def foo(i,arg): arg[i] = i + 100 print(arg.values()) if __name__ == ‘__main__‘: obj = Manager() li = obj.dict() #Manager.dict相当于Python中的字典 for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() p.join() #li属于主进程,p是子进程,主进程与子进程之间交互时会创建一个连接,这里不使用join时会报错,因为主进程执行完后主进程与子进程之间的连接就会断开,so,这里要让主进程等待子进程执行完毕
day11 队列、线程、进程、协程及Python使用缓存(redis/memcache)
标签:
原文地址:http://www.cnblogs.com/xuanouba/p/5693011.html