ngx_module_t ngx_events_module = { NGX_MODULE_V1, &ngx_events_module_ctx, /* module context */ ngx_events_commands, /* module directives */ NGX_CORE_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ NULL, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING }; static ngx_command_t ngx_events_commands[] = { { ngx_string("events"), NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, ngx_events_block, 0, 0, NULL }, ngx_null_command }; static ngx_core_module_t ngx_events_module_ctx = { ngx_string("events"), NULL, NULL };
//conf为ngx_conf_handler中的conf = confp[ngx_modules[i]->ctx_index];也就是conf指向的是ngx_cycle_s->conf_ctx[], //所以对conf赋值就是对ngx_cycle_s中的conf_ctx赋值,最终就是ngx_cycle_s中的conf_ctx[ngx_events_module=>index]指向了ctx *(void **) conf = ctx; for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->type != NGX_EVENT_MODULE) { continue; } m = ngx_modules[i]->ctx; if (m->create_conf) { (*ctx)[ngx_modules[i]->ctx_index] = m->create_conf(cf->cycle); if ((*ctx)[ngx_modules[i]->ctx_index] == NULL) { return NGX_CONF_ERROR; } } } //零时保存之前的cf,在下面解析完event{}配置后,在恢复 pcf = *cf; cf->ctx = ctx; cf->module_type = NGX_EVENT_MODULE; cf->cmd_type = NGX_EVENT_CONF; rv = ngx_conf_parse(cf, NULL);//这时候cf里面的上下文ctx为NGX_EVENT_MODULE模块create_conf的用于存储event{}的空间 *cf = pcf; //解析完event{}配置后,恢复 if (rv != NGX_CONF_OK) { return rv; } for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->type != NGX_EVENT_MODULE) { continue; } m = ngx_modules[i]->ctx; if (m->init_conf) { rv = m->init_conf(cf->cycle, (*ctx)[ngx_modules[i]->ctx_index]); if (rv != NGX_CONF_OK) { return rv; } } }
//相关配置见ngx_event_core_commands ngx_http_core_commands ngx_stream_commands ngx_http_core_commands ngx_core_commands ngx_mail_commands static ngx_command_t ngx_event_core_commands[] = { //每个worker进程可以同时处理的最大连接数 //连接池的大小,也就是每个worker进程中支持的TCP最大连接数,它与下面的connections配置项的意义是重复的,可参照9.3.3节理解连接池的概念 { ngx_string("worker_connections"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_event_connections, 0, 0, NULL }, //设置事件模型。 use [kqueue | rtsig | epoll | dev/poll | select | poll | eventport] linux系统中只支持select poll epoll三种 //freebsd里的kqueue,LINUX中没有 //确定选择哪一个事件模块作为事件驱动机制 { ngx_string("use"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_event_use, 0, 0, NULL }, //当事件模块通知有TCP连接时,尽可能在本次调度中对所有的客户端TCP连接请求都建立连接 //对应于事件定义的available字段。对于epoll事件驱动模式来说,意味着在接收到一个新连接事件时,调用accept以尽可能多地接收连接 { ngx_string("multi_accept"), NGX_EVENT_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, 0, offsetof(ngx_event_conf_t, multi_accept), NULL }, //accept_mutex on|off是否打开accept进程锁,是为了实现worker进程接收连接的负载均衡、打开后让多个worker进程轮流的序列号的接收TCP连接 //默认是打开的,如果关闭的话TCP连接会更快,但worker间的连接不会那么均匀。 { ngx_string("accept_mutex"), NGX_EVENT_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, 0, offsetof(ngx_event_conf_t, accept_mutex), NULL }, //accept_mutex_delay time,如果设置为accpt_mutex on,则worker同一时刻只有一个进程能个获取accept锁,这个accept锁不是阻塞的,如果娶不到会 //立即返回,然后等待time时间重新获取。 //启用accept_mutex负载均衡锁后,延迟accept_mutex_delay毫秒后再试图处理新连接事件 { ngx_string("accept_mutex_delay"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_conf_set_msec_slot, 0, offsetof(ngx_event_conf_t, accept_mutex_delay), NULL }, //debug_connection则在收到该IP地址请求的时候,使用debug级别打印。其他的还是沿用error_log中的设置 //需要对来自指定IP的TCP连接打印debug级别的调斌日志 { ngx_string("debug_connection"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_event_debug_connection, 0, 0, NULL }, ngx_null_command }; //ngx_event_core_module模块则仅实现了create_conf方法和init_conf方法,这是因为它并不真正负责TCP网络事件的驱动, //所以不会实现ngx_event_actions_t中的方法 ngx_event_module_t ngx_event_core_module_ctx = { &event_core_name, ngx_event_core_create_conf, /* create configuration */ ngx_event_core_init_conf, /* init configuration */ { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL } }; /* Nginx定义了一系列(目前为9个)运行在不同操作系统、不同内核版本上的事件驱动模块,包括:ngx_epoll_module、ngx_kqueue_module、 ngx_poll_module、ngx_select_module、ngx_devpoll_module、ngx_eventport_module、ngx_aio_module、ngx_rtsig_module 和基于Windows的ngx_select_module模块。在ngx_event_core_module模块的初始化过程中,将会从以上9个模块中选取1个作为Nginx进程的事件驱动模块。 */ ngx_module_t ngx_event_core_module = { NGX_MODULE_V1, &ngx_event_core_module_ctx, /* module context */ ngx_event_core_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init master */ ngx_event_module_init, /* init module */ //解析完配置文件后执行 ngx_event_process_init, /* init process */ //在创建子进程的里面执行 ngx_worker_process_init NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };注意这里的ngx_event_process_init函数实际上是做了很多的事情,这个函数是会在创建子进程的时候被调用。具体参见书本说明。
static ngx_command_t ngx_epoll_commands[] = { /* 在调用epoll_wait时,将由第2和第3个参数告诉Linux内核一次最多可返回多少个事件。这个配置项表示调用一次epoll_wait时最多可返回 的事件数,当然,它也会预分配那么多epoll_event结构体用于存储事件 */ { ngx_string("epoll_events"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, 0, offsetof(ngx_epoll_conf_t, events), NULL }, /* 在开启异步I/O且使用io_setup系统调用初始化异步I/O上下文环境时,初始分配的异步I/O事件个数 */ { ngx_string("worker_aio_requests"), NGX_EVENT_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, 0, offsetof(ngx_epoll_conf_t, aio_requests), NULL }, ngx_null_command }; ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_create_conf, /* create configuration */ ngx_epoll_init_conf, /* init configuration */ { ngx_epoll_add_event, /* add an event */ //ngx_add_event ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ //ngx_add_conn ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ #if (NGX_HAVE_EVENTFD) ngx_epoll_notify, /* trigger a notify */ #else NULL, /* trigger a notify */ #endif ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ ngx_epoll_done, /* done the events */ } }; ngx_module_t ngx_epoll_module = { NGX_MODULE_V1, &ngx_epoll_module_ctx, /* module context */ ngx_epoll_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ NULL, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };ngx_epoll_init, ngx_epoll_addevent, ngx_epoll-process_events 中则是对应epoll_create,epoll_ctl, epoll_wait这些epoll的系统调用函数。
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)//flags参数中含有NGX_POST_EVENTS表示这批事件要延后处理 { int events; uint32_t revents; ngx_int_t instance, i; ngx_uint_t level; ngx_err_t err; ngx_event_t *rev, *wev; ngx_queue_t *queue; ngx_connection_t *c; char epollbuf[256]; /* NGX_TIMER_INFINITE == INFTIM */ //ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "begin to epoll_wait, epoll timer: %M ", timer); /* 调用epoll_wait获取事件。注意,timer参数是在process_events调用时传入的,在9.7和9.8节中会提到这个参数 */ //The call was interrupted by a signal handler before any of the requested events occurred or the timeout expired; //如果有信号发生(见函数ngx_timer_signal_handler),如定时器,则会返回-1 //需要和ngx_add_event与ngx_add_conn配合使用 //event_list存储的是就绪好的事件,如果是select则是传入用户注册的事件,需要遍历检查,而且每次select返回后需要重新设置事件集,epoll不用 /* 这里面等待的事件包括客户端连接事件(这个是从父进程继承过来的ep,然后在子进程while前的ngx_event_process_init->ngx_add_event添加), 对已经建立连接的fd读写事件的添加在ngx_event_accept->ngx_http_init_connection->ngx_handle_read_event */ /* ngx_notify->ngx_epoll_notify只会触发epoll_in,不会同时引发epoll_out,如果是网络读事件epoll_in,则会同时引起epoll_out */ events = epoll_wait(ep, event_list, (int) nevents, timer); //timer为-1表示无限等待 nevents表示最多监听多少个事件,必须大于0 //EPOLL_WAIT如果没有读写事件或者定时器超时事件发生,则会进入睡眠,这个过程会让出CPU err = (events == -1) ? ngx_errno : 0; //当flags标志位指示要更新时间时,就是在这里更新的 //要摸ngx_timer_resolution毫秒超时后跟新时间,要摸epoll读写事件超时后跟新时间 if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { ngx_time_update(); } if (err) { if (err == NGX_EINTR) { if (ngx_event_timer_alarm) { //定时器超时引起的epoll_wait返回 ngx_event_timer_alarm = 0; return NGX_OK; } level = NGX_LOG_INFO; } else { level = NGX_LOG_ALERT; } ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); return NGX_ERROR; } if (events == 0) { if (timer != NGX_TIMER_INFINITE) { return NGX_OK; } ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); return NGX_ERROR; } //遍历本次epoll_wait返回的所有事件 for (i = 0; i < events; i++) { //和ngx_epoll_add_event配合使用 /* c = event_list[i].data.ptr; //通过这个确定是那个连接 instance = (uintptr_t) c & 1; //将地址的最后一位取出来,用instance变量标识, 见ngx_epoll_add_event /* 无论是32位还是64位机器,其地址的最后1位肯定是0,可以用下面这行语句把ngx_connection_t的地址还原到真正的地址值 */ //注意这里的c有可能是accept前的c,用于检测是否客户端发起tcp连接事件,accept返回成功后会重新创建一个ngx_connection_t,用来读写客户端的数据 c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); rev = c->read; //取出读事件 //注意这里的c有可能是accept前的c,用于检测是否客户端发起tcp连接事件,accept返回成功后会重新创建一个ngx_connection_t,用来读写客户端的数据 if (c->fd == -1 || rev->instance != instance) { //判断这个读事件是否为过期事件 //当fd套接字描述符为-l或者instance标志位不相等时,表示这个事件已经过期了,不用处理 /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } revents = event_list[i].events; //取出事件类型 ngx_epoll_event_2str(revents, epollbuf); memset(epollbuf, 0, sizeof(epollbuf)); ngx_epoll_event_2str(revents, epollbuf); ngx_log_debug4(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d %s(ev:%04XD) d:%p", c->fd, epollbuf, revents, event_list[i].data.ptr); if (revents & (EPOLLERR|EPOLLHUP)) { //例如对方close掉套接字,这里会感应到 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); } #if 0 if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "strange epoll_wait() events fd:%d ev:%04XD", c->fd, revents); } #endif if ((revents & (EPOLLERR|EPOLLHUP)) && (revents & (EPOLLIN|EPOLLOUT)) == 0) { /* * if the error events were returned without EPOLLIN or EPOLLOUT, * then add these flags to handle the events at least in one * active handler */ revents |= EPOLLIN|EPOLLOUT; //epoll EPOLLERR|EPOLLHUP实际上是通过触发读写事件进行读写操作recv write来检测连接异常 } if ((revents & EPOLLIN) && rev->active) { //如果是读事件且该事件是活跃的 #if (NGX_HAVE_EPOLLRDHUP) if (revents & EPOLLRDHUP) { rev->pending_eof = 1; } #endif //注意这里的c有可能是accept前的c,用于检测是否客户端发起tcp连接事件,accept返回成功后会重新创建一个ngx_connection_t,用来读写客户端的数据 rev->ready = 1; //表示已经有数据到了这里只是把accept成功前的 ngx_connection_t->read->ready置1,accept返回后会重新从连接池中获取一个ngx_connection_t //flags参数中含有NGX_POST_EVENTS表示这批事件要延后处理 if (flags & NGX_POST_EVENTS) { /* 如果要在post队列中延后处理该事件,首先要判断它是新连接事件还是普通事件,以决定把它加入到ngx_posted_accept_events队 列或者ngx_postedL events队列中。关于post队列中的事件何时执行 */ queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { //如果接收到客户端数据,这里为ngx_http_wait_request_handler rev->handler(rev); //如果为还没accept,则为ngx_event_process_init中设置为ngx_event_accept。如果已经建立连接,则读数据为ngx_http_process_request_line } } wev = c->write; if ((revents & EPOLLOUT) && wev->active) { if (c->fd == -1 || wev->instance != instance) { //判断这个读事件是否为过期事件 //当fd套接字描述符为-1或者instance标志位不相等时,表示这个事件已经过期,不用处理 /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } wev->ready = 1; if (flags & NGX_POST_EVENTS) { ngx_post_event(wev, &ngx_posted_events); //将这个事件添加到post队列中延后处理 } else { //立即调用这个写事件的回调方法来处理这个事件 wev->handler(wev); } } } return NGX_OK; }可知本模块只负责事件的分发,具体的业务逻辑处理是在各个读写事件对应的handler方法中。