标签:nbsp between threading 完成 drop return deque block waiting
补充一个多线程场景下常使用的工具-Queue。
第一步:__init__()
1 class Queue: 2 3 def __init__(self, maxsize=0): 4 self.maxsize = maxsize 5 self._init(maxsize) 6 7 # mutex must be held whenever the queue is mutating. All methods 8 # that acquire mutex must release it before returning. mutex 9 # is shared between the three conditions, so acquiring and 10 # releasing the conditions also acquires and releases mutex. 11 self.mutex = threading.Lock() 12 13 # Notify not_empty whenever an item is added to the queue; a 14 # thread waiting to get is notified then. 15 self.not_empty = threading.Condition(self.mutex) 16 17 # Notify not_full whenever an item is removed from the queue; 18 # a thread waiting to put is notified then. 19 self.not_full = threading.Condition(self.mutex) 20 21 # Notify all_tasks_done whenever the number of unfinished tasks 22 # drops to zero; thread waiting to join() is notified to resume 23 self.all_tasks_done = threading.Condition(self.mutex) 24 self.unfinished_tasks = 0
看看都维护了些什么内容。
4-5行,一个队列大小和队列的具体容器对象。这里的容器对象是 collections.deque ;
11-23行,一个线程锁,已经共用线程锁的三个条件对象;条件对象用于方便的唤醒阻塞的线程,共用的线程锁保证了线程安全性;
第24行,一个计数器,记录需要被完成的任务数量,包括队列中的和被取出且正在完成的;
第二步:put()和get()
1 def put(self, item, block=True, timeout=None): 2 with self.not_full: 3 if self.maxsize > 0: 4 ... 5 elif timeout is None: 6 while self._qsize() >= self.maxsize: 7 self.not_full.wait() 8 ... 9 self._put(item) 10 self.unfinished_tasks += 1 11 self.not_empty.notify() 12 13 14 def get(self, block=True, timeout=None): 15 with self.not_empty: 16 ... 17 elif timeout is None: 18 while not self._qsize(): 19 self.not_empty.wait() 20 ... 21 item = self._get() 22 self.not_full.notify() 23 return item
修剪一些干扰的代码,逻辑顿时明朗不少。
主要用到就是两个条件变量之间的互相唤醒
需要主要的就是第10行:对计数器加一,代表未完成任务加一,这个计数会在下面的 join() 中使用
第三步:join()和task_done()
1 def join(self): 2 with self.all_tasks_done: 3 while self.unfinished_tasks: 4 self.all_tasks_done.wait() 5 6 def task_done(self): 7 with self.all_tasks_done: 8 unfinished = self.unfinished_tasks - 1 9 if unfinished <= 0: 10 if unfinished < 0: 11 raise ValueError(‘task_done() called too many times‘) 12 self.all_tasks_done.notify_all() 13 self.unfinished_tasks = unfinished
2-4行,在上锁的情况下对计数器进行判断。任务未归零,继续阻塞主线程
7-8行,对计数器进行减一并唤醒主线程进行检测
理解Condition后,对Queue的解读顿时就舒适多了。利用Condition唤醒特定线程的特性,再加上线程锁Lock,实现一个线程安全的队列是在太轻松。另外,queue库里也有将列表做为底层容器的队列实现,感兴趣的可以看看。
标签:nbsp between threading 完成 drop return deque block waiting
原文地址:https://www.cnblogs.com/buwu/p/12805583.html