Threading Module Overview (Part 3)

Date: 2016-01-06 13:34:08


Thread in the threading module

  A key component of the threading module is threading.Thread.

 

class Thread(_Verbose):

    __initialized = False
    __exc_info = _sys.exc_info
    __exc_clear = _sys.exc_clear

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _reset_internal_locks(self):
        # private!  Called by _after_fork() to reset our internal locks as
        # they may be in an invalid state leading to a deadlock or crash.
        if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
            self.__block.__init__()
        self.__started._reset_internal_locks()

    @property
    def _block(self):
        # used by a unittest
        return self.__block

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return current_thread().daemon

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started.is_set():
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status += " daemon"
        if self.__ident is not None:
            status += " %s" % self.__ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("threads can only be started once")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        # Record the thread in _limbo until the new thread registers itself.
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                del _limbo[self]
            raise
        # Block until __bootstrap_inner() has set the started event.
        self.__started.wait()

    def run(self):
        try:
            if self.__target:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs

    def __bootstrap(self):
        # Thread procedure handed to _start_new_thread() by start().
        try:
            self.__bootstrap_inner()
        except:
            if self.__daemonic and _sys is None:
                return
            raise

    def _set_ident(self):
        self.__ident = _get_ident()

    def __bootstrap_inner(self):
        try:
            self._set_ident()
            self.__started.set()
            # Move the thread record from _limbo to _active.
            with _active_limbo_lock:
                _active[self.__ident] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                self._note("%s.__bootstrap(): registering trace hook", self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                self._note("%s.__bootstrap(): registering profile hook", self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)

                if _sys and _sys.stderr is not None:
                    print>>_sys.stderr, ("Exception in thread %s:\n%s" %
                                         (self.name, _format_exc()))
                elif self.__stderr is not None:
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                self.__exc_clear()
        finally:
            # Runs once run() is over: mark the thread stopped, wake joiners.
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    pass

    def __stop(self):
        if not hasattr(self, '_Thread__block'):
            return
        self.__block.acquire()
        self.__stopped = True
        self.__block.notify_all()
        self.__block.release()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."
        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise

    def join(self, timeout=None):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

    @property
    def name(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    @name.setter
    def name(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    @property
    def ident(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__ident

    def isAlive(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started.is_set() and not self.__stopped

    is_alive = isAlive

    @property
    def daemon(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    @daemon.setter
    def daemon(self, daemonic):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("cannot set daemon status of active thread");
        self.__daemonic = daemonic

    def isDaemon(self):
        return self.daemon

    def setDaemon(self, daemonic):
        self.daemon = daemonic

    def getName(self):
        return self.name

    def setName(self, name):
        self.name = name

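  As the listing shows, a call to threading.Thread.start first records the thread in _limbo and then creates a native thread through thread.start_new_thread (bound to the name _start_new_thread), with __bootstrap as the thread procedure. __bootstrap delegates to __bootstrap_inner, which deletes the thread's record from _limbo, records the thread in _active under its thread identifier, and then calls run. A class that the user derives from threading.Thread usually overrides run, and that is how a user-defined thread procedure is implemented.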
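
  The same flow can be observed from user code. What follows is only a minimal sketch, written for Python 3 (its internals differ from the 2.7 listing, but the public behaviour used here is the same); the class name Worker and its result attribute are hypothetical and exist only for the illustration.

```python
import threading


class Worker(threading.Thread):
    """Hypothetical subclass: overriding run() supplies the thread procedure."""

    def __init__(self, n):
        super().__init__(name="worker-%d" % n)
        self.n = n
        self.result = None

    def run(self):
        # Executed in the new thread, called by the bootstrap code.
        self.result = self.n * self.n


w = Worker(7)
alive_before_start = w.is_alive()   # the thread has not been started yet
w.start()                           # returns once the new thread has reported that it started
w.join()                            # block until run() has returned
print(alive_before_start, w.result, w.is_alive())

second_start_error = None
try:
    w.start()                       # a Thread object can only be started once
except RuntimeError as exc:
    second_start_error = exc
print("second start() failed:", second_start_error)
```

  Passing a callable as target= instead of subclassing takes the other branch of run in the listing, the one that calls self.__target.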

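  threading.Thread maintains a Condition object, __block (also exposed through the _block property). After run returns, the finally clause of __bootstrap_inner calls __stop; start itself only waits for the __started event and returns. __stop sets __stopped and then calls the Condition's notify_all, self.__block.notify_all(). The Condition keeps a __waiters list of waiter locks, and notify_all releases each of them, which wakes every thread waiting on the object. Which threads wait on it? Any thread that wants to know when this thread has finished: it calls threading.Thread.join, join calls wait, and wait appends a waiter lock to __waiters, so the caller becomes a waiting thread of the Condition.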
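
  The handshake between __stop and join can be modelled outside of Thread. The sketch below assumes Python 3 and uses a hypothetical class, Completion, that keeps only the stopped flag and the condition; it is a simplified stand-in and not the library's implementation.

```python
import threading
import time


class Completion:
    """Hypothetical stand-in for Thread's __stopped flag and __block condition."""

    def __init__(self):
        self._block = threading.Condition(threading.Lock())
        self._stopped = False

    def stop(self):
        # Mirrors Thread.__stop(): set the flag, then wake every waiter.
        with self._block:
            self._stopped = True
            self._block.notify_all()

    def join(self, timeout=None):
        # Mirrors Thread.join(): wait on the condition until the flag is set
        # or the deadline passes. Returns whether the flag was set.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._block:
            while not self._stopped:
                if deadline is None:
                    self._block.wait()
                else:
                    delay = deadline - time.monotonic()
                    if delay <= 0:
                        break
                    self._block.wait(delay)
            return self._stopped


done = Completion()
finished_early = done.join(timeout=0.05)   # nobody has called stop() yet

joiners = [threading.Thread(target=done.join) for _ in range(3)]
for t in joiners:
    t.start()
done.stop()                                # a single notify_all() wakes all joiners
for t in joiners:
    t.join()
print(finished_early, done.join())
```

  The loop around wait matters: a joiner that arrives after stop has already run sees the flag and never waits, so no wake-up can be missed.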

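# Condition.wait(), reached from Thread.join() through self.__block.wait()
def wait(self, timeout=None):
    if not self._is_owned():
        raise RuntimeError("cannot wait on un-acquired lock")
    waiter = _allocate_lock()
    waiter.acquire()
    self.__waiters.append(waiter)  # add the waiter to the maintained list; this is the call made by join()
    saved_state = self._release_save()
    # ... (the rest of the method is not shown in this excerpt)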
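
  The waiter-lock idea in the excerpt can be tried in isolation. The following is a hedged sketch for Python 3: TinyCondition is a hypothetical, much reduced model (no timeout, no ownership check, no notify for a single waiter), and it is not the standard library class.

```python
import threading


class TinyCondition:
    """Hypothetical, simplified model of the waiter-lock scheme."""

    def __init__(self):
        self._lock = threading.Lock()
        self._waiters = []

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self._lock.release()

    def wait(self):
        # The caller must hold self._lock.
        waiter = threading.Lock()
        waiter.acquire()                 # first acquire succeeds immediately
        self._waiters.append(waiter)     # register so notify_all() can find it
        self._lock.release()             # let other threads take the outer lock
        try:
            waiter.acquire()             # blocks until a notifier releases it
        finally:
            self._lock.acquire()         # re-take the outer lock before returning

    def notify_all(self):
        # The caller must hold self._lock.
        waiters, self._waiters = self._waiters, []
        for waiter in waiters:
            waiter.release()             # unblocks the matching wait()


cond = TinyCondition()
state = {"stopped": False}
woken = []


def wait_for_stop(i):
    with cond:
        while not state["stopped"]:
            cond.wait()
        woken.append(i)


threads = [threading.Thread(target=wait_for_stop, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
with cond:
    state["stopped"] = True
    cond.notify_all()
for t in threads:
    t.join()
print(sorted(woken))
```

  Each waiting thread blocks on its own lock, so the notifier wakes it simply by releasing that lock.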

 


Original article: http://www.cnblogs.com/chenchao1990/p/5104977.html
