标签:
八、统一定时器事件和I/O事件详解
和信号事件相比,把定时器事件和I/O事件统一起来就变得十分容易了,为什么?因为I/O复用机制如select(),poll(),epoll_wait()都允许设置一个最大等待时间^_^。So,让我们来看看libevent是怎样做的吧。PS:实际上很多事件驱动的软件都是这样做的。
1.实现方法
核心就是在每次事件循环中设置I/O复用的最大等待时间为定时器小顶堆中的顶节点的时间(即将要最早发送的定时器事件)。当然,如果进入一个事件循环时,激活事件队列不为空(即有就绪事件没有被处理),则设置最大等待时间为0。
具体的代码在源文件 event.c 的 event_base_loop()中:
/*event_dispatch调用了这个函数,事件驱动机制的核心循环,在这个事件循环中监听事件并处理就绪的事件*/ int event_base_loop(struct event_base *base, int flags) { ...... /*flags目前没有什么作用,传递的都是0,如果当前没有被激活的事件,从小顶堆中 取出时间,作为回调epoll_wait第三个参数*/ if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) { timeout_next(base, &tv_p);//根据Timer事件计算IO复用的最大等待时间 } else { /*如果仍然有激活的事件,并不是马上处理,而是将时间便为0,让epoll_wait立刻 返回,按照以前的流程继续走*/ evutil_timerclear(&tv); } ...... /*这里调用了I/O复用的驱动器,在epoll中相当与是epoll_wait,如果这个函数返回,说明 存在事件需要处理 等待这I/O就绪*/ res = evsel->dispatch(base, evbase, tv_p); ...... /*使用小顶堆的堆顶作为循环的时间是将定时事件融合到I/O机制中的关键, 在这个函数中将合适的超时事件添加到激活队列中*/ timeout_process(base); /*根据活跃事件的个数(event_count_active)来进行处理*/ if (base->event_count_active) { /*处理事件的函数*/ event_process_active(base); if (!base->event_count_active && (flags & EVLOOP_ONCE)) done = 1; } else if (flags & EVLOOP_NONBLOCK) done = 1; } ...... }
timeout_next()函数根据堆中具有最小超时值的事件和当前时间来计算等待时间
static int timeout_next(struct event_base *base, struct timeval **tv_p) { struct timeval now; struct event *ev; struct timeval *tv = *tv_p; <pre name="code" class="cpp">// 堆的首元素具有最小的超时值 if ((ev = min_heap_top(&base->timeheap)) == NULL) { // 如果没有定时事件,将等待时间设置为NULL,表示一直阻塞直到有I/O事件发生 *tv_p = NULL; return (0); } // 取得当前时间 gettime(base, &now); // 如果超时时间<=当前值,不能等待,需要立即返回 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { evutil_timerclear(tv); return (0); } // 计算等待的时间=当前时间-最小的超时时间 evutil_timersub(&ev->ev_timeout, &now, tv); return (0);}
Libevent中管理定时事件的数据结构是小顶堆,源码位于文件min_heap.h中,向最小堆中插入、删除元素的时间复杂度是O(lgN),获取最小值得时间复杂度是O(1)。另外,堆是一个完全二叉树,基本存储方式是一个数组
下面是小顶堆的插入元素的典型代码逻辑:
Heap[size++]<-new; // 先放到数组末尾,元素个数+1 // 下面就是 shift_up()的代码逻辑,不断的将 new 向上调整 _child = size; while(_child>0) // 循环 { _parent<-(_child-1)/2; // 计算 parent if(Heap[_parent].key < Heap[_child].key) break; // 调整结束,跳出循环 swap(_parent, _child); // 交换 parent 和 child }Libevent对插入操作进行了优化:
// 下面就是 shift_up()的代码逻辑,不断的将 new 的“预留位置”向上调整 _hole = size; // _hole 就是为 new 预留的位置,但并不立刻将 new 放上 while(_hole>0) // 循环 { _parent<-(_hole-1)/2; // 计算 parent if(Heap[_parent].key < new.key) break; // 调整结束,跳出循环 Heap[_hole] = Heap[_parent]; // 将 parent 向下调整 _hole = _parent; // 将_hole 调整到_parent } Heap[_hole] = new; // 调整结束,将 new 插入到_hole 指示的位置 size++; // 元素个数+1
3、总结
Libevent实际上是利用最小堆去管理定时事件(当定时事件很少时,可以用链表,redis就是这么干的),然后用最小堆中时间最近的定时事件的时间去设置I/O复用的最大等待时间,从而实现了定时器事件和I/O事件的统一。从而,我们将三类事件全部统一到了事件主循环中去了。
注:libevent1.4.12中的epoll没有提供边沿触发,而是使用的默认的水平触发。另外,我想说的是libevent1.4.12不支持持久的定时器事件。我将在我的简化版的libevent(likevent)中增加这个功能。
九、选择最优的I/O复用
1、将I/O复用封装成事件多路分发器
前面我们说过,Libevet本身是一种典型的Reactor模式,Reactor模式中有一个组件叫做事件多路分发器,这个组件实际上就是对某一种I/O复用的封装。那么问题来了,每种系统下提供的I/O复用机制不全相同,即使是同一个操作系统中提供的接口也有多种,那么怎么统一这些I/O复用机制来提供一个标准的事件多路分发器给其他组件使用呢?java中,我们可以采用接口;c++中,我们可以采用包含虚函数的类。Libevent展现了一种c中实现统一接口的方法,带有函数指针的结构体。
Libevent支持多种I/O复用技术的关键就在于结构体eventop:
struct event_op { const char *name;//io复用的名字 void *(*init)(struct event_base *);//初始化 int (*add)(void *, struct event *);//注册事件 int (*del)(void *, struct event *);//删除事件 int (*dispatch)(struct event_base *, void *, struct timeval *);//事件分发 void (*dealloc)(struct event_base *, void *);//销毁资源 /* set if we need to reinitialize the event base */ int need_reinit; };
static void *epoll_init (struct event_base *); static int epoll_add (void *, struct event *); static int epoll_del (void *, struct event *); static int epoll_dispatch (struct event_base *, void *, struct timeval *); static void epoll_dealloc (struct event_base *, void *); const struct event_op epollops = { "epoll", epoll_init, epoll_add, epoll_del, epoll_dispatch, epoll_dealloc, 1 /* need reinit */ };epollops和epoll对五个函数接口的实现定义在epoll.c文件中,对外是不可见的,从而实现了信息隐藏。
2、Libevent怎么选择最优的I/O复用机制
这里可以分解成两个问题:
(1)怎么知道有哪些I/O复用机制可用
Libevent的编译用的是autotools,她的编译脚本configure在执行时会检测系统中提供的api,并生成一个存放测试结果的头文件(例如:如果测试得知系统中有epoll,则在存放测试结果的头文件中加入一个宏,即#define HAVE_EPOLL)。同理,我们就可以知道哪些I/O复用机制可用。
(2)如何选择最优的I/O复用机制
Libevent中由base->evbase去存放唯一一个会使用的事件多路分发器实例,但是如果系统中有多个I/O多路复用机制,我们在初始化base->evbase前就有多个事件多路分发器实例,应该选哪个来初始化base->evbase呢?答案就是对所有的I/O复用机制按性能进行排序,然后按性能由低到高将对应的事件多路分发器实例放到一个数组里,将该数组中的所有事件多路分发器实例由前向后依次赋予base->evbase,这样就能保证最后base->evbase中存放的是最优的I/O复用机制实现的事件多路分发器实例。
下面是实现这一机制的核心代码:
static const struct event_op *eventops[] = { #ifdef HAVE_EVENT_PORTS &evportops, #endif #ifdef HAVE_SELECT &selectops, #endif #ifdef HAVE_POLL &pollops, #endif #ifdef HAVE_EPOLL &epollops, #endif #ifdef HAVE_WORKING_KQUEUE &event_op kqops, #endif #ifdef HAVE_DEVPOLL &event_op devpollops, #endif #ifdef WIN32 &struct event_op win32ops, #endif NULL }; const struct event_op epollops = { "epoll", epoll_init, epoll_add, epoll_del, epoll_dispatch, epoll_dealloc, 1 /* need reinit */ }; base->evbase = NULL; /*寻找合适的I/O复用机制,在这里说明,libevent库使用的I/O机制是在编译的时候确定的 其实evbase和某一个复用机制的关系就像类和对象的关系一样,复用机制是evbase的具体实现 */ for (i = 0; eventops[i] && !base->evbase; i++) { /*eventops 是一个全局的结构体,结构体中都是不同内核所支持的几种I/O复用机制*/ base->evsel = eventops[i]; /*注意:在这里调用了一个非常重要的函数base->evsel->init(base),使用这个回调函数来初始 化套接着的信息,如果使用的是epoll_wait复用机制,这个回调函数中最重要的 就是epoll_create函数.... */ /*在回调函数中的初始化就是做一些和系统调用相关的操作,注意回调函数的的返回值是一个void* 但是这个指针是和epoll_wait联通的桥梁,返回的就是eventops[i]这个结构体指针*/ base->evbase = base->evsel->init(base); }
Libevent通过函数指针实现了对多种I/O复用机制的支持,同时也展现了c语言中的条件编译的应用,实际上我们完全可以更改configure生成的头文件来手动选择用哪个I/O复用机制。后面,我将提供一份我针对linux简化了的libevent(likevent),供大家参考。
九、时间缓存和校正
1、原理
如果《libevent源码深度剖析》中所说,如果系统支持monotonic时间,该时间是系统从boot后到现在的时间,因此不需要执行校正(归根到底,是因为用户不能手动更改monotonic时间)。否则,就要在事件循环中执行时间校验。why?你想想啦,如果你加了个定时器事件,准备两个小时后处理(或许是放音乐叫你起床),结果有个就家伙恶作剧把系统时间往前调了一个小时,然后电脑放音乐的时候已经过了3个小时了,今天的工资估计就扣的差不多了,哈哈。这种情况,libevent是可以帮你处理的。但是,有一点必须注意,如果那个家伙也了解libevent的原理(而且你的系统不支持monotonic时间),他可能把系统时间往后调1个小时,这个时候libevent就帮不了你了,可能还没睡着,音乐就响了。那么,libevent是如何处理系统时间被往前调了这个情况的呢?(当然,如果你的系统支持monotonic时间,libevent就不会操这么多心了)
在回答上面那个问题之前,我们必须要回答的一个问题是服务器为什么一般都要做时间缓存,需要的时候直接从系统取不就可以了吗?效率。现在的服务器都要求有很高的效率,然后系统调用是一种非常消耗cpu资源的行为,它伴随着用户空间和内核空间的切换。所以我们就把时间缓存在用户空间,大部分时候需要时间的话,就直接读时间缓存啦,等适当的时间再更新时间缓存。libevent正是处于效率的考虑,也用了时间缓存机制。
libevent更新时间缓存的时机是事件循环中单次循环结束后。时间缓存tv_cache放在base对象中,在base对象中还有一个时间缓存的副本event_tv,跟新这个副本的时机是单次下次循环开始前(注:单次和下次只是表示紧挨着的两次循环)。在跟新时间缓存的副本event_tv(时间缓存tv_cache存放的已经是当前时间),如果一切正常,在更新event_tv前,是不是应该event_tv<=tv_cache(因为这时,event_tv表示过去,而tv_cache表示现在嘛),要是,event_tv>tv_cache(表示时间倒流了,一个美妙的幻想。),libevent就会明白,系统时间一定是被哪个家伙偷偷修改了。于是,libevent就根据event_tv和tv_cache的时间差来调整时间堆里面每个定时器事件的时间(libevent默认为系统时间没向前调了event_tv-tv_cache的时间)。这样以来,就再也不用担心系统时间没小伙伴恶意往前调了。但是,系统时间往后调了怎么办,libevent就只能表示很无奈了。
2、下面是时间校验的核心代码
static void timeout_correct(struct event_base *base, struct timeval *tv) { struct event **pev; unsigned int size; struct timeval off; if (use_monotonic)//有monotonic时间的支持,就是这么任性 return; /* Check if time is running backwards */ /*tv <----- tv_cache*/ gettime(base, tv); /*本来event_tv应该小于tv_cache 如果 tv< event_tv 表明用户向前调整了时间,需要校正*/ if (evutil_timercmp(tv, &base->event_tv, >=)) { base->event_tv = *tv; return; } event_debug(("%s: time is running backwards, corrected", __func__)); /*计算时间差值*/ evutil_timersub(&base->event_tv, tv, &off); /* * We can modify the key element of the node without destroying * the key, beause we apply it to all in the right order. */ /*调整定时事件的小顶堆*/ pev = base->timeheap.p; size = base->timeheap.n; for (; size-- > 0; ++pev) { struct timeval *ev_tv = &(**pev).ev_timeout; evutil_timersub(ev_tv, &off, ev_tv); } /* Now remember what the new time turned out to be. */ /*更新event_tv为tv_cache*/ base->event_tv = *tv; }
系统时间要是被往后调了,那么libevent就玩不动了。
标签:
原文地址:http://blog.csdn.net/qq_15457239/article/details/51320917