标签:stat done 任务创建 之间 position except tee pen 设计
任务创建使用create_task方法。
def create_task(self, coro): """Schedule a coroutine object. Return a task object. """ self._check_closed() if self._task_factory is None: task = tasks.Task(coro, loop=self) if task._source_traceback: del task._source_traceback[-1] else: task = self._task_factory(self, coro) return task
Task()实例化。
def __init__(self, coro, *, loop=None): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] if not coroutines.iscoroutine(coro): # raise after Future.__init__(), attrs are required for __del__ # prevent logging for pending task in __del__ self._log_destroy_pending = False raise TypeError(f"a coroutine was expected, got {coro!r}") self._must_cancel = False self._fut_waiter = None self._coro = coro self._context = contextvars.copy_context() self._loop.call_soon(self.__step, context=self._context) _register_task(self)
self._loop.call_soon(self.__step, context=self._context)
call_soon的作用是把方法添加到loop的预执行队列中。
也就是loop._ready,它是collections.deque()
具体实现后文单列章节。
它添加的是Task._step()方法
def __step(self, exc=None): if self.done(): raise futures.InvalidStateError( f‘_step(): already done: {self!r}, {exc!r}‘) if self._must_cancel: if not isinstance(exc, futures.CancelledError): exc = futures.CancelledError() self._must_cancel = False coro = self._coro self._fut_waiter = None _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: # We use the `send` method directly, because coroutines # don‘t have `__iter__` and `__next__` methods. result = coro.send(None) else: result = coro.throw(exc) except StopIteration as exc: if self._must_cancel: # Task is cancelled right before coro stops. self._must_cancel = False super().set_exception(futures.CancelledError()) else: super().set_result(exc.value) except futures.CancelledError: super().cancel() # I.e., Future.cancel(self). except Exception as exc: super().set_exception(exc)
去掉不太重要的部分后,核心就这么几句:
coro = self._coro = 传入的协程对象
try, send, throw, StopIteration,CancelledError
就就比较熟悉了,基本就是send值,再根据
def run_forever(self): """Run until stop() is called.""" self._check_closed() self._check_runnung() self._set_coroutine_origin_tracking(self._debug) self._thread_id = threading.get_ident() old_agen_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, finalizer=self._asyncgen_finalizer_hook) try: events._set_running_loop(self) # 将loop,pid保存到_RunningLoop()类中,主要是保存相关信息 while True: self._run_once() # 核心句 if self._stopping: break finally: self._stopping = False self._thread_id = None events._set_running_loop(None) self._set_coroutine_origin_tracking(False) sys.set_asyncgen_hooks(*old_agen_hooks)
其它的开始循环基本都是调用这个方法。
核心就是while循环了,关键句是self._run_once()
def _run_once(self): """Run one full iteration of the event loop. This calls all currently ready callbacks, polls for I/O, schedules the resulting callbacks, and finally schedules ‘call_later‘ callbacks. """ ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() if handle._cancelled: continue if self._debug: try: self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt >= self.slow_callback_duration: logger.warning(‘Executing %s took %.3f seconds‘, _format_handle(handle), dt) finally: self._current_handle = None else: handle._run() #核心句
运行loop的全部迭代。它调用就绪态的callbacks,轮询I/O,安排回调,最终执行call_later。
最终做的事有两件:
def call_soon(self, callback, *args, context=None): """Arrange for a callback to be called as soon as possible. This operates as a FIFO queue: callbacks are called in the order in which they are registered. Each callback will be called exactly once. Any positional arguments after the callback will be passed to the callback when it is called. """ self._check_closed() if self._debug: self._check_thread() self._check_callback(callback, ‘call_soon‘) handle = self._call_soon(callback, args, context) if handle._source_traceback: del handle._source_traceback[-1] return handle def _call_soon(self, callback, args, context): handle = events.Handle(callback, args, self, context) if handle._source_traceback: del handle._source_traceback[-1] self._ready.append(handle) return handle
关键句
构建 handle = events.Handle(callback, args, self, context)
添加到待执行队列 self._ready.append(handle)
总结一下asyncio的实现思路
有一个任务调度器event loop,我们可以把需要执行的coroutine打包成task加入到event loop的调度列表里面(以Handle形式)。
在event loop的每个帧里面,它会检查需要执行那些task,然后运行这些task,可能拿到最终结果,也可能执行一半继续await别的任务,任务之间互相wait,通过回调来把任务串联起来(后面常用接口会继续深入介绍,实现细节见附录2)。
任务可能会依赖别的IO消息,在每一帧,event loop都会用selector处理相应的消息,执行相应的callback函数。
我们当前的介绍里,只有一个event loop,这个event loop跑在主线程里面。当然,event loop还可以开线程池处理别的任务,或者,多个线程里执行多个event loop,他们之间还有交互,我们这里不在介绍。
单个event loop跑在单个线程有个好处,只要自己不主动await,就会一直占有主线程,换句话说,同步函数一定没有数据冲突(data racing)。对比多线程方案,如果需要处理数据冲突,就需要加锁了,这在很多情况下会降低程序的性能。所以协程这种设计思路,非常适合有多个用户、但是每个用户之间没有共享数据的场景。如果需要实现并行,多开几个进程就行了。
标签:stat done 任务创建 之间 position except tee pen 设计
原文地址:https://www.cnblogs.com/wodeboke-y/p/12865509.html