我们经常在不知不觉间就说到或使用并发,但从未深入思考并发。我们经常能“遇见”并发,因为并发不仅仅是操作系统内核的“绝招”,它也是应用开发中必不可少的技巧:
现代操作系统提供三种基本方法构建上述这些并发程序:
构建并发程序的最简单方式就是使用进程。每接收到一个连接,父进程都会fork出一个子进程。因为子进程拷贝了父进程的文件描述符表,所以使用进程构建并发时一定要避免资源泄露,尤其是父进程。子进程一定要关闭监听描述符listenfd,这个描述符在子进程中是没有任何用处的。而同样的,父进程要负责关闭连接描述符connfd,同理,这个描述符在父进程中也是一点用处都没有。
下面是一个使用进程构建并发Web服务器的小例子,用telnet localhost 7777连接上后能看到服务端发送过来的”helloworld”欢迎语。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
void sigchld_handler(int sig);
int open_serverfd(int port);
int main(int argc, char const *argv[])
{
int listenfd, connfd;
socklen_t clientlen = sizeof(struct sockaddr_in);
struct sockaddr_in clientaddr;
// 1.Register signal handler
signal(SIGCHLD, sigchld_handler);
// 2.Open socket decriptor
listenfd = open_serverfd(7777);
printf("Main: start...\n");
// 3.Start service loop
while(1) {
connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientlen);
printf("Main: accept client %s\n", inet_ntoa(clientaddr.sin_addr));
if (fork() == 0) {
// Child process flow:
// 1.Close parent resource
close(listenfd);
// 2.Individule logic
char buf[11] = "helloworld";
write(connfd, buf, strlen(buf));
// 3.Close own resource
close(connfd);
exit(0);
}
close(connfd);
}
return 0;
}
void sigchld_handler(int sig)
{
while(waitpid(-1, 0, WNOHANG) > 0)
;
}
int open_serverfd(int port)
{
int listenfd, optval = 1;
struct sockaddr_in sockaddr;
// 1.Create socket address
memset(&sockaddr, 0, sizeof(sockaddr));
sockaddr.sin_family = AF_INET;
sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
sockaddr.sin_port = htons(port);
// 2.Create socket of specific protocal
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -1;
}
// 3.Eliminates "Address already in use" error from bind
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,
&optval, sizeof(int)) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -2;
}
// 3.Bind socket and address
if (bind(listenfd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -3;
}
// 4.Start to listen for connection requests with backlog queue 10
if (listen(listenfd, 10) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -4;
}
return listenfd;
}
基于进程的并发的优点就是模型清晰,子进程的虚拟地址空间独立,不会互相干扰。但这也是它的缺点,如果要共享数据则必须使用IPC(interprocess communication,广义上讲Socket、信号、waitpid()都算,但一般IPC指pipe管道、FIFO队列、System V共享内存、信号量等)机制。此外,由于进程控制和IPC的开销都很高,所以基于进程的方式可能会很慢。
SIGCHLD信号
如果父进程不等待子进程结束,子进程将成为僵尸进程(zombie process)从而占用系统资源,因此上面的例子中我们注册了SIGCHLD信号的处理函数。
“The SIGCHLD signal is sent to the parent of a child process when it exits, is interrupted, or resumes after being interrupted. By default the signal is simply ignored.” - Wikipedia
这里有一个小技巧。如果父进程等待子进程结束,将增加父进程的负担,影响服务器进程的并发性能。Web服务器为了保证高性能,一般会将此信号的处理方式设为忽略,让内核把僵尸子进程转交给init进程去处理。忽略设置方式为:signal(SIGCHLD,SIG_IGN);
现在对我们的Web服务器进行升级,让它不仅能够处理远程客户端的连接请求,写回helloworld欢迎语。同时它还能响应当前命令行中的用户输入。那么,我们应该先等待哪种事件?实际上先等待谁都不是最理想的,因为阻塞地等待一个必然导致无法响应另一个。解决这种困境的技术就是I/O多路复用。
我们用select()函数请求内核挂起当前进程,只有当我们关心的I/O事件发生时再将控制返回给我们的应用程序。下面就是select()函数的原型及操作描述符集合的宏。我们主要关注的就是select()的前两个参数:基数n(cardinality,集合中最大描述符的值加1)和描述符集合(fdset or read set)。
#include <unistd.h>
#include <sys/types.h>
// Returns nonzero count of ready descriptors, ?1 on error
int select(int n, fd_set *fdset, NULL, NULL, NULL);
// Macros for manipulating descriptor sets
FD_ZERO(fd_set *fdset); /* Clear all bits in fdset */
FD_CLR(int fd, fd_set *fdset); /* Clear bit fd in fdset */
FD_SET(int fd, fd_set *fdset); /* Turn on bit fd in fdset */
FD_ISSET(int fd, fd_set *fdset); /* Is bit fd in fdset on? */
select()函数实际很复杂。下面先看一段代码,结合实际代码来理解select()和I/O多路复用是如何使用的。
下面就是升级后的服务端代码,利用I/O多路复用,我们既能响应远程客户端请求,又能响应命令行输入。注意两点:一是使用I/O多路复用后,我们不再直接调用accept()函数,而是阻塞在select()函数的调用上。二是select()返回后我们要用if逐一判断各个描述符,因为可能有多个描述符的事件发生了,所以不能用if-else做判断。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/select.h>
int open_serverfd(int port);
int main(int argc, char const *argv[])
{
int listenfd, connfd;
socklen_t clientlen = sizeof(struct sockaddr_in);
struct sockaddr_in clientaddr;
fd_set readset, readyset;
// 1.Open socket decriptor
listenfd = open_serverfd(7777);
printf("Main: start...\n");
// 2.Prepare descriptor set
FD_ZERO(&readyset);
FD_SET(STDIN_FILENO, &readset);
FD_SET(listenfd, &readset);
// 3.Start service loop
while(1) {
readyset = readset;
select(listenfd+1, &readyset, NULL, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &readyset)) {
char buf[100];
if (!fgets(buf, 100, stdin))
exit(0);
if (!strncmp(buf, "quit", 4))
exit(0);
printf("%s\n", buf);
}
if (FD_ISSET(listenfd, &readyset)) {
connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientlen);
printf("Main: accept client %s\n", inet_ntoa(clientaddr.sin_addr));
char buf[11] = "helloworld";
write(connfd, buf, strlen(buf));
close(connfd);
}
}
return 0;
}
int open_serverfd(int port)
{
int listenfd, optval = 1;
struct sockaddr_in sockaddr;
// 1.Create socket address
memset(&sockaddr, 0, sizeof(sockaddr));
sockaddr.sin_family = AF_INET;
sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
sockaddr.sin_port = htons(port);
// 2.Create socket of specific protocal
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -1;
}
// 3.Eliminates "Address already in use" error from bind
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,
&optval, sizeof(int)) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -2;
}
// 3.Bind socket and address
if (bind(listenfd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -3;
}
// 4.Start to listen for connection requests with backlog queue 10
if (listen(listenfd, 10) < 0) {
fprintf(stderr, "Error: %s\n", strerror(errno));
return -4;
}
return listenfd;
}
FD_ZERO()清空readset:
listenfd | stdin | ||
---|---|---|---|
3 | 2 | 1 | 0 |
0 | 0 | 0 | 0 |
FD_SET()设置readset:
listenfd | stdin | ||
---|---|---|---|
3 | 2 | 1 | 0 |
1 | 0 | 0 | 1 |
假设控制台有输入,则select()“苏醒”并返回readset:
listenfd | stdin | ||
---|---|---|---|
3 | 2 | 1 | 0 |
0 | 0 | 0 | 1 |
此时我们用FD_ISSET()就能检测到stdin已准备好读。
一切看起来都很完美,但仔细看一下select()的循环就会发现问题。如果远程客户端发送连接请求,select()返回进入客户端处理逻辑。因为这段逻辑是同步执行的,所以 在select()函数返回到处理完客户端请求的这段时间内,我们没有再次调用select(),因此这段时间内我们是无法响应的。
解决上面问题的方法就是在更细的粒度上多路复用I/O。基本思想是:将I/O多路复用作为事件驱动程序的基础框架,将程序的整个执行流程划分为不同的事件,在多路复用机制上构建起状态机。我们通常用有向图代表状态机,用结点代表状态,用有向弧代表状态的迁移,用弧上标签文字代表促使迁移的输入事件。每个输入事件都触发了从当前状态到下一状态的迁移。
以下面例子为例,当接收到新客户端连接时,为其创建一个状态机,并关联到已连接的描述符connfd上。当connfd准备好读写时,我们写回欢迎语,完成状态的转换。所以与前面第一版的例子有些区别,当我们用telnet连接上时,要敲入一个字符才能看到欢迎语。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/select.h>
typedef struct {
int maxfd;
fd_set readset;
fd_set readyset;
int nready;
int maxi;
int clientfd[FD_SETSIZE];
} pool;
void init_pool(int listenfd, pool *p);
void add_client(int connfd, pool *p);
void check_clients(pool *p);
int open_serverfd(int port);
int main(int argc, char const *argv[])
{
int listenfd, connfd;
socklen_t clientlen = sizeof(struct sockaddr_in);
struct sockaddr_in clientaddr;
static pool pool;
listenfd = open_serverfd(7777);
init_pool(listenfd, &pool);
printf("Main: start...\n");
while(1) {
// 1.Wait for listening/connected descriptor ready
pool.readyset = pool.readset;
pool.nready = select(pool.maxfd+1, &pool.readyset, NULL, NULL, NULL);
// 2.Add new client descriptor to pool if listening descriptor ready
if (FD_ISSET(listenfd, &pool.readyset)) {
connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientlen);
add_client(connfd, &pool);
}
// 3.Echo welcome text if connected descriptor ready
check_clients(&pool);
}
return 0;
}
void init_pool(int listenfd, pool *p)
{
int i;
// 1.Initially, there‘re no connected descriptor
p->maxi = -1;
for (i = 0; i < FD_SETSIZE; ++i)
p->clientfd[i] = -1;
// 2.Initially, listening descriptor is only member of read set
p->maxfd = listenfd;
FD_ZERO(&p->readset);
FD_SET(listenfd, &p->readset);
}
void add_client(int connfd, pool *p)
{
int i;
p->nready--;
for (i = 0; i < FD_SETSIZE; ++i) {
if (p->clientfd[i] < 0) {
// 1.Add connected descriptor to the pool
p->clientfd[i] = connfd;
// 2.Add it to descriptor set
FD_SET(connfd, &p->readset);
// 3.Update max descriptor
if (connfd > p->maxfd)
p->maxfd = connfd;
if (i > p->maxi)
p->maxi = i;
break;
}
}
}
void check_clients(pool *p)
{
int i, connfd;
for (i = 0; i <= p->maxi && p->nready > 0; ++i) {
connfd = p->clientfd[i];
if (connfd > 0 && FD_ISSET(connfd, &p->readyset)) {
p->nready--;
char buf[11] = "helloworld";
write(connfd, buf, strlen(buf));
close(connfd);
FD_CLR(connfd, &p->readset);
p->clientfd[i] = -1;
}
}
}
int open_serverfd(int port) { ... }
下面扩展一下知识面,围绕I/O多路复用,介绍一些周边的、但很重要的知识。主要参考资料有:
select最早出现于1983年的BSD 4.2中。其优点是跨平台性比较好,几乎所有平台都支持它。而其缺点是:select轮询(线性扫描)所有Socket描述符,检查是否有事件发生。当描述符很多时,select的耗时也会增加。因此select能够监视的描述符数目是有上限的,上限就是FD_SETSIZE常量。此外,select每次都会在内核态和用户态之间复制整个描述符数组,而不管这里面是只有一个还是几个描述符有事件发生。
poll诞生于System V 3,它与select没有本质上区别,只不过poll没有描述符最大数量的限制。
Linux 2.6后提供了更加高效的epoll机制,被公认为目前性能最好的多路I/O就绪通知方法。它有两项本质上的改进:
kqueue是BSD系统中的事件通知机制,不在这里讨论了。”Kqueue is a scalable event notification interface introduced in FreeBSD 4.1”.
我们知道,select()和poll()函数会将准备就绪的文件描述符通知给进程,如果进程没有对其进行处理(I/O操作),那么下次调用select()和poll()时将再次报告这些文件描述符。所以select和poll机制一般不会丢失就绪的描述符消息,这种方式称为 水平触发(Level Triggered)。
epoll可以同时支持水平触发和边际触发,所谓 边际触发(Edge Triggered)是指:只告诉进程哪些文件描述符刚刚变为就绪状态,并且只说一遍,如果我们没有采取行动,那么它将不会再次告知。理论上边缘触发的性能要更高一些,但是代码实现起来要非常小心。
(待补充)
目前比较流行的事件通知库有libevent、libev、libuv:
像Nginx、Redis等高性能中间件,出于简洁、性能、以及作者的“代码洁癖”方面的考虑,没有使用libevent等第三方事件通知库,而是自己动手实现了类似的简化版。
因为没有上下文切换,所以基于I/O多路复用的并发要比基于进程的高效得多。而且因为是单线程执行的,所以调试起来也很方便,直接用GDB等工具就能调错。
I/O多路复用的确很好很高效,但它不是没有缺点:
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/dc_726/article/details/46942303