标签:des style io ar for 文件 art cti sp
cdef libev.ev_loop* _ptr #libev的loop的引用 cdef public object error_handler cdef libev.ev_prepare _prepare #这个prepare事件,每次在loop之前,都会调用它的回调 cdef public list _callbacks #当前loop对象上面挂起的所有回调的链表 cdef libev.ev_timer _timer0 #一个超时为0的timer,用于让loop立即返回
#构造函数 def __init__(self, object flags=None, object default=None, size_t ptr=0): cdef unsigned int c_flags cdef object old_handler = None #这个是注册每次event loop 之前的回调,每次loop开始之前,就会执行这里 #其实最终调用的时_run_callbacks方法 libev.ev_prepare_init(&self._prepare, <void*>gevent_run_callbacks) #ifdef _WIN32 libev.ev_timer_init(&self._periodic_signal_checker, <void*>gevent_periodic_signal_check, 0.3, 0.3) #endif #注册这个timer的回调,这个回调其实什么都不做,主要的目的就是让event loop的循环立即退出,去处理回调 libev.ev_timer_init(&self._timer0, <void*>gevent_noop, 0.0, 0.0) if ptr: self._ptr = <libev.ev_loop*>ptr else: c_flags = _flags_to_int(flags) _check_flags(c_flags) c_flags |= libev.EVFLAG_NOENV if default is None: default = True if _default_loop_destroyed: default = False if default: self._ptr = libev.gevent_ev_default_loop(c_flags) if not self._ptr: raise SystemError("ev_default_loop(%s) failed" % (c_flags, )) #ifdef _WIN32 libev.ev_timer_start(self._ptr, &self._periodic_signal_checker) libev.ev_unref(self._ptr) #endif else: #创建loop的引用,一般情况下都是在这里创建的 self._ptr = libev.ev_loop_new(c_flags) if not self._ptr: raise SystemError("ev_loop_new(%s) failed" % (c_flags, )) if default or __SYSERR_CALLBACK is None: set_syserr_cb(self._handle_syserr) #在loop上面启动prepare,这样可以保证每次loop之前都执行了prepare的回调,也就是执行在loop上面注册的callback libev.ev_prepare_start(self._ptr, &self._prepare) libev.ev_unref(self._ptr) self._callbacks = [] #初始化回调链表
#在gevent_run_callbacks中其实也是调用这个方法来具体的运行所有的回调 #在每一次运行loop之前,都会执行这些回调 cdef _run_callbacks(self): cdef callback cb cdef object callbacks cdef int count = 1000 libev.ev_timer_stop(self._ptr, &self._timer0) while self._callbacks and count > 0: callbacks = self._callbacks self._callbacks = [] for cb in callbacks: libev.ev_unref(self._ptr) gevent_call(self, cb) count -= 1 #在运行回调的时候有可能又加入了回调,所以这里进行定时 #因为这个定时的超时是0,所以让loop立即返回,可以处理回调 if self._callbacks: libev.ev_timer_start(self._ptr, &self._timer0)
#运行loop对象 def run(self, nowait=False, once=False): CHECK_LOOP2(self) cdef unsigned int flags = 0 if nowait: flags |= libev.EVRUN_NOWAIT if once: flags |= libev.EVRUN_ONCE with nogil: #这里主要就是运行libev的loop libev.ev_run(self._ptr, flags)
好了。。。其实到这里loop的基本上的东西 就说的差不多了。。。。
对于I/O watcher的创建,一般情况下是在loop上面调用io方法:
#创建I/O watcher #ifdef _WIN32 def io(self, libev.vfd_socket_t fd, int events, ref=True, priority=None): return io(self, fd, events, ref, priority) #else def io(self, int fd, int events, ref=True, priority=None): return io(self, fd, events, ref, priority) #endif
这里来看看IO watcher的源码定义吧:
#I/Owatcher的定义 cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]: WATCHER_BASE(io) #通过这个宏定义了一些基本的属性,例如libev的 watcher 引用等 #这个其实其实就是启动watcher def start(self, object callback, *args, pass_events=False): CHECK_LOOP2(self.loop) if callback is None: raise TypeError('callback must be callable, not None') self.callback = callback if pass_events: self.args = (GEVENT_CORE_EVENTS, ) + args else: self.args = args LIBEV_UNREF libev.ev_io_start(self.loop._ptr, &self._watcher) #在libev的loop上面启动这个io watcher PYTHON_INCREF ACTIVE PENDING #ifdef _WIN32 #io watcher的构造 def __init__(self, loop loop, libev.vfd_socket_t fd, int events, ref=True, priority=None): if events & ~(libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE): raise ValueError('illegal event mask: %r' % events) cdef int vfd = libev.vfd_open(fd) libev.vfd_free(self._watcher.fd) #初始化,并设置回调为gevent_callback_io #在回调里面,会执行最开始watcher注册的回调,并会取消读写事件的注册 libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, events) self.loop = loop if ref: self._flags = 0 else: self._flags = 4 if priority is not None: libev.ev_set_priority(&self._watcher, priority) #else #一般就考虑这里的构造就好了,三个重要的参数分别是loop对象的引用,文件描述符,事件类型,一般情况下都不会带有优先级的 def __init__(self, loop loop, int fd, int events, ref=True, priority=None): if fd < 0: raise ValueError('fd must be non-negative: %r' % fd) if events & ~(libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE): raise ValueError('illegal event mask: %r' % events) #调用libev的方法来初始化I/O watcher libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, events) self.loop = loop if ref: self._flags = 0 else: self._flags = 4 if priority is not None: libev.ev_set_priority(&self._watcher, priority) #endif property fd: def __get__(self): return libev.vfd_get(self._watcher.fd) def __set__(self, long fd): if libev.ev_is_active(&self._watcher): raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active") cdef int vfd = libev.vfd_open(fd) libev.vfd_free(self._watcher.fd) libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, self._watcher.events) property events: def __get__(self): return self._watcher.events def __set__(self, int events): if libev.ev_is_active(&self._watcher): raise AttributeError("'io' watcher attribute 'events' is read-only while watcher is active") libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, self._watcher.fd, events) property events_str: def __get__(self): return _events_to_str(self._watcher.events) def _format(self): return ' fd=%s events=%s' % (self.fd, self.events_str)
从构造函数中可以看到调用了libev的ev_io_init方法来初始化内部的I/O watcher引用,并设置了其回调为gevent_callback_io方法,在start方法中则调用了libev的ev_io_start方法来启动这个watcher,并记录了回调的方法。。。这里如果对libev有基本了解的话,通过上述代码就已经了解了I/O watcher的运行原理。。。
//首先获取wather对象,然后调用回调 #define DEFINE_CALLBACK(WATCHER_LC, WATCHER_TYPE) static void gevent_callback_##WATCHER_LC(struct ev_loop *_loop, void *c_watcher, int revents) { struct PyGevent##WATCHER_TYPE##Object* watcher = GET_OBJECT(PyGevent##WATCHER_TYPE##Object, c_watcher, _watcher); gevent_callback(watcher->loop, watcher->_callback, watcher->args, (PyObject*)watcher, c_watcher, revents); }
//对于所有的事件的回调,其实最终都是通过这个函数来处理的 static void gevent_callback(struct PyGeventLoopObject* loop, PyObject* callback, PyObject* args, PyObject* watcher, void *c_watcher, int revents) { GIL_DECLARE; PyObject *result, *py_events; long length; py_events = 0; GIL_ENSURE; Py_INCREF(loop); Py_INCREF(callback); Py_INCREF(args); Py_INCREF(watcher); gevent_check_signals(loop); if (args == Py_None) { args = __pyx_empty_tuple; } length = PyTuple_Size(args); if (length < 0) { gevent_handle_error(loop, watcher); goto end; } if (length > 0 && PyTuple_GET_ITEM(args, 0) == GEVENT_CORE_EVENTS) { py_events = PyInt_FromLong(revents); if (!py_events) { gevent_handle_error(loop, watcher); goto end; } PyTuple_SET_ITEM(args, 0, py_events); } else { py_events = NULL; } result = PyObject_Call(callback, args, NULL); //执行回调 if (result) { Py_DECREF(result); } else { gevent_handle_error(loop, watcher); if (revents & (EV_READ|EV_WRITE)) { //这里可以看出,如果注册了读写事件,那么会取消,也就标示每一次读写,都需要重写注册事件 /* io watcher: not stopping it may cause the failing callback to be called repeatedly */ gevent_stop(watcher, loop); goto end; } } if (!ev_is_active(c_watcher)) { /* Watcher was stopped, maybe by libev. Let's call stop() to clean up * 'callback' and 'args' properties, do Py_DECREF() and ev_ref() if necessary. * BTW, we don't need to check for EV_ERROR, because libev stops the watcher in that case. */ gevent_stop(watcher, loop); } end: if (py_events) { Py_DECREF(py_events); PyTuple_SET_ITEM(args, 0, GEVENT_CORE_EVENTS); } Py_DECREF(watcher); Py_DECREF(args); Py_DECREF(callback); Py_DECREF(loop); GIL_RELEASE; }
标签:des style io ar for 文件 art cti sp