标签:mutex lang 复制 同步 sign 变量定义 消息 open lib
本文档针对Nginx1.11.7版本,分析Windows下的相关代码,虽然服务器可能用linux更多,但是windows平台下的代码也基本相似
,另外windows的IOCP完成端口,异步IO模型非常优秀,很值得一看。
曾经有朋友问我,面对一个大项目的源代码,应该从何读起呢?我给他举了一个例子,我们学校大一大二是在紫金港校区,到了
大三搬到玉泉校区,但是大一的时候也会有时候有事情要去玉泉办。偶尔会去玉泉,但是玉泉校区不熟悉,于是跟着百度地图或者
跟着学长走。因为是办事情,所以一般也就是局部走走,比如在学院办公楼里面走走。等到大三刚来到玉泉,会发现,即使是自己
以前来过几次,也觉得这个校区完全陌生,甚至以前来过的地方,也显得格外生疏。但是当我们真正在玉泉校区开始学习生活了,
每天从寝室走到教室大多就是一条路,教超就是另一条路,这两条主要的路走几遍之后,有时候顺路去旁边的小路看看,于是慢慢
也熟悉了这个新的校区。
源代码的阅读又何尝不是这样呢,如果没有一条主要的路线,总是局部看看,浅尝辄止不说,还不容易把握整体的结构。各模块之间
的依赖也容易理不清。如果有一条比较主干的线路,去读源代码,整体结构和思路也会变得明晰起来。当然我也是持这种看法:博客、
文章的作者,写文章的思路作者自己是清楚的,读者却不一定能看得到;而且大家写东西都难免会有疏漏。看别人写的源码分析指引
等等,用一种比较极端的话来说,是一种自我满足,觉得自己很快学到了很多源码级别的知识,但是其实想想,学习乎,更重要的是
学习能力的锻炼,通过源码的学习,学习过程中自己结合自己情况的思考,甚至结合社会哲学的思考,以及读源码之后带来的收益,
自己在平时使用框架、库的时候,出了问题的解决思路,翻阅别人源码来找到bug的能力。如果只是单单看别人写的源码分析,与写
代码的时候只去抄抄现成的代码,某种程度上是有一定相似性的。
我自己是使用go
为主的,之前对于一流的nginx
中间件也没有太多了解,也是第一次去看,水平不足之处,还望海涵。回归正题,
Nginx的源代码分析,也是要找一条主要的路线,对于很多程序来说,启动过程就是一条很不错的路线,找找nginx的入口函数main
,发现在/src/core/nginx.c中,代码大概如下:
int ngx_cdecl main(int argc, char *const *argv) {
... // 先是一些变量声明
ngx_debug_init();
...
ngx_pid = ngx_getpid();
...
init_cycle.pool = ngx_create_pool(1024, log);
...
cycle = ngx_init_cycle(&init_cycle);
...
if (ngx_signal) {
return ngx_signal_process(cycle, ngx_signal);
}
...
if (ngx_create_pidfile(&ccf->pid, cycle->log) != NGX_OK) {
return 1;
}
...
if (ngx_process == NGX_PROCESS_SINGLE) {
ngx_single_process_cycle(cycle);
} else {
ngx_master_process_cycle(cycle);
}
return 0
}
这段代码大致看上去,先是做了一些初始化的事情,包括pool
看起来应该是内存池之类的变量的分配,获取系统信息,
初始化日志系统等等,因为还没有进入相应函数去仔细看,所以先放着。用过nginx
的同学应该了解,nginx
命令行
运行./nginx
后,他直接就运行服务了,很静默,然后即使用Ctrl+C
也关不掉。但是再开一个console
,运行
./nginx -h
就会看到:
nginx version: nginx/1.11.7
Usage: nginx [-?hvVtTq] [-s signal] [-c filename] [-p prefix] [-g directives]
Options:
-?,-h : this help
-v : show version and exit
-V : show version and configure options then exit
-t : test configuration and exit
-T : test configuration, dump it and exit
-q : suppress non-error messages during configuration testing
-s signal : send signal to a master process: stop, quit, reopen, reload
-p prefix : set prefix path (default: NONE)
-c filename : set configuration file (default: conf/nginx.conf)
-g directives : set global directives out of configuration file
这是nginx
的命令行参数介绍,要退出nginx
需要用nginx -s stop
给已经打开的nginx
进程发送信号,让其退出。
而且nginx
还支持平滑的重启,这种重启在更改nginx
配置时非常有用,重启服务器的过程,实际上是nginx
自己内部
的一种处理,重新载入新的配置,但是却不影响已经有的一些连接,所以称之为平滑重启。
gracefully stop nginx…
而main
函数在初始化之后,做的就是命令行参数的解析,如果是显示版本,那么显示一个版本信息,就退出;如果是设置
配置文件,那么去调用设置配置文件的相应处理;如果是发送控制信号,那么return ngx_signal_process(cycle, ngx_signal);
处理信号等等。这里还有个小trick,就是关于pid文件,程序把自己的pid写入一个文件,然后就可以防止启动多个进程,
这是一个比较常用的小技巧。关于ngx_single_process_cycle(cycle)
这应该是单进程的情况,一般而言现在的服务器
都是多核为主,所以我们去ngx_master_process_cycle(cycle)
Master进程的主函数看一看。
ngx_master_process_cycle
函数在/src/os/win32/ngx_process_cycle.c中,该函数接受一个参数,这个参数比较
复杂,但是可以看出,应该是和每次nginx
循环的生命周期有关,这里认为nginx
每平滑重启一次,就是一次循环。代码
分为几个部分来看:
void ngx_master_process_cycle(ngx_cycle_t *cycle) {
...
if (ngx_process == NGX_PROCESS_WORKER) {
// ngx_process标识进程的身份,如果本进程应该是工作者进程,就去执行工作者应该做的
ngx_worker_process_cycle(cycle, ngx_master_process_event_name);
return;
}
...
SetEnvironmentVariable("ngx_unique", ngx_unique); // 设置环境变量,表示nginx主进程已经运行
...
ngx_master_process_event = CreateEvent(NULL, 1, 0, ngx_master_process_event_name);
if (ngx_master_process_event == NULL) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"CreateEvent(\"%s\") failed",
ngx_master_process_event_name);
exit(2);
}
if (ngx_create_signal_events(cycle) != NGX_OK) {
exit(2);
}
ngx_sprintf((u_char *) ngx_cache_manager_mutex_name,
"ngx_cache_manager_mutex_%s%Z", ngx_unique);
ngx_cache_manager_mutex = CreateMutex(NULL, 0,
ngx_cache_manager_mutex_name);
if (ngx_cache_manager_mutex == NULL) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"CreateMutex(\"%s\") failed", ngx_cache_manager_mutex_name);
exit(2);
}
events[0] = ngx_stop_event;
events[1] = ngx_quit_event;
events[2] = ngx_reopen_event;
events[3] = ngx_reload_event;
ngx_close_listening_sockets(cycle);
if (ngx_start_worker_processes(cycle, NGX_PROCESS_RESPAWN) == 0) {
exit(2);
}
...
}
理解这段代码,需要了解Windows系统的一点点事件相关API,CreateEvent
可以创建一个事件,之后可以通过一些方法
比如SetEvent
可以使得这个事件被激活,进程或者线程也可以通过WaitForSingleObejct
等API去等待一个事件的发
生。这段代码就是创建了一些事件,包括stop
,quit
,reopen
和reload
,这些事件是在ngx_create_signal_events
函数中创建的:
static ngx_int_t
ngx_create_signal_events(ngx_cycle_t *cycle)
{
ngx_sprintf((u_char *) ngx_stop_event_name,
"Global\\ngx_stop_%s%Z", ngx_unique);
ngx_stop_event = CreateEvent(NULL, 1, 0, ngx_stop_event_name);
if (ngx_stop_event == NULL) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"CreateEvent(\"%s\") failed", ngx_stop_event_name);
return NGX_ERROR;
}
ngx_sprintf((u_char *) ngx_quit_event_name,
"Global\\ngx_quit_%s%Z", ngx_unique);
...
}
之后,主进程调用ngx_close_listening_sockets(cycle)
关闭正在侦听的套接字,这样之后的连接就不会进来了,
因为主进程循环肯定是重启或者初始化的时候被调用的。之后调用ngx_start_worker_processes
函数去启动工作者
线程。我们看看ngx_start_worker_process
函数,同样在这个文件里:
static ngx_int_t
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t type)
{
ngx_int_t n;
ngx_core_conf_t *ccf;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
for (n = 0; n < ccf->worker_processes; n++) {
if (ngx_spawn_process(cycle, "worker", type) == NGX_INVALID_PID) {
break;
}
}
return n;
}
这个函数先是读取了本次循环的配置,根据配置中的worker_process
的设置来启动相应数量的工作者进程,配置文件
在/conf/nginx.conf
中:
#user nobody;
worker_processes 8;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 65536;
}
...
当然如果配置文件中没有设置,以及新创建的配置文件中如何设置默认值,这些都在/src/core/nginx.c
中,但是不是
非常重要,所以暂时略过。回归ngx_master_process_cycle
函数,该函数在创建了事件之后,会进入一个死循环:
for ( ;; ) {
nev = 4;
for (n = 0; n < ngx_last_process; n++) {
if (ngx_processes[n].handle) {
events[nev++] = ngx_processes[n].handle;
}
}
if (timer) {
timeout = timer > ngx_current_msec ? timer - ngx_current_msec : 0;
}
ev = WaitForMultipleObjects(nev, events, 0, timeout);
err = ngx_errno;
ngx_time_update();
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"master WaitForMultipleObjects: %ul", ev);
if (ev == WAIT_OBJECT_0) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
if (ResetEvent(ngx_stop_event) == 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"ResetEvent(\"%s\") failed", ngx_stop_event_name);
}
if (timer == 0) {
timer = ngx_current_msec + 5000;
}
ngx_terminate = 1;
ngx_quit_worker_processes(cycle, 0);
continue;
}
if (ev == WAIT_OBJECT_0 + 1) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "shutting down");
if (ResetEvent(ngx_quit_event) == 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"ResetEvent(\"%s\") failed", ngx_quit_event_name);
}
ngx_quit = 1;
ngx_quit_worker_processes(cycle, 0);
continue;
}
...
if (ev > WAIT_OBJECT_0 + 3 && ev < WAIT_OBJECT_0 + nev) {
ngx_log_debug0(NGX_LOG_DEBUG_CORE, cycle->log, 0, "reap worker");
live = ngx_reap_worker(cycle, events[ev]);
if (!live && (ngx_terminate || ngx_quit)) {
ngx_master_process_exit(cycle);
}
continue;
}
if (ev == WAIT_TIMEOUT) {
ngx_terminate_worker_processes(cycle);
ngx_master_process_exit(cycle);
}
if (ev == WAIT_FAILED) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
"WaitForMultipleObjects() failed");
continue;
}
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
WaitForMultipleObjects() returned unexpected value %ul", ev);
}
首先介绍WaitForMultipleObjects
,这个函数会等待多个内核对象,可以是事件,也可以是锁,进程等等。这个循环中,每次
循环先添加了ngx_last_process
个进程到了事件数组中,这个ngx_processes
大概是上次循环中使用的进程组。如果定义的
stop
,quit
,reload
,reopen
四种事件触发,分别调用相关函数去关闭或者重启工作者进程。如果是上次循环中使用的进
程死亡,那么就去重启这个进程,调用ngx_reap_worker
函数,这个函数在确认旧的进程已经死亡后,会调用ngx_spawn_process
去重启一个新的进程。ngx_spawn_process
会调用ngx_execute
去开一个新的进程,这部分的细节,放入下一节再讲。这样我们
了解了主进程在启动后,会进入事件处理循环来处理nginx -s
发送的指令以及处理进程组死亡的重启。那么我们看看工作者进程
是做什么的。
我们了解到,主进程调用ngx_start_worker_process
函数根据配置文件启动多个工作者进程,这个函数中调用了ngx_spawn_process
来启动新的工作者进程,那么我们来看看ngx_spawn_process
是如何启动一个新的进程。以下是部分代码(位于/src/os/win32/ngx_process.c):
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, char *name, ngx_int_t respawn) {
... // 变量定义
// 第一次主循环传入的是NGX_PROCESS_JUST_RESPAWN==-3
if (respawn >= 0) {
s = respawn;
} else {
for (s = 0; s < ngx_last_process; s++) {
if (ngx_processes[s].handle == NULL) {
break;
}
}
if (s == NGX_MAX_PROCESSES) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"no more than %d processes can be spawned",
NGX_MAX_PROCESSES);
return NGX_INVALID_PID;
}
}
// 得到Nginx的文件路径
n = GetModuleFileName(NULL, file, MAX_PATH);
if (n == 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"GetModuleFileName() failed");
return NGX_INVALID_PID;
}
file[n] = ‘\0‘;
...
ctx.path = file;
...
pid = ngx_execute(cycle, &ctx); // 创建新进程
...
这部分是先找到ngx_process
中的索引,然后放入一个新的进程,那么我们看看ngx_execute
函数是怎么执行的:
ngx_pid_t ngx_execute(ngx_cycle_t *cycle, ngx_exec_ctx_t *ctx)
{
...
if (CreateProcess(ctx->path, ctx->args,
NULL, NULL, 0, CREATE_NO_WINDOW, NULL, NULL, &si, &pi)
== 0)
{
ngx_log_error(NGX_LOG_CRIT, cycle->log, ngx_errno,
"CreateProcess(\"%s\") failed", ngx_argv[0]);
return 0;
}
ctx->child = pi.hProcess;
if (CloseHandle(pi.hThread) == 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"CloseHandle(pi.hThread) failed");
}
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
"start %s process %P", ctx->name, pi.dwProcessId);
return pi.dwProcessId;
}
这个函数通过调用ngx_execute
开启新的进程,并把进程句柄存入了context中,返回pid。创建系统进程之后,会调用
WaitForMultipleObjects
等待两个事件,一个是ngx_master_process_event
,这个事件在主进程循环中定义,另一
个是新开的进程死亡。如果主进程事件触发,那么会使用OpenEvent
设置新进程的事件为以前创建的事件。但是可能是因为
我手头的版本还在开发中,我没有在代码里面找到关于这个手动事件触发的语句。另一个事件是新进程的死亡,如果该事件
被触发,就会执行一些清理代码(杀进程等等)。
但是我们发现,CreateProcess
里面只是新启动了一个nginx
,那么这个新启动的nginx
进程为什么会成为工作者进程呢?
还记得main
函数中有做针对操作系统的初始化os_init
,这个函数在win32的实现中有一部分代码如下:
ngx_int_t
ngx_os_init(ngx_log_t *log)
{
...
if (GetEnvironmentVariable("ngx_unique", ngx_unique, NGX_INT32_LEN + 1)
!= 0)
{
ngx_process = NGX_PROCESS_WORKER;
}
...
而ngx_process
正是决定了一个主进程循环变更为工作者进程的条件:
void ngx_master_process_cycle(ngx_cycle_t *cycle) {
...
if (ngx_process == NGX_PROCESS_WORKER) {
// ngx_process标识每个进程的身份,如果本进程应该是工作者进程,就去执行工作者应该做的
ngx_worker_process_cycle(cycle, ngx_master_process_event_name);
return;
}
...
SetEnvironmentVariable("ngx_unique", ngx_unique); // 设置环境变量,表示nginx主进程已经运行
那么我们来看看这个工作者进程的主循环:
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, char *mevn)
{
... // 变量定义
log = cycle->log;
ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "worker started");
ngx_sprintf((u_char *) wtevn, "ngx_worker_term_%P%Z", ngx_pid);
events[0] = CreateEvent(NULL, 1, 0, wtevn);
if (events[0] == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"CreateEvent(\"%s\") failed", wtevn);
goto failed;
}
ngx_sprintf((u_char *) wqevn, "ngx_worker_quit_%P%Z", ngx_pid);
events[1] = CreateEvent(NULL, 1, 0, wqevn);
if (events[1] == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"CreateEvent(\"%s\") failed", wqevn);
goto failed;
}
ngx_sprintf((u_char *) wroevn, "ngx_worker_reopen_%P%Z", ngx_pid);
events[2] = CreateEvent(NULL, 1, 0, wroevn);
if (events[2] == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"CreateEvent(\"%s\") failed", wroevn);
goto failed;
}
mev = OpenEvent(EVENT_MODIFY_STATE, 0, mevn);
if (mev == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"OpenEvent(\"%s\") failed", mevn);
goto failed;
}
if (SetEvent(mev) == 0) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"SetEvent(\"%s\") failed", mevn);
goto failed;
}
ngx_sprintf((u_char *) ngx_cache_manager_mutex_name,
"ngx_cache_manager_mutex_%s%Z", ngx_unique);
ngx_cache_manager_mutex = OpenMutex(SYNCHRONIZE, 0,
ngx_cache_manager_mutex_name);
if (ngx_cache_manager_mutex == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"OpenMutex(\"%s\") failed", ngx_cache_manager_mutex_name);
goto failed;
}
ngx_cache_manager_event = CreateEvent(NULL, 1, 0, NULL);
if (ngx_cache_manager_event == NULL) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"CreateEvent(\"ngx_cache_manager_event\") failed");
goto failed;
}
...
最开始仍然是创建了一些事件,目前可以看到是有一些是通知工作者进程重启或者关闭的,还有一个是用来通知事件
修改状态的,而且马上激活了这个事件。然后拿到cache_manage
的互斥锁的句柄,创建了ngx_cache_manager_event
事件,这个事件是命令缓存管理线程退出的,后面函数体中会讲到。之后,工作者进程启动了3个主要线程,分别是
工作者线程、缓存管理线程、缓存加载线程:
...
if (ngx_create_thread(&wtid, ngx_worker_thread, NULL, log) != 0) {
goto failed;
}
if (ngx_create_thread(&cmtid, ngx_cache_manager_thread, NULL, log) != 0) {
goto failed;
}
if (ngx_create_thread(&cltid, ngx_cache_loader_thread, NULL, log) != 0) {
goto failed;
}
...
启动后工作进程的主线程会进入一个事件处理循环:
...
for ( ;; ) {
ev = WaitForMultipleObjects(3, events, 0, INFINITE);
err = ngx_errno;
ngx_time_update();
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"worker WaitForMultipleObjects: %ul", ev);
if (ev == WAIT_OBJECT_0) {
ngx_terminate = 1;
ngx_log_error(NGX_LOG_NOTICE, log, 0, "exiting");
if (ResetEvent(events[0]) == 0) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"ResetEvent(\"%s\") failed", wtevn);
}
break;
}
if (ev == WAIT_OBJECT_0 + 1) {
ngx_quit = 1;
ngx_log_error(NGX_LOG_NOTICE, log, 0, "gracefully shutting down");
break;
}
if (ev == WAIT_OBJECT_0 + 2) {
ngx_reopen = 1;
ngx_log_error(NGX_LOG_NOTICE, log, 0, "reopening logs");
if (ResetEvent(events[2]) == 0) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"ResetEvent(\"%s\") failed", wroevn);
}
continue;
}
if (ev == WAIT_FAILED) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"WaitForMultipleObjects() failed");
goto failed;
}
}
...
这个事件循环会处理以下3个事件,如果是重新开启会设置重启位置(可能之后会有处理)拿掉消息,并继续循环如果是
终止或者退出就会跳出循环,则会设置标志后跳出循环,如果调用失败会进入失败处理:
...
/* wait threads */
if (SetEvent(ngx_cache_manager_event) == 0) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"SetEvent(\"ngx_cache_manager_event\") failed");
}
events[1] = wtid;
events[2] = cmtid;
nev = 3;
for ( ;; ) {
ev = WaitForMultipleObjects(nev, events, 0, INFINITE);
err = ngx_errno;
ngx_time_update();
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"worker exit WaitForMultipleObjects: %ul", ev);
if (ev == WAIT_OBJECT_0) {
break;
}
if (ev == WAIT_OBJECT_0 + 1) {
if (nev == 2) {
break;
}
events[1] = events[2];
nev = 2;
continue;
}
if (ev == WAIT_OBJECT_0 + 2) {
nev = 2;
continue;
}
if (ev == WAIT_FAILED) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"WaitForMultipleObjects() failed");
break;
}
}
ngx_close_handle(ngx_cache_manager_event);
ngx_close_handle(events[0]);
ngx_close_handle(events[1]);
ngx_close_handle(events[2]);
ngx_close_handle(mev);
ngx_worker_process_exit(cycle);
failed:
exit(2);
}
这部分代码是在处理自己的死亡,先发送信息让缓存管理线程死亡。将终止(终止和退出的含义是有区别的,终止是很暴力的概念,而退
出就平稳了很多)事件和工作者线程和缓存管理线程的id都放入WaitForMultipleObjects
的列表中,等待自己开启的线程死亡。当然
里面做了一个小小的处理,使得WaitForMultipleObjects
收到某个线程的消息后,不会再去等另外一个,而且等到两个都结束后才会
跳出等待,执行会面自己的死亡处理。但是这里有个疑问,为什么不等待缓存加载线程呢?于是看看缓存加载线程干了什么:
static ngx_thread_value_t __stdcall
ngx_cache_loader_thread(void *data)
{
ngx_uint_t i;
ngx_path_t **path;
ngx_cycle_t *cycle;
ngx_msleep(60000);
cycle = (ngx_cycle_t *) ngx_cycle;
path = cycle->paths.elts;
for (i = 0; i < cycle->paths.nelts; i++) {
if (ngx_terminate || ngx_quit || ngx_exiting) {
break;
}
if (path[i]->loader) {
path[i]->loader(path[i]->data);
ngx_time_update();
}
}
return 0;
}
很短的一点代码,对每个路径调用它的缓存加载函数(在后面的http模块中看到了),如果这个过程中遇到了终止或者结束标志就直接退出。
缓存管理的代码如下:
static void
ngx_cache_manager_process_handler(void)
{
u_long ev;
ngx_uint_t i;
ngx_msec_t next, n;
ngx_path_t **path;
next = 60 * 60 * 1000;
path = ngx_cycle->paths.elts;
for (i = 0; i < ngx_cycle->paths.nelts; i++) {
if (path[i]->manager) {
n = path[i]->manager(path[i]->data);
next = (n <= next) ? n : next;
ngx_time_update();
}
}
if (next == 0) {
next = 1;
}
ev = WaitForSingleObject(ngx_cache_manager_event, (u_long) next);
if (ev != WAIT_TIMEOUT) {
ngx_time_update();
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
"cache manager WaitForSingleObject: %ul", ev);
}
}
这个线程代码也类似,调用所有path的缓存管理函数,之后会去等待让他死亡的信号。关于缓存这部分不在本篇文章中涉及,后续文章中
可能会专门去讲。于是我们关注一下比较关键的工作者线程。
工作者线程的代码如下:
static ngx_thread_value_t __stdcall
ngx_worker_thread(void *data)
{
ngx_int_t n;
ngx_time_t *tp;
ngx_cycle_t *cycle;
tp = ngx_timeofday();
srand((ngx_pid << 16) ^ (unsigned) tp->sec ^ tp->msec);
cycle = (ngx_cycle_t *) ngx_cycle;
// 加载所有模块的初始化函数
for (n = 0; cycle->modules[n]; n++) {
if (cycle->modules[n]->init_process) {
if (cycle->modules[n]->init_process(cycle) == NGX_ERROR) {
/* fatal */
exit(2);
}
}
}
while (!ngx_quit) {
if (ngx_exiting) {
// 退出的处理
ngx_event_cancel_timers();
if (ngx_event_timer_rbtree.root
== ngx_event_timer_rbtree.sentinel)
{
break;
}
}
ngx_log_debug0(NGX_LOG_DEBUG_CORE, cycle->log, 0, "worker cycle");
// 处理IO事件
ngx_process_events_and_timers(cycle);
if (ngx_terminate) {
// 暴力退出
return 0;
}
if (ngx_quit) {
ngx_quit = 0;
if (!ngx_exiting) {
ngx_exiting = 1;
ngx_close_listening_sockets(cycle);
ngx_close_idle_connections(cycle);
}
}
if (ngx_reopen) {
ngx_reopen = 0;
ngx_reopen_files(cycle, -1);
}
}
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
return 0;
}
工作者线程显示加载所有模块的初始化函数,然后循环调用IO事件的处理函数,由于工作者线程对于性能要求极高,所以该线程函数里面完全
没有系统调用,退出等等的处理都是通过进程共享的标识变量来处理的,避免了频繁内核态和用户态的切换引起的开销。由于标识变量的操作
都是在主线程中处理的,所以也不需要加锁。因为IO事件处理函数只会处理一个IO事件,所以这个for循环的频率非常高。于是我们看看IO事件
处理函数(位于/src/event/nginx_event.c):
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
#if (NGX_WIN32)
/* handle signals from master in case of network inactivity */
if (timer == NGX_TIMER_INFINITE || timer > 500) {
timer = 500;
}
#endif
}
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
if (delta) {
ngx_event_expire_timers();
}
ngx_event_process_posted(cycle, &ngx_posted_events);
}
IO事件处理函数先是做了一些时间的处理,暂时略过,然后看一下事件处理函数,(void) ngx_process_events(cycle, timer, flags);
这条语句调用的实际是已经加载的IO模块的处理函数。nginx
是支持很多种IO模型的,多路复用,信号驱动IO等等,但是这篇主要介绍一个
Nginx 1.11
之后的版本才开始考虑加入的超高性能IO模型,该模型表现非常优异,也是著名的node.js
项目在windows下的基础IO模型。
Linux下没有完美的异步IO模型,read
,select
,poll
,epoll
,pselect
,kqueue
实质都是应用程序同步轮询,即使内核告诉你已经
有设备就绪,也要应用程序自己去循环读取文件描述符状态,另外也需要自己去从内核态把缓冲区复制出来,而且对多线程不友好,以至于很多linux
网络库使用线程间通讯来模拟异步的IO事件(如Glibc的AIO只是将IO操作分到了多个线程上)。而IOCP则是几乎完美的解决方案,内核托管线程
池去处理IO,复制缓冲区也是内核处理,直到数据到达,应用程序线程才会被唤醒。
相比循环判断文件描述符,IOCP省去了大量循环时间;相比应用程序自己去内核复制缓冲区的系统调用,IOCP省去了大量系统调用的时间;IOCP自己
本身就可以把IO分配到多个进程,这是在内核里面做的,相比应用程序用线程间通信模拟,节省了大量系统调用和锁机制/信号机制导致的开销,nginx
中新加入的IOCP模块中处理IO事件的方法如下(位于/src/event/modules/ngx_iocp_module.c):
static
ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags)
{
int rc;
u_int key;
u_long bytes;
ngx_err_t err;
ngx_msec_t delta;
ngx_event_t *ev;
ngx_event_ovlp_t *ovlp;
if (timer == NGX_TIMER_INFINITE) {
timer = INFINITE;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "iocp timer: %M", timer);
// 获取一个完成的端口
rc = GetQueuedCompletionStatus(iocp, &bytes, (PULONG_PTR) &key,
(LPOVERLAPPED *) &ovlp, (u_long) timer);
if (rc == 0) {
err = ngx_errno;
} else {
err = 0;
}
delta = ngx_current_msec;
if (flags & NGX_UPDATE_TIME) {
ngx_time_update();
}
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"iocp: %d b:%d k:%d ov:%p", rc, bytes, key, ovlp);
if (timer != INFINITE) {
delta = ngx_current_msec - delta;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"iocp timer: %M, delta: %M", timer, delta);
}
if (err) {
if (ovlp == NULL) {
if (err != WAIT_TIMEOUT) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
"GetQueuedCompletionStatus() failed");
return NGX_ERROR;
}
return NGX_OK;
}
ovlp->error = err;
}
if (ovlp == NULL) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"GetQueuedCompletionStatus() returned no operation");
return NGX_ERROR;
}
// 这个event是AcceptEx函数注册上去的
ev = ovlp->event;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, err, "iocp event:%p", ev);
if (err == ERROR_NETNAME_DELETED /* the socket was closed */
|| err == ERROR_OPERATION_ABORTED /* the operation was canceled */)
{
/*
* the WSA_OPERATION_ABORTED completion notification
* for a file descriptor that was closed
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, err,
"iocp: aborted event %p", ev);
return NGX_OK;
}
if (err) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
"GetQueuedCompletionStatus() returned operation error");
}
switch (key) {
case NGX_IOCP_ACCEPT:
if (bytes) {
ev->ready = 1;
}
break;
case NGX_IOCP_IO:
ev->complete = 1;
ev->ready = 1;
break;
case NGX_IOCP_CONNECT:
ev->ready = 1;
}
ev->available = bytes;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"iocp event handler: %p", ev->handler);
ev->handler(ev);
return NGX_OK;
}
在这个事件处理函数中,IOCP模块先调用GetQueuedCompletionStatus
来获取一个完成的端口,并设置了超时时间,在win32下,
超时时间被设置为最多500.然后判断完成的端口,如果是空,那么返回,如果出现一些错误,就进行错误处理。然后判断是哪种IO
完成,并对其作出分别的处理,对ev
的标识进行标记,那么这个ev
是哪里来的呢,ev = ovlp->event;
,而ovelapeed
重叠
IO的结构,是可以自己定义的,最后的语句,也是调用了这个自己定义的ev
的handler
函数去处理这个异步IO。而注册新的IO
端口,是在IOCP模块的添加事件接口中:
static ngx_int_t
ngx_iocp_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t key)
{
ngx_connection_t *c;
c = (ngx_connection_t *) ev->data;
c->read->active = 1;
c->write->active = 1;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"iocp add: fd:%d k:%ui ov:%p", c->fd, key, &ev->ovlp);
if (CreateIoCompletionPort((HANDLE) c->fd, iocp, key, 0) == NULL) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"CreateIoCompletionPort() failed");
return NGX_ERROR;
}
return NGX_OK;
}
但是要了解ev
的handler
等等到底是哪里定义的,还得看看event
进程的初始化函数(位于/src/ngx_event.c):
static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
...
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);
// 如果有多个用户进程就考虑使用Accept锁
if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
ngx_use_accept_mutex = 1;
ngx_accept_mutex_held = 0;
ngx_accept_mutex_delay = ecf->accept_mutex_delay;
} else {
ngx_use_accept_mutex = 0;
}
#if (NGX_WIN32)
/*
* disable accept mutex on win32 as it may cause deadlock if
* grabbed by a process which can‘t accept connections
*/
ngx_use_accept_mutex = 0;
#endif
// 初始化投递事件队列
ngx_queue_init(&ngx_posted_accept_events);
ngx_queue_init(&ngx_posted_events);
if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
return NGX_ERROR;
}
// 寻找加载的event模块并调用其初始化函数
for (m = 0; cycle->modules[m]; m++) {
if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
continue;
}
if (cycle->modules[m]->ctx_index != ecf->use) {
continue;
}
module = cycle->modules[m]->ctx;
if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
/* fatal */
exit(2);
}
break;
}
...
// 为所有连接分配空间
cycle->connections =
ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
if (cycle->connections == NULL) {
return NGX_ERROR;
}
c = cycle->connections;
// 为读事件分配空间
cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
if (cycle->read_events == NULL) {
return NGX_ERROR;
}
// 初始化读事件
rev = cycle->read_events;
for (i = 0; i < cycle->connection_n; i++) {
rev[i].closed = 1;
rev[i].instance = 1;
}
// 为写事件分配空间
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
if (cycle->write_events == NULL) {
return NGX_ERROR;
}
// 初始化写事件
wev = cycle->write_events;
for (i = 0; i < cycle->connection_n; i++) {
wev[i].closed = 1;
}
// 初始化所有连接
i = cycle->connection_n;
next = NULL;
do {
i--;
c[i].data = next;
c[i].read = &cycle->read_events[i];
c[i].write = &cycle->write_events[i];
c[i].fd = (ngx_socket_t) -1;
next = &c[i];
} while (i);
// 连接复用
cycle->free_connections = next;
cycle->free_connection_n = cycle->connection_n;
// 初始化侦听的套接字
/* for each listening socket */
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
#if (NGX_HAVE_REUSEPORT)
if (ls[i].reuseport && ls[i].worker != ngx_worker) {
continue;
}
#endif
c = ngx_get_connection(ls[i].fd, cycle->log);
if (c == NULL) {
return NGX_ERROR;
}
c->type = ls[i].type;
c->log = &ls[i].log;
c->listening = &ls[i];
ls[i].connection = c;
rev = c->read;
rev->log = c->log;
rev->accept = 1;
#if (NGX_HAVE_DEFERRED_ACCEPT)
rev->deferred_accept = ls[i].deferred_accept;
#endif
if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
if (ls[i].previous) {
/*
* delete the old accept events that were bound to
* the old cycle read events array
*/
old = ls[i].previous->connection;
if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
old->fd = (ngx_socket_t) -1;
}
}
#if (NGX_WIN32)
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
// 关键的IOCP处理代码
ngx_iocp_conf_t *iocpcf;
rev->handler = ngx_event_acceptex;
if (ngx_use_accept_mutex) {
continue;
}
// 侦听端口继续放入accept事件
if (ngx_add_event(rev, 0, NGX_IOCP_ACCEPT) == NGX_ERROR) {
return NGX_ERROR;
}
ls[i].log.handler = ngx_acceptex_log_error;
iocpcf = ngx_event_get_conf(cycle->conf_ctx, ngx_iocp_module);
if (ngx_event_post_acceptex(&ls[i], iocpcf->post_acceptex)
== NGX_ERROR)
{
return NGX_ERROR;
}
} else {
...
}
#else
...
#endif
if (ngx_use_accept_mutex) {
continue;
}
...
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
#endif
}
return NGX_OK;
}
这个函数先是调用了`event`模块的初始化,然后为投递事件跌了、连接、读写事件分配了空间,然后我们分析下和IOCP最相关的部分,
初始化函数为每个侦听的连接添加了`NGX_IOCP_ACCEPT`事件,并设置事件处理函数为`ngx_event_acceptex`这样当ACCEPT事件发生,
侦听端口会继续去侦听,该函数定义如下:
```c
void
ngx_event_acceptex(ngx_event_t *rev)
{
// 对新连入的socket进行一些设置
...
// 给侦听端口投递下一次accept
ngx_event_post_acceptex(ls, 1);
// 原子操作+1
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
// 处理新连入的连接,该函数由上层模块定义,如http,mail等
// handler还会给新连入的connection加上读写的handler
// 之后IO事件循环将会调用
ls->handler(c);
return;
}
<div class="se-preview-section-delimiter"></div>
大家都知道tcp侦听后,是会返回一个新的套接字,该函数对连接上面的套接字使用了原子加一操作,该函数在不同架构下有不同实现,在
amd64平台下的实现为:
static ngx_inline ngx_atomic_int_t
ngx_atomic_fetch_add(ngx_atomic_t *value, ngx_atomic_int_t add)
{
__asm__ volatile (
NGX_SMP_LOCK
" xaddq %0, %1; "
: "+r" (add) : "m" (*value) : "cc", "memory");
return add;
}
<div class="se-preview-section-delimiter"></div>
纯汇编指令实现,性能非常高效。然后看看ngx_event_post_acceptex
函数:
ngx_int_t
ngx_event_post_acceptex(ngx_listening_t *ls, ngx_uint_t n)
{
...
for (i = 0; i < n; i++) {
/* TODO: look up reused sockets */
s = ngx_socket(ls->sockaddr->sa_family, ls->type, 0);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, &ls->log, 0,
ngx_socket_n " s:%d", s);
if (s == (ngx_socket_t) -1) {
ngx_log_error(NGX_LOG_ALERT, &ls->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
}
// 复用一个连接
c = ngx_get_connection(s, &ls->log);
if (c == NULL) {
return NGX_ERROR;
}
// 为新的连接分配空间
c->pool = ngx_create_pool(ls->pool_size, &ls->log);
if (c->pool == NULL) {
ngx_close_posted_connection(c);
return NGX_ERROR;
}
log = ngx_palloc(c->pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_close_posted_connection(c);
return NGX_ERROR;
}
c->buffer = ngx_create_temp_buf(c->pool, ls->post_accept_buffer_size
+ 2 * (ls->socklen + 16));
if (c->buffer == NULL) {
ngx_close_posted_connection(c);
return NGX_ERROR;
}
c->local_sockaddr = ngx_palloc(c->pool, ls->socklen);
if (c->local_sockaddr == NULL) {
ngx_close_posted_connection(c);
return NGX_ERROR;
}
c->sockaddr = ngx_palloc(c->pool, ls->socklen);
if (c->sockaddr == NULL) {
ngx_close_posted_connection(c);
return NGX_ERROR;
}
*log = ls->log;
c->log = log;
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
c->listening = ls;
rev = c->read;
wev = c->write;
// 设置这个连接的overlapped结构体中的事件
rev->ovlp.event = rev;
wev->ovlp.event = wev;
rev->handler = ngx_event_acceptex;
rev->ready = 1;
wev->ready = 1;
rev->log = c->log;
wev->log = c->log;
// 新增到IOCP的IO事件中
if (ngx_add_event(rev, 0, NGX_IOCP_IO) == NGX_ERROR) {
ngx_close_posted_connection(c);
return NGX_ERROR;
}
// 调用AcceptEx来接受连接
if (ngx_acceptex(ls->fd, s, c->buffer->pos, ls->post_accept_buffer_size,
ls->socklen + 16, ls->socklen + 16,
&rcvd, (LPOVERLAPPED) &rev->ovlp)
== 0)
{
err = ngx_socket_errno;
if (err != WSA_IO_PENDING) {
ngx_log_error(NGX_LOG_ALERT, &ls->log, err,
"AcceptEx() %V failed", &ls->addr_text);
ngx_close_posted_connection(c);
return NGX_ERROR;
}
}
}
return NGX_OK;
}
<div class="se-preview-section-delimiter"></div>
ngx_acceptex
函数用已经分配好的空间,让内核去accept
一个连接,并把相关信息放入该结构体中。这就有了IOCP
事件处理循环中的处理。
switch (key) {
case NGX_IOCP_ACCEPT:
if (bytes) {
ev->ready = 1;
}
break;
case NGX_IOCP_IO:
ev->complete = 1;
ev->ready = 1;
break;
case NGX_IOCP_CONNECT:
ev->ready = 1;
}
ev->available = bytes;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"iocp event handler: %p", ev->handler);
ev->handler(ev);
return NGX_OK;
<div class="se-preview-section-delimiter"></div>
ev就是acceptex
中传入的overlapped
结构中的event
,也就是nginx
事件模型中定义的事件结构。这个结构也被其他高层
模块所使用(比如http
)。在调用相应处理函数处理了IO事件后返回。最后回到event
事件循环中的处理:
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
if (delta) {
ngx_event_expire_timers();
}
ngx_event_process_posted(cycle, &ngx_posted_events);
}
在调用IO事件处理函数之后,会调用ngx_event_process_posted
函数,这个函数只是简单的将传入的队列里面的所有事件调用一遍相应
的回调函数。然后就会进入下一次IO事件循环。
写了这么多,也只是在nginx
启动和IOCP
模型中做了一些窥探,实际使用event和iocp都是非常复杂的事情,需要处理大量细节,尤其使用
c之类需要手动管理内存的语言,需要加上大量监控标记等等,来防止内存泄露。nginx
作为世界知名的项目,结构和架构的复杂度,也是远远
超过我所窥探的部分,要想深入研究理解,还得多加努力。最后附两张图,帮助理解:
(IOCP流程图转载自:http://blog.csdn.net/piggyxp/article/details/6922277)
标签:mutex lang 复制 同步 sign 变量定义 消息 open lib
原文地址:http://blog.csdn.net/inszva/article/details/53899245