1、基本概念
IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:
(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。
(2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
(3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
(4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
(5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。
2、select函数
该函数准许进程指示内核等待多个事件中的任何一个发送,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒。函数原型如下:
#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)
返回值:就绪描述符的数目,超时返回0,出错返回-1
函数参数介绍如下:
(1)第一个参数maxfdp1指定待测试的描述字个数,它的值是待测试的最大描述字加1(因此把该参数命名为maxfdp1),描述字0、1、2...maxfdp1-1均将被测试。
因为文件描述符是从0开始的。
(2)中间的三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述字。如果对某一个的条件不感兴趣,就可以把它设为空指针。struct fd_set可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:
void FD_ZERO(fd_set *fdset); //清空集合
void FD_SET(int fd, fd_set *fdset); //将一个给定的文件描述符加入集合之中
void FD_CLR(int fd, fd_set *fdset); //将一个给定的文件描述符从集合中删除
int FD_ISSET(int fd, fd_set *fdset); // 检查集合中指定的文件描述符是否可以读写
(3)timeout告知内核等待所指定描述字中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。
struct timeval{
long tv_sec; //seconds
long tv_usec; //microseconds
};
这个参数有三种可能:
(1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
(2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
(3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。
原理图:
3、测试程序
写一个TCP回射程序,程序的功能是:客户端向服务器发送信息,服务器接收并原样发送给客户端,客户端显示出接收到的信息。
服务端程序如下:
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <netinet/in.h>
6 #include <sys/socket.h>
7 #include <sys/select.h>
8 #include <sys/types.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <unistd.h>
12 #include <assert.h>
13
14 #define IPADDR "127.0.0.1"
15 #define PORT 8787
16 #define MAXLINE 1024
17 #define LISTENQ 5
18 #define SIZE 10
19
20 typedef struct server_context_st
21 {
22 int cli_cnt; /*客户端个数*/
23 int clifds[SIZE]; /*客户端的个数*/
24 fd_set allfds; /*句柄集合*/
25 int maxfd; /*句柄最大值*/
26 } server_context_st;
27 static server_context_st *s_srv_ctx = NULL;
28 /*===========================================================================
29 * ==========================================================================*/
30 static int create_server_proc(const char* ip,int port)
31 {
32 int fd;
33 struct sockaddr_in servaddr;
34 fd = socket(AF_INET, SOCK_STREAM,0);
35 if (fd == -1) {
36 fprintf(stderr, "create socket fail,erron:%d,reason:%s\n",
37 errno, strerror(errno));
38 return -1;
39 }
40
41 /*一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。*/
42 int reuse = 1;
43 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
44 return -1;
45 }
46
47 bzero(&servaddr,sizeof(servaddr));
48 servaddr.sin_family = AF_INET;
49 inet_pton(AF_INET,ip,&servaddr.sin_addr);
50 servaddr.sin_port = htons(port);
51
52 if (bind(fd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) {
53 perror("bind error: ");
54 return -1;
55 }
56
57 listen(fd,LISTENQ);
58
59 return fd;
60 }
61
62 static int accept_client_proc(int srvfd)
63 {
64 struct sockaddr_in cliaddr;
65 socklen_t cliaddrlen;
66 cliaddrlen = sizeof(cliaddr);
67 int clifd = -1;
68
69 printf("accpet clint proc is called.\n");
70
71 ACCEPT:
72 clifd = accept(srvfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
73
74 if (clifd == -1) {
75 if (errno == EINTR) {
76 goto ACCEPT;
77 } else {
78 fprintf(stderr, "accept fail,error:%s\n", strerror(errno));
79 return -1;
80 }
81 }
82
83 fprintf(stdout, "accept a new client: %s:%d\n",
84 inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
85
86 //将新的连接描述符添加到数组中
87 int i = 0;
88 for (i = 0; i < SIZE; i++) {
89 if (s_srv_ctx->clifds[i] < 0) {
90 s_srv_ctx->clifds[i] = clifd;
91 s_srv_ctx->cli_cnt++;
92 break;
93 }
94 }
95
96 if (i == SIZE) {
97 fprintf(stderr,"too many clients.\n");
98 return -1;
99 }
100
101 }
102
103 static int handle_client_msg(int fd, char *buf)
104 {
105 assert(buf);
106 printf("recv buf is :%s\n", buf);
107 write(fd, buf, strlen(buf) +1);
108 return 0;
109 }
110
111 static void recv_client_msg(fd_set *readfds)
112 {
113 int i = 0, n = 0;
114 int clifd;
115 char buf[MAXLINE] = {0};
116 for (i = 0;i <= s_srv_ctx->cli_cnt;i++) {
117 clifd = s_srv_ctx->clifds[i];
118 if (clifd < 0) {
119 continue;
120 }
121 /*判断客户端套接字是否有数据*/
122 if (FD_ISSET(clifd, readfds)) {
123 //接收客户端发送的信息
124 n = read(clifd, buf, MAXLINE);
125 if (n <= 0) {
126 /*n==0表示读取完成,客户都关闭套接字*/
127 FD_CLR(clifd, &s_srv_ctx->allfds);
128 close(clifd);
129 s_srv_ctx->clifds[i] = -1;
130 continue;
131 }
132 handle_client_msg(clifd, buf);
133 }
134 }
135 }
136 static void handle_client_proc(int srvfd)
137 {
138 int clifd = -1;
139 int retval = 0;
140 fd_set *readfds = &s_srv_ctx->allfds;
141 struct timeval tv;
142 int i = 0;
143
144 while (1) {
145 /*每次调用select前都要重新设置文件描述符和时间,因为事件发生后,文件描述符和时间都被内核修改啦*/
146 FD_ZERO(readfds);
147 /*添加监听套接字*/
148 FD_SET(srvfd, readfds);
149 s_srv_ctx->maxfd = srvfd;
150
151 tv.tv_sec = 30;
152 tv.tv_usec = 0;
153 /*添加客户端套接字*/
154 for (i = 0; i < s_srv_ctx->cli_cnt; i++) {
155 clifd = s_srv_ctx->clifds[i];
156 /*去除无效的客户端句柄*/
157 if (clifd != -1) {
158 FD_SET(clifd, readfds);
159 }
160 s_srv_ctx->maxfd = (clifd > s_srv_ctx->maxfd ? clifd : s_srv_ctx->maxfd);
161 }
162
163 /*开始轮询接收处理服务端和客户端套接字*/
164 retval = select(s_srv_ctx->maxfd + 1, readfds, NULL, NULL, &tv);
165 if (retval == -1) {
166 fprintf(stderr, "select error:%s.\n", strerror(errno));
167 return;
168 }
169 if (retval == 0) {
170 fprintf(stdout, "select is timeout.\n");
171 continue;
172 }
173 if (FD_ISSET(srvfd, readfds)) {
174 /*监听客户端请求*/
175 accept_client_proc(srvfd);
176 } else {
177 /*接受处理客户端消息*/
178 recv_client_msg(readfds);
179 }
180 }
181 }
182
183 static void server_uninit()
184 {
185 if (s_srv_ctx) {
186 free(s_srv_ctx);
187 s_srv_ctx = NULL;
188 }
189 }
190
191 static int server_init()
192 {
193 s_srv_ctx = (server_context_st *)malloc(sizeof(server_context_st));
194 if (s_srv_ctx == NULL) {
195 return -1;
196 }
197
198 memset(s_srv_ctx, 0, sizeof(server_context_st));
199
200 int i = 0;
201 for (;i < SIZE; i++) {
202 s_srv_ctx->clifds[i] = -1;
203 }
204
205 return 0;
206 }
207
208 int main(int argc,char *argv[])
209 {
210 int srvfd;
211 /*初始化服务端context*/
212 if (server_init() < 0) {
213 return -1;
214 }
215 /*创建服务,开始监听客户端请求*/
216 srvfd = create_server_proc(IPADDR, PORT);
217 if (srvfd < 0) {
218 fprintf(stderr, "socket create or bind fail.\n");
219 goto err;
220 }
221 /*开始接收并处理客户端请求*/
222 handle_client_proc(srvfd);
223 server_uninit();
224 return 0;
225 err:
226 server_uninit();
227 return -1;
228 }
客户端程序如下:
1 #include <netinet/in.h>
2 #include <sys/socket.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <stdlib.h>
6 #include <sys/select.h>
7 #include <time.h>
8 #include <unistd.h>
9 #include <sys/types.h>
10 #include <errno.h>
11
12 #define MAXLINE 1024
13 #define IPADDRESS "127.0.0.1"
14 #define SERV_PORT 8787
15
16 #define max(a,b) (a > b) ? a : b
17
18 static void handle_recv_msg(int sockfd, char *buf)
19 {
20 printf("client recv msg is:%s\n", buf);
21 sleep(5);
22 write(sockfd, buf, strlen(buf) +1);
23 }
24
25 static void handle_connection(int sockfd)
26 {
27 char sendline[MAXLINE],recvline[MAXLINE];
28 int maxfdp,stdineof;
29 fd_set readfds;
30 int n;
31 struct timeval tv;
32 int retval = 0;
33
34 while (1) {
35
36 FD_ZERO(&readfds);
37 FD_SET(sockfd,&readfds);
38 maxfdp = sockfd;
39
40 tv.tv_sec = 5;
41 tv.tv_usec = 0;
42
43 retval = select(maxfdp+1,&readfds,NULL,NULL,&tv);
44
45 if (retval == -1) {
46 return ;
47 }
48
49 if (retval == 0) {
50 printf("client timeout.\n");
51 continue;
52 }
53
54 if (FD_ISSET(sockfd, &readfds)) {
55 n = read(sockfd,recvline,MAXLINE);
56 if (n <= 0) {
57 fprintf(stderr,"client: server is closed.\n");
58 close(sockfd);
59 FD_CLR(sockfd,&readfds);
60 return;
61 }
62
63 handle_recv_msg(sockfd, recvline);
64 }
65 }
66 }
67
68 int main(int argc,char *argv[])
69 {
70 int sockfd;
71 struct sockaddr_in servaddr;
72
73 sockfd = socket(AF_INET,SOCK_STREAM,0);
74
75 bzero(&servaddr,sizeof(servaddr));
76 servaddr.sin_family = AF_INET;
77 servaddr.sin_port = htons(SERV_PORT);
78 inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
79
80 int retval = 0;
81 retval = connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
82 if (retval < 0) {
83 fprintf(stderr, "connect fail,error:%s\n", strerror(errno));
84 return -1;
85 }
86
87 printf("client send to server .\n");
88 write(sockfd, "hello server", 32);
89
90 handle_connection(sockfd);
91
92 return 0;
93 }
4、程序结果
启动服务程序,执行三个个客户程序进行测试,结果如下图所示:
参考:
http://konglingchun.is-programmer.com/posts/12146.html
http://blog.163.com/smileface100@126/blog/static/27720874200951024532966/
I/O多路复用详解
要想完全理解I/O多路复用,需先要了解I/O模型:
一、五种I/O模型
1、阻塞I/O模型
最流行的I/O模型是阻塞I/O模型,缺省情形下,所有套接口都是阻塞的。我们以数据报套接口为例来讲解此模型(我们使用UDP而不是TCP作为例子的原因在于就UDP而言,数据准备好读取的概念比较简单:要么整个数据报已经收到,要么还没有。然而对于TCP来说,诸如套接口低潮标记等额外变量开始活动,导致这个概念变得复杂)。
进程调用recvfrom,其系统调用直到数据报到达且被拷贝到应用进程的缓冲区中或者发生错误才返回,期间一直在等待。我们就说进程在从调用recvfrom开始到它返回的整段时间内是被阻塞的。
2、非阻塞I/O模型
进程把一个套接口设置成非阻塞是在通知内核:当所请求的I/O操作非得把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误。也就是说当数据没有到达时并不等待,而是以一个错误返回。
3、I/O复用模型
调用select或poll,在这两个系统调用中的某一个上阻塞,而不是阻塞于真正I/O系统调用。 阻塞于select调用,等待数据报套接口可读。当select返回套接口可读条件时,调用recevfrom将数据报拷贝到应用缓冲区中。
4、信号驱动I/O模型
首先开启套接口信号驱动I/O功能, 并通过系统调用sigaction安装一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据报准备好被读时,就为该进程生成一个SIGIO信号。随即可以在信号处理程序中调用recvfrom来读数据报,井通知主循环数据已准备好被处理中。也可以通知主循环,让它来读数据报。
5、异步I/O模型
告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核拷贝到用户自己的缓冲区)通知我们。这种模型与信号驱动模型的主要区别是:
信号驱动I/O:由内核通知我们何时可以启动一个I/O操作,
异步I/O模型:由内核通知我们I/O操作何时完成。
二、I/O复用的典型应用场合:
1、当客户处理多个描述字(通常是交互式输入和网络套接口)时,必须使用I/O复用。
2、如果一个服务器要处理多个服务或者多个协议(例如既要处理TCP,又要处理UDP),一般就要使用I/O复用。
三、支持I/O复用的系统调用
目前支持I/O复用的系统调用有select、pselect、poll、epoll:
1、select函数
该函数允许进程指示内核等待多个事件中的任何一个发生,并仅在有一个或多个事件发生或经历一段指定的时间后才唤醒它。
格式为:
我们从该函数的最后一个参数开始介绍,它告知内核等待所指定描述字中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。
struct timeval{
long tv_sec; //seconds
long tv_usec; //microseconds
};
这个参数有三种可能:
(1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,我们把该参数设置为空指针。
(2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
(3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。
中间的三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述字。如果我们对某一个的条件不感兴趣,就可以把它设为空指针。struct fd_set可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:
void FD_ZERO(fd_set *fdset); //清空集合
void FD_SET(int fd, fd_set *fdset); //将一个给定的文件描述符加入集合之中
void FD_CLR(int fd, fd_set *fdset); //将一个给定的文件描述符从集合中删除
int FD_ISSET(int fd, fd_set *fdset); // 检查集合中指定的文件描述符是否可以读写 ?
目前支持的异常条件只有两个:
(1)某个套接口的带外数据的到达。
(2)某个已置为分组方式的伪终端存在可从其主端读取的控制状态信息。
第一个参数maxfdp1指定待测试的描述字个数,它的值是待测试的最大描述字加1(因此我们把该参数命名为maxfdp1),描述字0、1、2...maxfdp1-1均将被测试。
一个应用select的例子:
2、pselect函数
pselect函数是由POSIX发明的,如今许多Unix变种都支持它。
#include <sys/select.h>
#include <signal.h>
#include <time.h>
int
pselect(
int
maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
const
struct
timespec *timeout,
const
sigset_t *sigmask);
返回:就绪描述字的个数,0-超时,-1-出错
pselect相对于通常的select有两个变化:
1、pselect使用timespec结构,而不使用timeval结构。timespec结构是POSIX的又一个发明。
struct timespec{
time_t tv_sec; //seconds
long tv_nsec; //nanoseconds
};
这两个结构的区别在于第二个成员:新结构的该成员tv_nsec指定纳秒数,而旧结构的该成员tv_usec指定微秒数。
2、pselect函数增加了第六个参数:一个指向信号掩码的指针。该参数允许程序先禁止递交某些信号,再测试由这些当前被禁止的信号处理函数设置的全局变量,然后调用pselect,告诉它重新设置信号掩码。
关于第二点,考虑下面的例子,这个程序的SIGINT信号处理函数仅仅设置全局变量intr_flag并返回。如果我们的进程阻塞于select调用,那么从信号处理函数的返回将导致select返回EINTR错误。然而调用select时,代码看起来大体如下:
if
( intr_flag )
handle_intr();
if
( (nready = select(...)) < 0 )
{
if
(
errno
== EINTR )
{
if
( intr_flag )
handle_intr();
}
...
}
问题是在测试intr_flag和调用select之间如果有信号发生,那么要是select永远阻塞,该信号就会丢失。有了pselect后,我们可以如下可靠地编写这个例子的代码:
sigset_t newmask, oldmask, zeromask;
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
//block SIGINT
if
(intr_flag)
handle_intr();
if
( (nready = pselect(...,&zeromask)) < 0 )
{
if
(
errno
== EINTR)
{
if
(intr_flag)
handle_intr();
}
...
}
在测试intr_flag变量之前,我们阻塞SIGINT。当pselect被调用时,它先以空集(zeromask)取代进程的信号掩码,再检查描述字,并可能进入睡眠。然而当pselect函数返回时,进程的信号掩码又被重置为调用pselect之前的值(即SIGINT被阻塞)。
3、poll函数
poll函数起源于SVR3,最初局限于流设备。SVR4取消了这种限制,允许poll工作在任何描述字上。poll提供的功能与select类似,不过在处理流设备时,它能够提供额外的信息。
#include <poll.h>
int
poll(
struct
pollfd *fdarray, unsigned
long
nfds,
int
timeout);
返回:就绪描述字的个数,0-超时,-1-出错
第一个参数是指向一个结构数组第一个元素的指针。每个数组元素都是一个pollfd结构,用于指定测试某个给定描述字fd的条件。
struct pollfd{
int fd; //descriptor to check
short events; //events of interest on fd
short revents; //events that occurred on fd
};
要测试的条件由events成员指定,而返回的结果则在revents中存储。常用条件及含意说明如下:
常量 | 说明 |
POLLIN | 普通或优先级带数据可读 |
POLLRDNORM | 普通数据可读 |
POLLRDBAND | 优先级带数据可读 |
POLLPRI | 高优先级数据可读 |
POLLOUT | 普通数据可写 |
POLLWRNORM | 普通数据可写 |
POLLWRBAND | 优先级带数据可写 |
POLLERR | 发生错误 |
POLLHUP | 发生挂起 |
POLLNVAL | 描述字不是一个打开的文件 |
注意:后三个只能作为描述字的返回结果存储在revents中,而不能作为测试条件用于events中。
第二个参数nfds是用来指定数组fdarray的长度。
最后一个参数timeout是指定poll函数返回前等待多长时间。它的取值如下:
timeout值 | 说明 |
INFTIM | 永远等待 |
0 | 立即返回,不阻塞进程 |
>0 | 等待指定数目的毫秒数 |
一个使用poll的网络程序例子:
/**
*TCP回射服务器的服务端程序
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string.h>
#include <errno.h>
#include <poll.h> //for poll
#define LISTENQ 1024
#define MAXLINE 1024
#define OPEN_MAX 50000
#define SERVER_PORT 3333
#ifndef INFTIM /*按照书上解释:POSIX规范要求INFTIM在头文件<poll.h>中定义,不过*/
#define INFTIM -1 /*许多系统仍然把它定义在头文件<sys/stropts.h>中,但是经过我的测试*/
#endif /*即使都包含这两个文件,编译器也找不到,不知何解。索性自己定义了。*/
int
main(
int
argc,
char
*argv[])
{
int
i, maxi, listenfd, connfd, sockfd;
int
nready;
ssize_t n;
socklen_t clilen;
struct
sockaddr_in servaddr, cliaddr;
struct
hostent *hp;
char
buf[BUFSIZ];
struct
pollfd client[OPEN_MAX];
/*用于poll函数第一个参数的数组*/
if
( argc != 2 )
{
printf
(
"Please input %s <hostname>\n"
, argv[0]);
exit
(1);
}
//创建socket
if
( (listenfd = socket(AF_INET, SOCK_STREAM,0)) < 0 )
{
printf
(
"Create socket error!\n"
);
exit
(1);
}
//设置服务器地址结构
bzero(&servaddr,
sizeof
(servaddr));
servaddr.sin_family = AF_INET;
if
( (hp = gethostbyname(argv[1])) != NULL )
{
bcopy(hp->h_addr, (
struct
sockaddr*)&servaddr.sin_addr, hp->h_length);
}
else
if
(inet_aton(argv[1], &servaddr.sin_addr) < 0 )
{
printf
(
"Input Server IP error!\n"
);
exit
(1);
}
servaddr.sin_port = htons(SERVER_PORT);
//绑定地址
if
( bind(listenfd, (
struct
sockaddr*)&servaddr,
sizeof
(servaddr)) < 0 )
{
printf
(
"IPaddress bound failure!\n"
);
exit
(1);
}
//开始监听
listen(listenfd, LISTENQ);
client[0].fd = listenfd;
/*将数组中的第一个元素设置成监听描述字*/
client[0].events = POLLIN;
/*将测试条件设置成普通或优先级带数据可读,此处书中为POLLRDNORM,
但是怎么也编译不过去 ,编译器就是找不到,所以就临时改成了POLLIN这个条件,
希望以后能弄清楚。
*/
for
(i = 1;i < OPEN_MAX; ++i)
/*数组中的其它元素将暂时设置成不可用*/
client[i].fd = -1;
maxi = 0;
while
(1)
{
nready = poll(client, maxi+1,INFTIM);
//将进程阻塞在poll上
if
( client[0].revents & POLLIN
/*POLLRDNORM*/
)
/*先测试监听描述字*/
{
connfd = accept(listenfd,(
struct
sockaddr*)&servaddr, &clilen);
for
(i = 1; i < OPEN_MAX; ++i)
if
( client[i].fd < 0 )
{
client[i].fd = connfd;
/*将新连接加入到测试数组中*/
client[i].events = POLLIN;
//POLLRDNORM; /*测试条件普通数据可读*/
break
;
}
if
( i == OPEN_MAX )
{
printf
(
"too many clients"
);
//连接的客户端太多了,都达到最大值了
exit
(1);
}
if
( i > maxi )
maxi = i;
//maxi记录的是数组元素的个数
if
( --nready <= 0 )
continue
;
//如果没有可读的描述符了,就重新监听连接
}
for
(i = 1; i <= maxi; i++)
/*测试除监听描述字以后的其它连接描述字*/
{
if
( (sockfd = client[i].fd) < 0)
/*如果当前描述字不可用,就测试下一个*/
continue
;
if
(client[i].revents & (POLLIN
/*POLLRDNORM*/
| POLLERR))
/*如果当前描述字返回的是普通数据可读或出错条件*/
{
if
( (n = read(sockfd, buf, MAXLINE)) < 0)
//从套接口中读数据
{
if
(
errno
== ECONNRESET)
//如果连接断开,就关闭连接,并设当前描述符不可用
{
close(sockfd);
client[i].fd = -1;
}
else
perror
(
"read error"
);
}
else
if
(n == 0)
//如果数据读取完毕,关闭连接,设置当前描述符不可用
{
close(sockfd);
client[i].fd = -1;
}
else
write(sockfd, buf, n);
//打印数据
if
(--nready <= 0)
break
;
}
}
}
exit
(0);
}
4、epoll
在linux的网络编程中,很长的一段时间都在使用select来做事件触发。然而select逐渐暴露出了一些缺陷,使得linux不得不在新的内核中寻找出替代方案,那就是epoll。其实,epoll与select原理类似,只不过,epoll作出了一些重大改进,即:
a、当它们所监听的集合中有状态发生改变时,select需要循环检查整个集合,才能确定那个文件描述符状态发生改变,进而进行操作;而epoll在添加文件描述符到集合时,已经绑定了该文件描述符的对应函数,因此,当该文件描述符状态改变时,不需要循环查询整个集合,因而将复杂度由0(n)将为o(1),性能得到几何量级的提高,尤其是在大量连接的情况下。
b、再有,select所监听的描述字最大数目是有一定限制的,由FD_SETSIZE设置,通常是1024。对于那些需要支持的上万连接数目的web服务器来说显然是太少了,尽管可以通过修改头文件再重编译内核来扩大这个数目,不过资料也同时指出这样会带来网络效率的下降。(详细请参考:epoll 相对于poll的优点)
epoll的接口非常简单,一共就三个函数:
(1)、int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。需要注意的是,当创建好epoll句柄后,epoll本身就占用一个fd值,所以用完后必须调用close()关闭,以防止fd被耗尽。
(2)、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事件,struct epoll_event结构如下:
struct epoll_event{
__uint32_t events; //epoll events
epoll_data_t data; //user data variable
};
events可以是以下几个宏的集合:
EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加
入到EPOLL队列里。
(3)、int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
令人高兴的是,2.6内核的epoll比其2.5开发版本的/dev/epoll简洁了许多,所以,大部分情况下,强大的东西往往是简单的。唯一有点麻烦是epoll有2种工作方式:LT和ET。
LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.
ET (edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。
注:以上参考epoll精髓一文。
一个epoll的例子:
编译此程序用命令:
gcc -Wall epoll-server.c -o server
运行此程序需要具有管理员权限!
sudo ./server 7838 1
通过测试这一个服务器可能同时处理10000 -3 = 9997 个连接!
如果这是一个在线服务系统,那么它可以支持9997人同时在线,比如游戏、聊天等。
参考网址:http://zhoulifa.bokee.com/6081520.html
注:本章内容摘自<Unix 网络编程>第六章。
Linux中select函数的使用
阻塞式I/O编程有两个特点:
一、如果一个发现I\O有输入,读取的过程中,另外一个也有了输入,这时候不会产生任何反应,也就是需要你的程序语句去select的时候才知道有数据输入。
二、程序去select的时候,如果没有数据输入,程序会一直等待,直到有数据位置,也就是程序中无需循环和sleep。
Select在Socket编程中还是比较重要的,可是对于初学Socket的人来说都不太爱用Select写程序,他们只是习惯写诸如connect、accept、recv或recvfrom这样的阻塞程序(所谓阻塞方式block,顾名思义,就是进程或是线程执行到这些函数时必须等待某个事件的发生,如果事件没有发生,进程或线程就被阻塞,函数不能立即返回)。可是使用Select就可以完成非阻塞(所谓非阻塞方式non-block,就是进程或线程执行此函数时不必非要等待事件的发生,一旦执行肯定返回,以返回值的不同来反映函数的执行情况,如果事件发生则与阻塞方式相同,若事件没有发生则返回一个代码来告知事件未发生,而进程或线程继续执行,所以效率较高)方式工作的程序,它能够监视我们需要监视的文件描述符的变化情况——读写或是异常。下面详细介绍一下!
Select的函数格式(我所说的是Unix系统下的伯克利socket编程,和windows下的有区别,一会儿说明):
int select(int maxfdp,fd_set *readfds,fd_set *writefds,fd_set *errorfds,struct timeval *timeout);
先说明两个结构体:
第一,struct
fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file
descriptor),即文件句柄,这可以是我们所说的普通意义的文件,当然Unix下任何设备、管道、FIFO等都是文件形式,全部包括在内,所以毫无疑问一个socket就是一个文件,socket句柄就是一个文件描述符。fd_set集合可以通过一些宏由人为来操作,比如清空集合FD_ZERO(fd_set
*),将一个给定的文件描述符加入集合之中FD_SET(int ,fd_set *),将一个给定的文件描述符从集合中删除FD_CLR(int
,fd_set*),检查集合中指定的文件描述符是否可以读写FD_ISSET(int ,fd_set* )。一会儿举例说明。
第二,struct timeval是一个大家常用的结构,用来代表时间值,有两个成员,一个是秒数,另一个是毫秒数。
具体解释select的参数:
int maxfdp是一个整数值,是指集合中所有文件描述符的范围,即所有文件描述符的最大值加1,不能错!在Windows中这个参数的值无所谓,可以设置不正确。
fd_set
*readfds是指向fd_set结构的指针,这个集合中应该包括文件描述符,我们是要监视这些文件描述符的读变化的,即我们关心是否可以从这些文件中读取数据了,如果这个集合中有一个文件可读,select就会返回一个大于0的值,表示有文件可读,如果没有可读的文件,则根据timeout参数再判断是否超时,若超出timeout的时间,select返回0,若发生错误返回负值。可以传入NULL值,表示不关心任何文件的读变化。
fd_set
*writefds是指向fd_set结构的指针,这个集合中应该包括文件描述符,我们是要监视这些文件描述符的写变化的,即我们关心是否可以向这些文件中写入数据了,如果这个集合中有一个文件可写,select就会返回一个大于0的值,表示有文件可写,如果没有可写的文件,则根据timeout参数再判断是否超时,若超出timeout的时间,select返回0,若发生错误返回负值。可以传入NULL值,表示不关心任何文件的写变化。
fd_set *errorfds同上面两个参数的意图,用来监视文件错误异常。
struct timeval*
timeout是select的超时时间,这个参数至关重要,它可以使select处于三种状态,第一,若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;第二,若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;第三,timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。
返回值:
负值:select错误 正值:某些文件可读写或出错 0:等待超时,没有可读写或错误的文件
在有了select后可以写出像样的网络程序来!举个简单的例子,就是从网络上接受数据写入一个文件中。
例子:
main()
{
int sock;
FILE *fp;
struct fd_set fds;
struct timeval timeout={3,0}; //select等待3秒,3秒轮询,要非阻塞就置0
char buffer[256]={0}; //256字节的接收缓冲区
/* 假定已经建立UDP连接,具体过程不写,简单,当然TCP也同理,主机ip和port都已经给定,要写的文件已经打开
sock=socket(...);
bind(...);
fp=fopen(...); */
while(1)
{
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(sock,&fds); //添加描述符
FD_SET(fp,&fds); //同上
maxfdp=sock>fp?sock+1:fp+1; //描述符最大值加1
switch(select(maxfdp,&fds,&fds,NULL,&timeout)) //select使用
{
case -1: exit(-1);break; //select错误,退出程序
case 0:break; //再次轮询
default:
if(FD_ISSET(sock,&fds)) //测试sock是否可读,即是否网络上有数据
{
recvfrom(sock,buffer,256,.....);//接受网络数据
if(FD_ISSET(fp,&fds)) //测试文件是否可写
fwrite(fp,buffer...);//写入文件
buffer清空;
}// end if break;
}// end switch
}//end while
}//end main
select()的机制中提供一fd_set的数据结构,实际上是一long类型的数组,
每一个数组元素都能与一打开的文件句柄(不管是Socket句柄,还是其他
文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,
当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执
行了select()的进程哪一Socket或文件可读,下面具体解释:
#include <sys/types.h>
#include <sys/times.h>
#include <sys/select.h>
int select(nfds, readfds, writefds, exceptfds, timeout)
int nfds;
fd_set *readfds, *writefds, *exceptfds;
struct timeval *timeout;
ndfs:select监视的文件句柄数,视进程中打开的文件数而定,一般设为呢要监视各文件
中的最大文件号加一。
readfds:select监视的可读文件句柄集合。
writefds: select监视的可写文件句柄集合。
exceptfds:select监视的异常文件句柄集合。
timeout:本次select()的超时结束时间。(见/usr/sys/select.h,
可精确至百万分之一秒!)
当readfds或writefds中映象的文件可读或可写或超时,本次select()
就结束返回。程序员利用一组系统提供的宏在select()结束时便可判
断哪一文件可读或可写。对Socket编程特别有用的就是readfds。
几只相关的宏解释如下:
FD_ZERO(fd_set *fdset):清空fdset与所有文件句柄的联系。
FD_SET(int fd, fd_set *fdset):建立文件句柄fd与fdset的联系。
FD_CLR(int fd, fd_set *fdset):清除文件句柄fd与fdset的联系。
FD_ISSET(int fd, fdset *fdset):检查fdset联系的文件句柄fd是否
可读写,>0表示可读写。
(关于fd_set及相关宏的定义见/usr/include/sys/types.h)
这样,你的socket只需在有东东读的时候才读入,大致如下:
...
int sockfd;
fd_set fdR;
struct timeval timeout = ..;
...
for(;;) {
FD_ZERO(&fdR);
FD_SET(sockfd, &fdR);
switch (select(sockfd + 1, &fdR, NULL, &timeout)) {
case -1:
error handled by u;
case 0:
timeout hanled by u;
default:
if (FD_ISSET(sockfd)) {
now u read or recv something;
/* if sockfd is father and
server socket, u can now
accept() */
}
}
}
所以一个FD_ISSET(sockfd)就相当通知了sockfd可读。
至于struct timeval在此的功能,请man select。不同的timeval设置
使使select()表现出超时结束、无超时阻塞和轮询三种特性。由于
timeval可精确至百万分之一秒,所以Windows的SetTimer()根本不算
什么。你可以用select()做一个超级时钟。
FD_ACCEPT的实现?依然如上,因为客户方socket请求连接时,会发送
连接请求报文,此时select()当然会结束,FD_ISSET(sockfd)当然大
于零,因为有报文可读嘛!至于这方面的应用,主要在于服务方的父
Socket,你若不喜欢主动accept(),可改为如上机制来accept()。
至于FD_CLOSE的实现及处理,颇费了一堆cpu处理时间,未完待续。
--
讨论关于利用select()检测对方Socket关闭的问题:
仍然是本地Socket有东东可读,因为对方Socket关闭时,会发一个关闭连接
通知报文,会马上被select()检测到的。关于TCP的连接(三次握手)和关
闭(二次握手)机制,敬请参考有关TCP/IP的书籍。
不知是什么原因,UNIX好象没有提供通知进程关于Socket或Pipe对方关闭的
信号,也可能是cpu所知有限。总之,当对方关闭,一执行recv()或read(),
马上回返回-1,此时全局变量errno的值是115,相应的sys_errlist[errno]
为"Connect refused"(请参考/usr/include/sys/errno.h)。所以,在上
篇的for(;;)...select()程序块中,当有东西可读时,一定要检查recv()或
read()的返回值,返回-1时要作出关断本地Socket的处理,否则select()会
一直认为有东西读,其结果曾几令cpu伤心欲断针脚。不信你可以试试:不检
查recv()返回结果,且将收到的东东(实际没收到)写至标准输出...
在有名管道的编程中也有类似问题出现。具体处理详见拙作:发布一个有用
的Socket客户方原码。
至于主动写Socket时对方突然关闭的处理则可以简单地捕捉信号SIGPIPE并作
出相应关断本地Socket等等的处理。SIGPIPE的解释是:写入无读者方的管道。
在此不作赘述,请详man signal。
以上是cpu在作tcp/ip数据传输实验积累的经验,若有错漏,请狂炮击之。
唉,昨天在hacker区被一帮孙子轰得差点儿没短路。ren cpu(奔腾的心) z80
补充关于select在异步(非阻塞)connect中的应用,刚开始搞socket编程的时候
我一直都用阻塞式的connect,非阻塞connect的问题是由于当时搞proxy scan
而提出的呵呵
通过在网上与网友们的交流及查找相关FAQ,总算知道了怎么解决这一问题.同样
用select可以很好地解决这一问题.大致过程是这样的:
1.将打开的socket设为非阻塞的,可以用fcntl(socket, F_SETFL, O_NDELAY)完
成(有的系统用FNEDLAY也可).
2.发connect调用,这时返回-1,但是errno被设为EINPROGRESS,意即connect仍旧
在进行还没有完成.
3.将打开的socket设进被监视的可写(注意不是可读)文件集合用select进行监视,
如果可写,用
getsockopt(socket, SOL_SOCKET, SO_ERROR, &error, sizeof(int));
来得到error的值,如果为零,则connect成功.
在许多unix版本的proxyscan程序你都可以看到类似的过程,另外在solaris精华
区->编程技巧中有一个通用的带超时参数的connect模块.