Queue的种类:
Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
示例:
import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
输出:
Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上
示例:
import queue
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
输出:
class Queue.PriorityQueue(maxsize=0)
构造一个优先队列。maxsize用法同上。
示例:
import queue
q =queue.PriorityQueue(3)
q.put((2,‘aaron‘))
q.put((-1,‘jim‘))
q.put((5,‘jack‘))
print(q.get())
print(q.get())
print(q.get())
输出:
(-1, ‘jim‘)
(2, ‘aaron‘)
(5, ‘jack‘)
基本方法:
Queue.Queue(maxsize=0) FIFO, 如果maxsize小于1就表示队列长度无限
Queue.LifoQueue(maxsize=0) LIFO, 如果maxsize小于1就表示队列长度无限
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 读队列,timeout等待时间
Queue.put(item, [block[, timeout]]) 写队列,timeout等待时间
Queue.queue.clear() 清空队列
其他:
task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join()
阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
import threading
import queue
def producer():
for i in range(10):
q.put("骨头 %s" % i )
print("开始等待所有的骨头被取走...")
q.join() #等待所有任务完成
print("所有的骨头被取完了...")
def consumer(n):
while q.qsize() >0:
print("%s 取到" %n , q.get())
q.task_done() #告知这个任务执行完了
q = queue.Queue()
p = threading.Thread(target=producer,)
p.start()
c1 = consumer("李闯")
注意:
- 如果没有q.task_done()的存在,就不会通知线程队列,生产者就会一直阻塞下去。
- 如果没有q.join()的存在,生产者就不会等待消费者,它会自行结束。
- q.task_done()方法和q.join()方法的存在,实现了生产者与消费者的同步机制。
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <20:
time.sleep(random.randrange(3))
q.put(count)
print(‘Producer %s has produced %s baozi..‘ %(name, count))
count +=1
def Consumer(name):
count = 0
while count <20:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
print(data)
print(‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘ %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=(‘A‘,))
c1 = threading.Thread(target=Consumer, args=(‘B‘,))
p1.start()
c1.start()
这个实例,充分体现了线程队列在生产者消费者模型中,是如何解决生产者与消费者的强耦合的问题。
线程queue使用注意事项
1. 阻塞模式
import queue
q = queue.Queue(10) #创建一个队列
......
for i in range(10):
q.put(‘A‘)
time.sleep(0.5)
这是一段极其简单的代码(另有两个线程也在操作队列q),我期望每隔0.5秒写一个‘A‘到队列中,但总是不能如愿:间隔时间有时会远远超过0.5秒。原来,Queue.put()默认有 block = True 和 timeou 两个参数。当 block = True 时,写入是阻塞式的,阻塞时间由 timeou 确定。当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常。
2. 无法捕获 exception Queue.Empty 的异常
while True:
......
try:
data = q.get()
except Queue.Empty:
break
本意是用队列为空时,退出循环,但实际运行起来,却陷入了死循环。这个问题和上面有点类似:Queue.get()默认的也是阻塞方式读取数据,队列为空时,不会抛出 except Queue.Empty ,而是进入阻塞直至超时。 给get方法的参数block=False 的参数,问题迎刃而解。