标签:
从main函数切入,方便从宏观上掌握redis的运作机制,本篇就从main函数入手,从最上层看,main调用了哪些接口,具体完成了什么功能,然后再聚焦具体的模块。
aeEventLoop是Redis的事件核心数据结构,Redis将aeEventLoop不同平台上的多路分离器进行适配,如select/kqueue/epoll。
为了跨平台,aeEventLoop中定义了void* apidata这一结构,用来有不同平台的分离器进行关联。
1 typedef struct aeEventLoop { 2 // 目前已注册的最大描述符 3 int maxfd; /* highest file descriptor currently registered */ 4 // 目前已追踪的最大描述符 5 int setsize; /* max number of file descriptors tracked */ 6 // 用于生成时间事件 id 7 long long timeEventNextId; 8 // 最后一次执行时间事件的时间 9 time_t lastTime; /* Used to detect system clock skew */ 10 // 已注册的文件事件 11 aeFileEvent *events; /* Registered events */ 12 // 已就绪的文件事件 13 aeFiredEvent *fired; /* Fired events */ 14 // 时间事件 15 aeTimeEvent *timeEventHead; 16 // 事件处理器的开关 17 int stop; 18 // 多路复用库的私有数据 19 void *apidata; /* This is used for polling API specific data */ 20 // 在处理事件前要执行的函数 21 aeBeforeSleepProc *beforesleep; 22 } aeEventLoop;
其中select的apidata结构为:
typedef struct aeApiState { fd_set rfds, wfds; /* We need to have a copy of the fd sets as it‘s not safe to reuse * FD sets after select(). */ fd_set _rfds, _wfds; } aeApiState;
这里提到额外引入了_rfds,_wfds,是读写句柄集合的拷贝,原因是:select调用完成后,重用fd集合不是安全的(具体原因可以看select函数解析)。
在创建eventloop的时候,对其中eventloop->events数组中的每个event(即eventloop->events[fd])的mask都预置位AE_NONE;
/* Events with mask == AE_NONE are not set. So let‘s initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE;
当第一个事件来临时候,
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
这里对fe->mask进行了判断,如果为AE_NONE,则说明此fd没有可读或者可写事件,跳过本次for循环。
否则分别判断为可读还是可写,并构造fe的mask。最后将此事件关联的fd(即这里的j)添加至fired数组,其为所有已经就绪的fd数组。
显然可知:aeApiPoll的作用是利用多路分离器,复用IO,判断哪些fd可读写,并加入到已就绪数组中,等待被处理。
那么aeApiPoll什么时候会被调用呢?
在函数aeProcessEvents中调用,获取已经就绪的fd,完成读写IO。
调用处的关键代码如下:
numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 从已就绪数组中获取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn‘t * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { // 读事件 rfired = 1; // 确保读/写事件只能执行其中一个 fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { // 写事件 if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++;
简单分析下具体细节:
// 计算tvp(即多路分离器的timeout,超时时间,对于select的最后一个参数来说,如果为NULL,那么select会阻塞直到有句柄可读写为止)
调用numevents=aeApiPoll(eventloop, tvp),返回可读写的事件个数,遍历eventloop->events数组,此数组以fd作为下标索引事件,而fd则存储在eventloop->fired数组中。
因此从numevents到fe的索引逻辑如下图:
事件可读,则对调用fe->rfileProc;事件可写,那么调用fe->wfileProc进行IO写。而fe->rfileProc,fe->wfileProc的初始化则在initServer中;
initServer的主要代码如下:
// 注册新号处理函数,对于SIGHUP,SIGPIPE,忽略。 // 初始化共享对象 // 关键,初始化redis的事件循环主结构 server.el = aeCreateEventLoop(server.maxclients+1024); // 初始化作为server的网络连接 server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr); //初始化db // 初始化pubsub //关键,关联时间时间 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); // 将前面创建的server socket即server.ipfd与redis的事件循环关联。 if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); //初始化bio
其中关键的函数为aeCreateFileEvent,这里将server.ipfd于server.el进行关联,而acceptTcpHandler将用来初始化fe->rfileProc,以及fe->wfileProc。
进入aeCreateFileEvent函数中,此函数比较重要,我们逐行分析:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) return AE_ERR; aeFileEvent *fe = &eventLoop->events[fd]; // 监听指定 fd if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; // 设置文件事件类型 fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; // 如果有需要,更新事件处理器的最大 fd if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
上面对此调用aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL)
其中eventloop为redis核心事件循环,fd为server.ipfd,mask为AE_READABLE,proc=acceptTcpHandler,即fe->rfileProc以及fe->wfileProc均为acceptTcpHandler,
clientdata为NULL。
总结为:server端建立服务连接后,用于接收client响应的socket即server.ipfd(用socket(AF_INET,SOCK_STREAM,0)创建出socket),创建的socket监听读事件,对应的事件处理函数为acceptTcpHandler。
查看acceptTcpHandler函数代码:
// 其中调用accept,创建cfd服务客户端 cfd = anetTcpAccept(server.neterr, fd, cip, &cport) // 为cfd创建redisClient,获取客户端查询命令 // 调用readQueryFromClient()获取客户端buffer。 acceptCommandHandler(cfd,0)
cfd为来自客户端的响应创建的socket,为此客户端服务。在acceptCommandHandler中为cfd创建redisclient。关键代码为:
static void acceptCommonHandler(int fd, int flags) { redisClient *c; // 创建新客户端 if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resources for the client"); close(fd); /* May be already closed, just ignore errors */ return; } /* If maxclient directive is set and this is one client more... close the * connection. Note that we create the client instead to check before * for this condition, since now the socket is already set in nonblocking * mode and we can send an error for free using the Kernel I/O */ if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; /* That‘s a best effort error message, don‘t check write errors */ // 发送错误信息到客户端 if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; // 释放客户端 freeClient(c); return; } server.stat_numconnections++; c->flags |= flags; }
其中调用createClient为cfd这个句柄创建redisClient,在createClient中对socket设置了O_NONBLOCK以及TCP_NODELAY属性,即非阻塞及不延迟模式。
同时在此函数为cfd添加读事件,监听来自客户端的命令请求。具体的细节,在后面系列中再深入剖析。
对系列1进行总结:从redis.c的main函数入口,做必要的初始化、配置读取、创建redis的核心事件循环eventloop、创建server网络连接,并将事件循环与server的网络连接进行关联,并基于不同平台的IO多路复用机制实现redis自己的多路分离机制。
基于此,可以绘制出易于理解的思维导图:
小旗子中标出的为重点理解部分。
标签:
原文地址:http://www.cnblogs.com/crafet/p/4659802.html