/* State of one event loop: the fd-indexed file-event table, the list of time
 * events and the polling backend's private data. */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered (-1: none) */
    int setsize; /* max number of file descriptors tracked */
    /* NOTE(review): presumably the id handed to the next time event created;
     * the time-event pass treats ids above timeEventNextId-1 as created during
     * that pass and skips them. */
    long long timeEventNextId;
    /* Time of the previous time-event processing pass (initialised with
     * time(NULL)). A later reading smaller than this means the system clock
     * was moved backwards. */
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events: setsize slots indexed by fd */
    aeFiredEvent *fired; /* Fired events: first n entries filled by each poll */
    /* Head of the singly linked list holding all time events. */
    aeTimeEvent *timeEventHead;
    /* The main loop keeps iterating while this is zero. */
    int stop;
    void *apidata; /* This is used for polling API specific data
                    * (an aeApiState for the epoll backend) */
    /* Called on every event-loop iteration, before events are processed. */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; // <MM> // setsize指定事件循环监听的fd的数目 // 由于内核保证新创建的fd是最小的正整数,所以直接创建setsize大小 // 的数组,存放对应的event // </MM> if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let‘s initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }以epoll为例,aeApiCreate主要是创建epoll的fd,以及要监听的epoll_event,这些数据定义在:
/* Per-loop state of the epoll backend, stored in eventLoop->apidata. */
typedef struct aeApiState {
    int epfd;                   /* the epoll instance descriptor */
    /* Output buffer for epoll_wait(), eventLoop->setsize entries long.
     * epoll_wait() fills it compactly from index 0 with the ready events, so
     * unlike eventLoop->events it is NOT indexed by fd. */
    struct epoll_event *events;
} aeApiState;
这里,监听到的事件组织方式与event_loop中监听事件一样,同样是setsize大小的数据,以fd为下标。
/* Allocate the epoll backend state for eventLoop and store it in
 * eventLoop->apidata.
 *
 * The epoll_event buffer gets eventLoop->setsize entries, the maximum number
 * of events a single epoll_wait() call is asked to return. The epoll instance
 * is created close-on-exec so its descriptor does not leak into programs
 * started with exec*() from a forked child.
 *
 * Returns 0 on success, -1 on failure. On failure everything allocated here is
 * released and eventLoop->apidata is left untouched. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    /* epoll_create1() supersedes epoll_create(size), whose size hint the
     * kernel has ignored since Linux 2.6.8. EPOLL_CLOEXEC sets close-on-exec
     * atomically at creation, with no window for a concurrent exec. */
    state->epfd = epoll_create1(EPOLL_CLOEXEC);
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier; compared against
                   * eventLoop->timeEventNextId so events created during a
                   * processing pass are not run in that same pass */
    /* Absolute due time split into seconds + milliseconds: the event fires
     * once the current time reaches (when_sec, when_ms). */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    /* Callback run when the event is due. Returning AE_NOMORE deletes the
     * event; any other value reschedules it that many milliseconds from now. */
    aeTimeProc *timeProc;
    /* NOTE(review): presumably invoked when the event is deleted -- confirm
     * against aeDeleteTimeEvent. */
    aeEventFinalizerProc *finalizerProc;
    void *clientData; /* opaque pointer passed back to timeProc */
    struct aeTimeEvent *next; /* next node of eventLoop->timeEventHead list */
} aeTimeEvent;
/* Register proc as the handler for the events in mask (AE_READABLE and/or
 * AE_WRITABLE) on fd, adding to any events already registered on fd.
 *
 * fd is used directly as an index into eventLoop->events, so it must lie in
 * [0, setsize). Otherwise errno is set to ERANGE and AE_ERR is returned.
 * If the polling backend rejects the registration, AE_ERR is returned and the
 * table is left unchanged. On success returns AE_OK. clientData is shared by
 * the read and write handlers of fd and is overwritten on every call. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    /* Reject negative fds too: events[fd] with fd < 0 would read and write
     * before the start of the array. */
    if (fd < 0 || fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    /* Call the backend before merging into fe->mask: the epoll backend looks
     * at the old mask to choose between EPOLL_CTL_ADD and EPOLL_CTL_MOD. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
/* File event structure: one slot of eventLoop->events, indexed by fd. */
typedef struct aeFileEvent {
    int mask; /* bitmask of AE_READABLE / AE_WRITABLE; AE_NONE marks an
               * unused slot */
    aeFileProc *rfileProc; /* called when the fd is reported readable */
    aeFileProc *wfileProc; /* called when the fd is reported writable; not
                            * called again in the same iteration if it is the
                            * same function as rfileProc and that already ran */
    void *clientData;      /* passed to both handlers */
} aeFileEvent;
下面看一下epoll添加事件的实现,主要是调用epoll_ctl。
/* Ask epoll to watch fd for the union of the events already registered on it
 * and those in mask. Returns 0 on success, -1 if epoll_ctl() fails. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    int oldmask = eventLoop->events[fd].mask;
    int merged = mask | oldmask; /* never drop interest registered earlier */
    /* An fd unknown to epoll (mask AE_NONE) needs EPOLL_CTL_ADD; one that is
     * already registered must be updated with EPOLL_CTL_MOD instead. */
    int op = (oldmask == AE_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    struct epoll_event ee;

    ee.events = (merged & AE_READABLE) ? EPOLLIN : 0;
    if (merged & AE_WRITABLE)
        ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* clear the whole union first (keeps valgrind quiet) */
    ee.data.fd = fd; /* the poll loop maps each ready event back via its fd */

    return (epoll_ctl(state->epfd, op, fd, &ee) == -1) ? -1 : 0;
}
struct epoll_event用于指定要监听的事件,以及该文件描述符绑定的data,在事件触发时可以返回。这里将data直接存为fd,通过这个数据,便可以找到对应的事件,然后调用其处理函数。
/* Run the event loop until eventLoop->stop becomes non-zero.
 *
 * Each iteration first calls the optional beforesleep hook, then calls
 * aeProcessEvents() with AE_ALL_EVENTS. stop is cleared on entry, so a loop
 * that was stopped earlier can be run again. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ // <MM> // 在两种情况下进入poll,阻塞等待事件发生: // 1)在有需要监听的描述符时(maxfd != -1) // 2)需要处理定时器事件,并且DONT_WAIT开关关闭的情况下 // </MM> if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // <MM> // 根据最快发生的定时器事件的发生时间,确定此次poll阻塞的时间 // </MM> if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // <MM> // 线性查找最快发生的定时器事件 // </MM> shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // <MM> // 如果有定时器事件,则根据它触发的时间,计算sleep的时间(ms单位) // </MM> long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // <MM> // 如果没有定时器事件,则根据情况是立即返回,或者永远阻塞 // </MM> /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } }接着,调用aeApiPoll函数,传入前面计算的sleep时间,等待io事件放生。在函数返回后,触发的事件已经填充到eventLoop的fired数组中。epoll的实现如下,就是调用epoll_wait,函数返回后,会将触发的事件存放到state->events数组中的前numevents个元素。接下来,填充fired数组,设置每个触发事件的fd,以及事件类型。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // <MM> // 调用epoll_wait,state->events存放返回的发生事件的fd // </MM> retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; // <MM> // 有事件发生,将发生的事件存放于fired数组 // </MM> for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; }在事件返回后,需要处理事件。遍历fired数组,取得fd对应的事件,并根据触发的事件类型,回调其处理函数。
for (j = 0; j < numevents; j++) { // <MM> // poll返回后,会将所有触发的时间存放于fired数组 // </MM> aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn‘t * processed, so we check if the event is still valid. */ // <MM> // 回调发生事件的fd,注册的事件处理函数 // </MM> if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; }
/* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } eventLoop->lastTime = now;接下来遍历所有定时器事件,查找触发的事件,然后回调处理函数。定时器事件处理函数的返回值,决定这个事件是一次性的,还是周期性的。如果返回AE_NOMORE,则是一次性事件,在调用完后会删除该事件。否则的话,返回值指定的是下一次触发的时间。
te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; while(te) { long now_sec, now_ms; long long id; if (te->id > maxId) { te = te->next; continue; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { // <MM> // 定时器事件的触发时间已过,则回调注册的事件处理函数 // </MM> int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; /* After an event is processed our time event list may * no longer be the same, so we restart from head. * Still we make sure to don‘t process events registered * by event handlers itself in order to don‘t loop forever. * To do so we saved the max ID we want to handle. * * FUTURE OPTIMIZATIONS: * Note that this is NOT great algorithmically. Redis uses * a single time event so it‘s not a problem but the right * way to do this is to add the new elements on head, and * to flag deleted elements in a special way for later * deletion (putting references to the nodes to delete into * another linked list). */ // <MM> // 根据定时器事件处理函数的返回值,决定是否将该定时器删除。 // 如果retval不等于-1(AE_NOMORE),则更改定时器的触发时间为 // now + retval(ms) // </MM> if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { // <MM> // 如果返回AE_NOMORE,则删除该定时器 // </MM> aeDeleteTimeEvent(eventLoop, id); } te = eventLoop->timeEventHead; } else { te = te->next; } }在回调处理函数时,有可能会添加新的定时器事件,如果不断加入,存在死循环的风险,所以需要避免这种情况,每次循环不处理新添加的事件,这是通过下面的代码实现的。
if (te->id > maxId) { te = te->next; continue; }