标签:div ble stop python main val super async reading
import threading import time import inspect import ctypes def _async_raise(tid, exctype): if not inspect.isclass(exctype): raise TypeError("Only types can be raised (not instances)") res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype)) if res == 0: raise ValueError("invalid thread id") elif res != 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) raise SystemError("PyThreadState_SetAsyncExc failed") def stop_thread(thread): _async_raise(thread.ident, SystemExit) class TestThread(threading.Thread): def run(self): print("begin run the child thread") while True: print("sleep 1s") time.sleep(1) if __name__ == "__main__": print("begin run main thread") t = TestThread() t.start() time.sleep(3) stop_thread(t) print("main thread end")
# encoding:utf-8 import time import threading class StoppableThread(threading.Thread): """Thread class with a stop() method. The thread itself has to check regularly for the stopped() condition.""" def __init__(self, *args, **kwargs): super(StoppableThread, self).__init__(*args, **kwargs) self._stop_event = threading.Event() def stop(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set() def run(self): print("begin run the child thread") while True: print("sleep 1s") time.sleep(1) if self.stopped(): # 做一些必要的收尾工作 break if __name__ == "__main__": print("begin run main thread") t = StoppableThread() t.start() time.sleep(3) t.stop() print("main thread end")
标签:div ble stop python main val super async reading
原文地址:https://www.cnblogs.com/chunqiu666/p/13180605.html