标签:
http://blog.csdn.net/u011011917/article/details/17203539
传统的、教科书里的I/O复用等待函数select/poll在处理数以万计的客户端连接时,往往出现效率低下甚至完全瘫痪,,这被称为C10K 问题。
本文尝试着以一个最简单的单线程epoll程序为基础,轻松应对收发数据不频繁的过万客户端并发连接。并以此回顾C10K问题,介绍应对C10K问题的本质方法,线程模式的选择,介绍服务器编程流行的Reactor模式。 顺带介绍怎么应对socket句柄耗尽。
以下是一段使用Epoll的代码。用它可以轻松应对不繁忙的过万客户端连接。
为了让它可以工作,需要修改 sudovi /etc/security/limits.conf 增加如下行:
* soft nofile 65535
* hard nofile 65535
- int main(int argc,char** argv)
- {
-
- socket(AF_INET,SOCK_STREAM,0);
- bind( listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr))
- listen(listenfd,5);
-
- make_socket_non_blocking(listenfd)
-
-
- epollfd = epoll_create1(EPOLL_CLOEXEC);
- event.data.fd = listenfd;
- event.events = EPOLLIN | EPOLLET;
- val = epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&event
-
- idleFd = open("/dev/null", O_RDONLY | O_CLOEXEC);
- for(;;)
- {
-
- int nfds = epoll_wait(epollfd,events,MAXEVENTS,-1);
-
- for(i = 0; i < nfds; i++)
- {
- if((events[i].events & EPOLLERR)
- ||(events[i].events & EPOLLHUP)
- ||(!(events[i].events & EPOLLIN))
- )
- {
- close(events[i].data.fd);
- continue;
- }
- else if(listenfd == events[i].data.fd)
- {
- while(1)
- {
- infd = accept(events[i].data.fd,(struct sockaddr*)&in_addr,&in_len);
- if(infd < 0)
- {
- switch (errno)
- {
- case EAGAIN:
- case ECONNABORTED:
- case EINTR:
- case EPROTO:
- case EPERM:
- {
-
- }
- break;
- case EMFILE:
-
- close(idleFd);
- idleFd= accept(events[i].data.fd,(struct sockaddr*)&in_addr,&in_len);
- close(idleFd);
- idleFd = open("/dev/null",O_RDONLY | O_CLOEXEC);
- break;
- default:
-
- break;
- }
- }
-
- val = make_socket_non_blocking(infd);
- event.data.fd = infd;
- event.events = EPOLLIN|EPOLLET;
- val = epoll_ctl(epollfd,EPOLL_CTL_ADD,infd,&event);
-
- }
- continue;
- }
- else if(events[i].events & EPOLLIN)
- {
-
-
- }
- else if(events[i].events & EPOLLOUT)
- {
- }
- }
- }
- }
这就是epoll的程序框架。很重要的一点是,它将编程的思维转变过来。由主动accept连接,主动调用send/receive改为当相应句柄数据准备好时,由操作系统通知你来处理事件,处理数据。这个就是Reactor模式的基本要点
网上有很多关于epoll的例子,因此我不打算深入展开解释。简单提一下ET模式和LT模式,
ET模式被称为高速模式,是当句柄可用时epoll只通知你一次,因此在代码里我们需要用循环抱住所有的事件处理,一直处理到数据为空为止。否则会收不到接下来的事件。在这个模式下,意味着如果你有大数据需要接收,或者大数据需要发送,不可避免地会对本线程的其它连接造成影响。它的好处是通过减少epoll相关的系统调用(epoll_wait,epoll_ctl)来增加效率。坏处是编程复杂,容易出错。
LT模式是epoll的缺省处理模式,只要句柄就绪了,epoll就会不停地通知你。一直到你处理完毕为止。用它编程简单,可以均衡线程中的各个连接处理。但是用它关注EPOLLOUT时会有点小麻烦。每当socket可写时,不管你有没有数据要发送,它都会不停地通知你。于是腾讯有这么一道面试题:epoll水平触发模式(Level-Triggered);当socket可写时,会不停的触发socket可写的事件,如何处理?解决办法是只有数据要发送时才关注EPOLLOUT,一旦数据发送完毕,立刻调用epoll_ctl停止观察。
ET模式和LT模式谁的效率更高则有待时间检验,目前缺乏相应的基准测试数据。
回头来再来看看C10K问题。没错,我们已经解决它了。C10K的原文在这里,英文不好的人可以看看翻译版的。C10K最大的特点是提升机器性能并不能相应提升程序的吞吐量。
应对C10K主要有两方面的策略:
1) 应用程序以何种方式和操作系统合作,获取I/O事件并调度多个Socket上的I/O操作?
2) 应用程序以何种方式处理任务和线程/进程的关系?
我们先来看策略1:
传统的select/poll函数返回后需要对传入的句柄列表做一次扫描来确定引发事件的相应句柄,其单个任务消耗的资源和当前连接的关系是O(n),CPU占用率和并发数近似成O(n2)。
Epoll则用一个数组只返回就绪的句柄。单个任务和连接的关系是O(1)。在有大量空闲的情况下无疑效率要高出一大截来。
我们再来看策略2:
本质问题时你要create几个线程,每个线程用来干什么?废话不多说,我只谈我自己的理解:再多CPU多核的环境下,效率最高的线程数量是和CPU个数*核数相等。这样可以避开操作系统线程调度的开销。
简单的处理方式是,一个线程负责accept,其它线程负责send/receive。线程的总数固定。好处是编程简单,计算任务可以直接在send/receive线程中完成,当出现某个计算任务过大时不会把系统跑死。一般用这个方案就可以了。
复杂方案是按千兆比特每秒来配置send/receive线程,也就是一个千兆以太网卡一个线程,其它的放入线程池,用来计算用。
再复杂的线程配置方案则可以开一个专题来讲,半同步半异步模式、领导追随者模式等。
附上自己随手写的的单线程epoll程序,是上面代码的完整版。ET模式下收到数据后返回8K内容。
- #include <stdlib.h>
- #include <sys/epoll.h>
- #include <stdio.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <fcntl.h>
- #include <unistd.h>
- #include <string.h>
- #include <errno.h>
- #include <string.h>
- #define MAXEVENTS 64
- #define PORT 2981
- int make_socket_non_blocking (int sfd)
- {
- int flags, s;
-
- flags = fcntl (sfd, F_GETFL, 0);
- if (flags == -1)
- {
- perror ("fcntl");
- return -1;
- }
-
- flags |= O_NONBLOCK;
- s = fcntl (sfd, F_SETFL, flags);
- if (s == -1)
- {
- perror ("fcntl");
- return -1;
- }
-
- return 0;
- }
- struct StruEventBuf
- {
- int fd;
- char *pszData;
- int nSize;
- int nStart;
- };
- int main(int argc,char** argv)
- {
- int listenfd = 0;
- struct sockaddr_in servaddr;
- int epollfd = 0;
- int val = 0;
- struct epoll_event event;
- struct epoll_event events[MAXEVENTS];
- int i = 0;
- int j = 0;
- int idleFd;
- int nPort = 0;
- int flag;
-
- int nSendBuf=10;
-
- socklen_t optlen;
- idleFd = open("/dev/null", O_RDONLY | O_CLOEXEC);
-
- listenfd = socket(AF_INET,SOCK_STREAM,0);
- if( listenfd < 0)
- {
- printf("socket error\n");
- return 1;
- }
-
- bzero(events,sizeof(events));
- bzero(&servaddr,sizeof(servaddr));
- servaddr.sin_family = AF_INET;
- servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
- servaddr.sin_port = htons(PORT);
-
- flag = 1;
- optlen = sizeof(flag);
- setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,optlen);
- if( bind( listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) < 0)
- {
- printf("bind error:(%d) %s\n", errno, strerror(errno));
- }
- val = listen(listenfd,1024);
-
- if(-1 == val )
- {
- printf("listen error,errno = %d, %s\n",errno, strerror(errno));
- return 1;
- }
-
- val = fcntl(listenfd,F_GETFL,0);
- fcntl(listenfd,F_SETFL, val | O_NONBLOCK);
-
- val = fcntl(STDIN_FILENO,F_GETFL,0);
- fcntl(STDIN_FILENO,F_SETFL, val | O_NONBLOCK);
-
- val = fcntl(STDOUT_FILENO,F_GETFL,0);
- fcntl(STDOUT_FILENO,F_SETFL, val | O_NONBLOCK);
-
-
- epollfd = epoll_create1(EPOLL_CLOEXEC);
- printf("epollfd = %d\n",epollfd);
-
- struct StruEventBuf *pStruEventBufListen = (struct StruEventBuf*)malloc(sizeof(struct StruEventBuf));
- bzero(pStruEventBufListen,sizeof(struct StruEventBuf));
- pStruEventBufListen->fd = listenfd;
- event.data.ptr = pStruEventBufListen;
- event.events = EPOLLIN | EPOLLET;
- val = epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&event);
- if(-1 == val)
- {
- printf("epoll_ctl error\n");
- }
- for(;;)
- {
- int nfds = epoll_wait(epollfd,events,MAXEVENTS,-1);
- if(-1 == nfds)
- {
- printf("epoll_pwat error\n");
- return 1;
- }
- for(i = 0; i < nfds; i++)
- {
- struct StruEventBuf*pStruEventBuf = (struct StruEventBuf*) events[i].data.ptr;
- int fd = pStruEventBuf->fd;
- if((events[i].events & EPOLLERR)
- ||(events[i].events & EPOLLHUP)
- ||(!(events[i].events & (EPOLLIN | EPOLLOUT)))
- )
- {
- printf("epoll error fd = %d, events = %0x, errno = %d, %s\n"
- , fd,events[i].events,errno
- ,strerror(errno));
- close(fd);
- continue;
- }
- else if(listenfd == fd)
- {
- while(1)
- {
- struct sockaddr_in in_addr;
- socklen_t in_len;
- int infd;
-
- #define NI_MAXHOST 100
- #define NI_MAXSERV 100
- char hbuf[NI_MAXHOST],sbuf[NI_MAXSERV];
-
- in_len = sizeof(in_addr);
- infd = accept(fd,(struct sockaddr*)&in_addr,&in_len);
- if(infd < 0)
- {
- switch (errno)
- {
- case EAGAIN:
- case ECONNABORTED:
- case EINTR:
- case EPROTO:
- case EPERM:
- {
-
- }
- break;
- case EMFILE:
-
- close(idleFd);
-
- idleFd= accept(fd,(struct sockaddr*)&in_addr,&in_len);
- inet_ntop(AF_INET,&in_addr.sin_addr,hbuf,sizeof(hbuf));
- nPort = ntohs(in_addr.sin_port);
- printf("Max connection ,will close connection from %s, port %d errno = %d %s\n",hbuf,nPort,errno, strerror(errno));
-
- close(idleFd);
-
- idleFd = open("/dev/null",O_RDONLY | O_CLOEXEC);
- break;
- case EBADF:
- case EFAULT:
- case EINVAL:
- case ENFILE:
- case ENOBUFS:
- case ENOMEM:
- case ENOTSOCK:
- case EOPNOTSUPP:
- {
- printf("accept unexpected error %d,%s\n", errno,strerror(errno));
- }
- break;
- default:
- {
- printf("accept unkonw error %d,%s\n", errno,strerror(errno));
- } break;
- }
-
- if((EAGAIN == errno)
- ||(EWOULDBLOCK == errno))
- {
-
- break;
- }
- else
- {
- printf("accept error %d,%s\n", errno,strerror(errno));
- break;
- }
- }
- inet_ntop(AF_INET,&in_addr.sin_addr,hbuf,sizeof(hbuf));
- nPort = ntohs(in_addr.sin_port);
-
- printf("connection from %s, port %d \n",hbuf,nPort);
-
-
- val = make_socket_non_blocking(infd);
- if(-1 == val)
- {
- printf("make socket_non_bolcking error\n");
- }
-
- printf("accepted, fd = %d\n",infd);
-
- nSendBuf = 10;
- setsockopt(infd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int));
- struct StruEventBuf *pStruEventBuf = (struct StruEventBuf*)malloc(sizeof(struct StruEventBuf));
- bzero(pStruEventBuf,sizeof(struct StruEventBuf));
- pStruEventBuf->fd = infd;
-
-
- pStruEventBuf->nStart = 0;
- event.data.ptr = pStruEventBuf;
- event.events = EPOLLIN|EPOLLOUT|EPOLLET;
- val = epoll_ctl(epollfd,EPOLL_CTL_ADD,infd,&event);
- if(val == -1)
- {
- printf("epoll_ctrl error\n");
- }
- }
- continue;
- }
- else if(events[i].events & EPOLLIN)
- {
-
-
-
-
-
-
- int done = 0;
- struct StruEventBuf* pStruEventBuf = (struct StruEventBuf*) (events[i].data.ptr);
- while(1)
- {
- ssize_t count;
- char buf[512];
- int fd = pStruEventBuf->fd;
- count = read(fd, buf, sizeof(buf));
- if(-1 == count)
- {
-
-
-
- if(errno != EAGAIN)
- {
- printf("read error\n");
- done = 1;
- }
- break;
- }
- else if(count == 0)
- {
-
- printf("Remote has close the socket\n");
- done = 1;
- break;
- }
- buf[count] = 0;
- printf("receive %s\n", buf);
-
- pStruEventBuf->nSize = 8 * 1024 + 1;
- pStruEventBuf->pszData = (char*)malloc(pStruEventBuf->nSize);
- char *p = pStruEventBuf->pszData;
- for(i = 0; i < 8; i++)
- {
- for(j = 0; j < 1023; j ++)
- {
- *p++ = ‘0‘ + i;
- }
- *p++ = ‘\n‘;
-
- }
-
- if( p >= pStruEventBuf->pszData + pStruEventBuf->nSize )
- {
- printf("ERRORRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR!\n");
- }
- val = send(fd,pStruEventBuf->pszData, pStruEventBuf->nSize - pStruEventBuf->nStart,0);
- if(val < 0)
- {
- if((-1 == val) && (errno != EAGAIN))
- {
- printf("write error\n");
- done = 1;
- }
- }
- else
- {
- pStruEventBuf->nStart += val;
- }
-
- }
- if(done)
- {
- printf("closed connection on descriptor %d \n",
- fd);
-
-
-
- free(pStruEventBuf);
- close(fd);
- }
- }
- else if(events[i].events & EPOLLOUT)
- {
-
- while( pStruEventBuf->nStart < pStruEventBuf->nSize)
- {
- int nLen = pStruEventBuf->nSize - pStruEventBuf->nStart;
- val = send(fd,pStruEventBuf->pszData + pStruEventBuf->nStart,nLen,0);
-
- if(val < nLen)
- {
- if(val < 0)
- {
- if((-1 == val) && (errno != EAGAIN))
- {
- printf("write error\n");
- ;
- }
- }
- else
- {
- pStruEventBuf->nStart += val;
- printf("Send data\n");
- }
- break;
- }
- else
- {
- char *p = pStruEventBuf->pszData;
- free(p);
- pStruEventBuf->pszData = NULL;
- pStruEventBuf->nSize = 0;
- pStruEventBuf->nStart= 0;
- }
- }
- }
- }
- }
- close(epollfd);
- return 0;
- }
轻松应对C10k问题
标签:
原文地址:http://www.cnblogs.com/jukan/p/5228311.html