标签:
threading中一个关键的组件是threading.Thread。
class Thread(_Verbose):
    """A thread of control built on top of the low-level thread primitives.

    The activity is either the ``target`` callable handed to the
    constructor or an overridden ``run()`` method.  ``start()`` launches
    the low-level thread, ``join()`` waits for it to finish.

    Bookkeeping relies on module-level state that is defined outside this
    class: ``_limbo`` holds threads that have been started but are not yet
    running, ``_active`` maps thread identifiers to running threads, and
    ``_active_limbo_lock`` guards both.
    """

    # Class-level default so that methods can detect a subclass that forgot
    # to call Thread.__init__(); the instance attribute set there shadows it.
    __initialized = False

    # Kept on the class so the exception-reporting path in
    # __bootstrap_inner() can still reach them when the module-level _sys
    # name is no longer usable (the message printed on that path says
    # "most likely raised during interpreter shutdown").
    __exc_info = _sys.exc_info

    # Used to clear the current exception right after run() finishes.
    __exc_clear = _sys.exc_clear

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Initialise the thread object without starting it.

        group   -- must be None; any other value fails the assertion.
        target  -- callable invoked by run(); None means run() does nothing.
        name    -- thread name; a name from _newname() is used when falsy.
        args    -- positional arguments passed to target.
        kwargs  -- keyword arguments passed to target (None means {}).
        verbose -- forwarded unchanged to _Verbose.__init__().
        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _reset_internal_locks(self):
        """Re-create the internal locks of this thread object."""
        # private!  Called by _after_fork() to reset our internal locks as
        # they may be in an invalid state leading to a deadlock or crash.
        if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
            self.__block.__init__()
        self.__started._reset_internal_locks()

    @property
    def _block(self):
        """Return the internal Condition object."""
        # used by a unittest
        return self.__block

    def _set_daemon(self):
        """Return the initial daemon flag: the creating thread's flag."""
        # Overridden in _MainThread and _DummyThread
        return current_thread().daemon

    def __repr__(self):
        """Return ``<ClassName(name, status)>`` describing the thread state."""
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started.is_set():
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status += " daemon"
        if self.__ident is not None:
            status += " %s" % self.__ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Start the thread's activity.

        Registers the thread in ``_limbo``, creates the low-level thread
        with ``__bootstrap`` as its entry point, then blocks until the new
        thread has set the internal "started" event.

        Raises RuntimeError if Thread.__init__() was not called or if the
        thread has already been started.
        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("threads can only be started once")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # The low-level thread was never created: undo the registration
            # so the object does not linger in _limbo, then propagate.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        self.__started.wait()

    def run(self):
        """Run the thread's activity.

        The default implementation calls ``target(*args, **kwargs)`` when
        a truthy target was supplied.  Subclasses may override this method.
        """
        try:
            if self.__target:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs

    def __bootstrap(self):
        """Entry point of the low-level thread; wraps __bootstrap_inner()."""
        try:
            self.__bootstrap_inner()
        except:
            # Deliberate catch-all: an exception is silently dropped only
            # for a daemon thread when the _sys global is None (presumably
            # because module globals are being torn down at interpreter
            # shutdown -- confirm).  Everything else is re-raised.
            if self.__daemonic and _sys is None:
                return
            raise

    def _set_ident(self):
        """Record the identifier of the calling thread as this thread's ident."""
        self.__ident = _get_ident()

    def __bootstrap_inner(self):
        """Register the thread as active, call run(), then mark it stopped.

        Order of operations: record the ident, set the "started" event
        (which releases start()), move the thread from ``_limbo`` to
        ``_active``, install trace/profile hooks if any, call ``run()``
        and report an unhandled exception on stderr.  The outer ``finally``
        always calls ``__stop()`` and removes the thread from ``_active``.
        """
        try:
            self._set_ident()
            self.__started.set()
            with _active_limbo_lock:
                _active[self.__ident] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                self._note("%s.__bootstrap(): registering trace hook", self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                self._note("%s.__bootstrap(): registering profile hook", self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                # SystemExit only ends this thread; nothing is printed.
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # Prefer the current sys.stderr; fall back to the stream
                # saved in __init__ when _sys or _sys.stderr is unavailable.
                if _sys and _sys.stderr is not None:
                    print>>_sys.stderr, ("Exception in thread %s:\n%s" %
                                         (self.name, _format_exc()))
                elif self.__stderr is not None:
                    # Hand-rolled approximation of a traceback that only
                    # uses the references saved on the class and instance.
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                 exc_tb.tb_lineno,
                                 exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    finally:
                        # Drop the local references to the exception and
                        # traceback objects as soon as reporting is done.
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
            finally:
                # Clear the current exception before the thread is marked
                # as stopped.
                self.__exc_clear()
        finally:
            with _active_limbo_lock:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    # Best effort: a missing entry must not mask whatever
                    # is propagating out of the outer try block.
                    pass

    def __stop(self):
        """Mark the thread as stopped and wake every thread waiting in join()."""
        # Objects without a __block attribute (see _reset_internal_locks)
        # have no condition to notify.
        if not hasattr(self, '_Thread__block'):
            return
        self.__block.acquire()
        self.__stopped = True
        self.__block.notify_all()
        self.__block.release()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."
        try:
            with _active_limbo_lock:
                del _active[_get_ident()]
        except KeyError:
            # A missing entry is tolerated only when dummy_threading has
            # been imported; otherwise the KeyError is re-raised.
            if 'dummy_threading' not in _sys.modules:
                raise

    def join(self, timeout=None):
        """Wait until the thread terminates or the timeout expires.

        timeout -- None to wait indefinitely, otherwise a number added to
                   ``_time()`` to compute the deadline (presumably seconds
                   -- confirm against the definition of ``_time``).

        Returns None in every case; callers must use isAlive() to find out
        whether a timeout occurred.

        Raises RuntimeError if Thread.__init__() was not called, if the
        thread has not been started, or if a thread tries to join itself.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    # Loop ended without break: the thread really stopped.
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

    @property
    def name(self):
        """The thread's name, stored as a string."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    @name.setter
    def name(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    @property
    def ident(self):
        """The thread identifier, or None until the thread has begun running."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__ident

    def isAlive(self):
        """Return True when the thread has started and has not yet stopped."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started.is_set() and not self.__stopped

    is_alive = isAlive

    @property
    def daemon(self):
        """The daemon flag; it can only be changed before start()."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    @daemon.setter
    def daemon(self, daemonic):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("cannot set daemon status of active thread")
        self.__daemonic = daemonic

    def isDaemon(self):
        """Return the daemon flag (method form of the ``daemon`` property)."""
        return self.daemon

    def setDaemon(self, daemonic):
        """Set the daemon flag (method form of the ``daemon`` property)."""
        self.daemon = daemonic

    def getName(self):
        """Return the thread name (method form of the ``name`` property)."""
        return self.name

    def setName(self, name):
        """Set the thread name (method form of the ``name`` property)."""
        self.name = name

# The timer class was contributed by Itamar Shtull-Trauring
我们看到,在调用threading.Thread.start时,会先在_limbo中记录线程,然后通过thread.start_new_thread创建原生线程,线程过程为__bootstrap。__bootstrap会调用__bootstrap_inner,后者先从_limbo中删除线程记录,转而将线程记录到_active中,然后调用run。通常用户从threading.Thread派生的class都会覆盖原有的run函数,这就实现了用户自定义的线程过程。
在threading.Thread中,维护着一个Condition对象__block。在run结束后,__bootstrap_inner会在最后的finally块中调用__stop操作(注意并不是由start方法调用)。在__stop这个操作里会调用Condition对象的notify_all函数self.__block.notify_all(),它通过Condition内部维护的__waiters列表逐个进行release操作,通知所有等待该对象的线程。那么会有哪些线程等待这个对象呢?凡是希望等待该线程结束的线程,都会通过threading.Thread.join方法调用wait操作,把自己的等待锁注册到__waiters列表里,成为Condition对象的等待线程。
def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self.__waiters.append(waiter) #将等待线程添加到维护的列表中被join调用 saved_state = self._release_save()
标签:
原文地址:http://www.cnblogs.com/chenchao1990/p/5104977.html