标签:
所谓IO多路复用,就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
Linux支持IO多路复用的系统调用有select、poll、epoll,这些调用都是内核级别的。但select、poll、epoll本质上都是同步I/O,先是block住等待就绪的socket,再是block住将数据从内核拷贝到用户内存。
当然select、poll、epoll之间也是有区别的,如下表:
\ | select | poll | epoll |
---|---|---|---|
操作方式 | 遍历 | 遍历 | 回调 |
底层实现 | 数组 | 链表 | 哈希表 |
IO效率 | 每次调用都进行线性遍历,时间复杂度为O(n) | 每次调用都进行线性遍历,时间复杂度为O(n) | 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1) |
最大连接数 | 1024(x86)或 2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用select,都需要把fd集合从用户态拷贝到内核态 | 每次调用poll,都需要把fd集合从用户态拷贝到内核态 | 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝 |
注:摘自IBM iSeries 信息中心。
#include <sys/select.h>
#include <sys/time.h>
int select(int max_fd, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout)
该select()函数返回就绪描述符的数目,超时返回0,出错返回-1
第一个参数max_fd指待测试的fd个数,它的值是待测试的最大文件描述符加1,文件描述符从0开始到max_fd-1都将被测试。
中间三个参数readset、writeset和exceptset指定要让内核测试读、写和异常条件的fd集合,如果不需要测试可以设置为NULL。操作fd_set有四个宏:
timeout是指 select 的等待时长,如果这段时间内所监听的 socket 没有事件就绪,超时返回。
这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。
/*************************************************************************
> File Name: server.cpp
> Author: SongLee
> E-mail: lisong.shine@qq.com
> Created Time: 2016年04月28日 星期四 22时02分43秒
> Personal Blog: http://songlee24.github.io/
************************************************************************/
#include<netinet/in.h> // sockaddr_in
#include<sys/types.h> // socket
#include<sys/socket.h> // socket
#include<arpa/inet.h>
#include<unistd.h>
#include<sys/select.h> // select
#include<sys/ioctl.h>
#include<sys/time.h>
#include<iostream>
#include<vector>
#include<string>
#include<cstdlib>
#include<cstdio>
#include<cstring>
using namespace std;
#define BUFFER_SIZE 1024
struct PACKET_HEAD
{
int length;
};
class Server
{
private:
struct sockaddr_in server_addr;
socklen_t server_addr_len;
int listen_fd; // 监听的fd
int max_fd; // 最大的fd
fd_set master_set; // 所有fd集合,包括监听fd和客户端fd
fd_set working_set; // 工作集合
struct timeval timeout;
public:
Server(int port);
~Server();
void Bind();
void Listen(int queue_len = 20);
void Accept();
void Run();
void Recv(int nums);
};
Server::Server(int port)
{
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htons(INADDR_ANY);
server_addr.sin_port = htons(port);
// create socket to listen
listen_fd = socket(PF_INET, SOCK_STREAM, 0);
if(listen_fd < 0)
{
cout << "Create Socket Failed!";
exit(1);
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
Server::~Server()
{
for(int fd=0; fd<=max_fd; ++fd)
{
if(FD_ISSET(fd, &master_set))
{
close(fd);
}
}
}
void Server::Bind()
{
if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
{
cout << "Server Bind Failed!";
exit(1);
}
cout << "Bind Successfully.\n";
}
void Server::Listen(int queue_len)
{
if(-1 == listen(listen_fd, queue_len))
{
cout << "Server Listen Failed!";
exit(1);
}
cout << "Listen Successfully.\n";
}
void Server::Accept()
{
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
if(new_fd < 0)
{
cout << "Server Accept Failed!";
exit(1);
}
cout << "new connection was accepted.\n";
// 将新建立的连接的fd加入master_set
FD_SET(new_fd, &master_set);
if(new_fd > max_fd)
{
max_fd = new_fd;
}
}
void Server::Run()
{
max_fd = listen_fd; // 初始化max_fd
FD_ZERO(&master_set);
FD_SET(listen_fd, &master_set); // 添加监听fd
while(1)
{
FD_ZERO(&working_set);
memcpy(&working_set, &master_set, sizeof(master_set));
timeout.tv_sec = 30;
timeout.tv_usec = 0;
int nums = select(max_fd+1, &working_set, NULL, NULL, &timeout);
if(nums < 0)
{
cout << "select() error!";
exit(1);
}
if(nums == 0)
{
//cout << "select() is timeout!";
continue;
}
if(FD_ISSET(listen_fd, &working_set))
Accept(); // 有新的客户端请求
else
Recv(nums); // 接收客户端的消息
}
}
void Server::Recv(int nums)
{
for(int fd=0; fd<=max_fd; ++fd)
{
if(FD_ISSET(fd, &working_set))
{
bool close_conn = false; // 标记当前连接是否断开了
PACKET_HEAD head;
recv(fd, &head, sizeof(head), 0); // 先接受包头,即数据总长度
char* buffer = new char[head.length];
bzero(buffer, head.length);
int total = 0;
while(total < head.length)
{
int len = recv(fd, buffer + total, head.length - total, 0);
if(len < 0)
{
cout << "recv() error!";
close_conn = true;
break;
}
total = total + len;
}
if(total == head.length) // 将收到的消息原样发回给客户端
{
int ret1 = send(fd, &head, sizeof(head), 0);
int ret2 = send(fd, buffer, head.length, 0);
if(ret1 < 0 || ret2 < 0)
{
cout << "send() error!";
close_conn = true;
}
}
delete buffer;
if(close_conn) // 当前这个连接有问题,关闭它
{
close(fd);
FD_CLR(fd, &master_set);
if(fd == max_fd) // 需要更新max_fd;
{
while(FD_ISSET(max_fd, &master_set) == false)
--max_fd;
}
}
}
}
}
int main()
{
Server server(15000);
server.Bind();
server.Listen();
server.Run();
return 0;
}
/*************************************************************************
> File Name: client.cpp
> Author: SongLee
> E-mail: lisong.shine@qq.com
> Created Time: 2016年04月28日 星期四 23时10分15秒
> Personal Blog: http://songlee24.github.io/
************************************************************************/
#include<netinet/in.h> // sockaddr_in
#include<sys/types.h> // socket
#include<sys/socket.h> // socket
#include<arpa/inet.h>
#include<sys/ioctl.h>
#include<unistd.h>
#include<iostream>
#include<string>
#include<cstdlib>
#include<cstdio>
#include<cstring>
using namespace std;
#define BUFFER_SIZE 1024
struct PACKET_HEAD
{
int length;
};
class Client
{
private:
struct sockaddr_in server_addr;
socklen_t server_addr_len;
int fd;
public:
Client(string ip, int port);
~Client();
void Connect();
void Send(string str);
string Recv();
};
Client::Client(string ip, int port)
{
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
if(inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr) == 0)
{
cout << "Server IP Address Error!";
exit(1);
}
server_addr.sin_port = htons(port);
server_addr_len = sizeof(server_addr);
// create socket
fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd < 0)
{
cout << "Create Socket Failed!";
exit(1);
}
}
Client::~Client()
{
close(fd);
}
void Client::Connect()
{
cout << "Connecting......" << endl;
if(connect(fd, (struct sockaddr*)&server_addr, server_addr_len) < 0)
{
cout << "Can not Connect to Server IP!";
exit(1);
}
cout << "Connect to Server successfully." << endl;
}
void Client::Send(string str)
{
PACKET_HEAD head;
head.length = str.size()+1; // 注意这里需要+1
int ret1 = send(fd, &head, sizeof(head), 0);
int ret2 = send(fd, str.c_str(), head.length, 0);
if(ret1 < 0 || ret2 < 0)
{
cout << "Send Message Failed!";
exit(1);
}
}
string Client::Recv()
{
PACKET_HEAD head;
recv(fd, &head, sizeof(head), 0);
char* buffer = new char[head.length];
bzero(buffer, head.length);
int total = 0;
while(total < head.length)
{
int len = recv(fd, buffer + total, head.length - total, 0);
if(len < 0)
{
cout << "recv() error!";
break;
}
total = total + len;
}
string result(buffer);
delete buffer;
return result;
}
int main()
{
Client client("127.0.0.1", 15000);
client.Connect();
while(1)
{
string msg;
getline(cin, msg);
if(msg == "exit")
break;
client.Send(msg);
cout << client.Recv() << endl;
}
return 0;
}
对上述程序的一些说明:
poll
本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。
它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:
从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
原型:
#include <poll.h>
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
参数描述:
struct pollfd
类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;其中pollfd结构体定义如下:
typedef struct pollfd {
int fd; /* 需要被检测或选择的文件描述符*/
short events; /* 对文件描述符fd上感兴趣的事件 */
short revents; /* 文件描述符fd上当前实际发生的事件*/
} pollfd_t;
一个pollfd结构体表示一个被监视的文件描述符,通过传递fds[]
指示 poll() 监视多个文件描述符,其中:
events
域是监视该文件描述符的事件掩码,由用户来设置这个域。revents
域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。events域中请求的任何事件都可能在revents域中返回。合法的事件如下:
常量 | 说明 |
---|---|
POLLIN | 普通或优先级带数据可读 |
POLLRDNORM | 普通数据可读 |
POLLRDBAND | 优先级带数据可读 |
POLLPRI | 高优先级数据可读 |
POLLOUT | 普通数据可写 |
POLLWRNORM | 普通数据可写 |
POLLWRBAND | 优先级带数据可写 |
POLLERR | 发生错误 |
POLLHUP | 发生挂起 |
POLLNVAL | 描述字不是一个打开的文件 |
当需要监听多个事件时,使用POLLIN | POLLRDNORM
设置 events 域;当poll调用之后检测某事件是否就绪时,fds[i].revents & POLLIN
进行判断。
这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。
#include<netinet/in.h> // sockaddr_in
#include<sys/types.h> // socket
#include<sys/socket.h> // socket
#include<arpa/inet.h>
#include<unistd.h>
#include<poll.h> // poll
#include<sys/ioctl.h>
#include<sys/time.h>
#include<iostream>
#include<vector>
#include<string>
#include<cstdlib>
#include<cstdio>
#include<cstring>
using namespace std;
#define BUFFER_SIZE 1024
#define MAX_FD 1000
struct PACKET_HEAD
{
int length;
};
class Server
{
private:
struct sockaddr_in server_addr;
socklen_t server_addr_len;
int listen_fd; // 监听的fd
struct pollfd fds[MAX_FD]; // fd数组,大小为1000
int nfds;
public:
Server(int port);
~Server();
void Bind();
void Listen(int queue_len = 20);
void Accept();
void Run();
void Recv();
};
Server::Server(int port)
{
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htons(INADDR_ANY);
server_addr.sin_port = htons(port);
// create socket to listen
listen_fd = socket(PF_INET, SOCK_STREAM, 0);
if(listen_fd < 0)
{
cout << "Create Socket Failed!";
exit(1);
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
Server::~Server()
{
for(int i=0; i<MAX_FD; ++i)
{
if(fds[i].fd >=0)
{
close(fds[i].fd);
}
}
}
void Server::Bind()
{
if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
{
cout << "Server Bind Failed!";
exit(1);
}
cout << "Bind Successfully.\n";
}
void Server::Listen(int queue_len)
{
if(-1 == listen(listen_fd, queue_len))
{
cout << "Server Listen Failed!";
exit(1);
}
cout << "Listen Successfully.\n";
}
void Server::Accept()
{
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
if(new_fd < 0)
{
cout << "Server Accept Failed!";
exit(1);
}
cout << "new connection was accepted.\n";
// 将新建立的连接的fd加入fds[]
int i;
for(i=1; i<MAX_FD; ++i)
{
if(fds[i].fd < 0)
{
fds[i].fd = new_fd;
break;
}
}
// 超过最大连接数
if(i == MAX_FD)
{
cout << "Too many clients.\n";
exit(1);
}
fds[i].events = POLLIN; // 设置新描述符的读事件
nfds = i > nfds ? i : nfds; // 更新连接数
}
void Server::Run()
{
fds[0].fd = listen_fd; // 添加监听描述符
fds[0].events = POLLIN;
nfds = 0;
for(int i=1; i<MAX_FD; ++i)
fds[i].fd = -1;
while(1)
{
int nums = poll(fds, nfds+1, -1);
if(nums < 0)
{
cout << "poll() error!";
exit(1);
}
if(nums == 0)
{
continue;
}
if(fds[0].revents & POLLIN)
Accept(); // 有新的客户端请求
else
Recv();
}
}
void Server::Recv()
{
for(int i=1; i<MAX_FD; ++i)
{
if(fds[i].fd < 0)
continue;
if(fds[i].revents & POLLIN) // 读就绪
{
int fd = fds[i].fd;
bool close_conn = false; // 标记当前连接是否断开了
PACKET_HEAD head;
recv(fd, &head, sizeof(head), 0); // 先接受包头,即数据总长度
char* buffer = new char[head.length];
bzero(buffer, head.length);
int total = 0;
while(total < head.length)
{
int len = recv(fd, buffer + total, head.length - total, 0);
if(len < 0)
{
cout << "recv() error!";
close_conn = true;
break;
}
total = total + len;
}
if(total == head.length) // 将收到的消息原样发回给客户端
{
int ret1 = send(fd, &head, sizeof(head), 0);
int ret2 = send(fd, buffer, head.length, 0);
if(ret1 < 0 || ret2 < 0)
{
cout << "send() error!";
close_conn = true;
}
}
delete buffer;
if(close_conn) // 当前这个连接有问题,关闭它
{
close(fd);
fds[i].fd = -1;
}
}
}
}
int main()
{
Server server(15000);
server.Bind();
server.Listen();
server.Run();
return 0;
}
客户端程序同上。
epoll是在2.6内核中提出的,相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
epoll使用“事件”的就绪通知方式,通过epoll_ctl
注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait
便可以收到通知。epoll的优点在于:
epoll对文件描述符的操作有两种模式:LT(level trigger,水平触发)和ET(edge trigger,边缘触发)。二者的区别如下:
水平触发:默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件。
边缘触发:当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时通知一次)
边缘触发(ET模式)在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞socket,以避免由于一个文件描述符的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
epoll操作过程涉及三个函数:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
1) epoll_create
函数用来创建一个epoll句柄,参数size用来告诉内核要监听的数目一共有多少个。
2)epoll_ctl
函数用于注册要监听的事件类型,它有四个参数:
EPOLL_CTL_ADD
(注册新的fd到epfd中)EPOLL_CTL_MOD
(修改已注册的fd的监听事件)EPOLL_CTL_DEL
(从epfd中删除一个fd)其中struct epoll_event
结构体定义如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
域 events 可以是以下几个宏的集合:
3)epoll_wait
函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0。它也有四个参数:
这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。
#include<netinet/in.h> // sockaddr_in
#include<sys/types.h> // socket
#include<sys/socket.h> // socket
#include<arpa/inet.h>
#include<unistd.h>
#include<sys/epoll.h> // epoll
#include<sys/ioctl.h>
#include<sys/time.h>
#include<iostream>
#include<vector>
#include<string>
#include<cstdlib>
#include<cstdio>
#include<cstring>
using namespace std;
#define BUFFER_SIZE 1024
#define SIZE 1000
#define EPOLLSIZE 100
struct PACKET_HEAD
{
int length;
};
class Server
{
private:
struct sockaddr_in server_addr;
socklen_t server_addr_len;
int listen_fd; // 监听的fd
int epfd; // epoll fd
struct epoll_event events[EPOLLSIZE]; // epoll_wait返回的就绪事件
public:
Server(int port);
~Server();
void Bind();
void Listen(int queue_len = 20);
void Accept();
void Run();
void Recv(int fd);
};
Server::Server(int port)
{
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htons(INADDR_ANY);
server_addr.sin_port = htons(port);
// create socket to listen
listen_fd = socket(PF_INET, SOCK_STREAM, 0);
if(listen_fd < 0)
{
cout << "Create Socket Failed!";
exit(1);
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
Server::~Server()
{
close(epfd);
}
void Server::Bind()
{
if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
{
cout << "Server Bind Failed!";
exit(1);
}
cout << "Bind Successfully.\n";
}
void Server::Listen(int queue_len)
{
if(-1 == listen(listen_fd, queue_len))
{
cout << "Server Listen Failed!";
exit(1);
}
cout << "Listen Successfully.\n";
}
void Server::Accept()
{
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
if(new_fd < 0)
{
cout << "Server Accept Failed!";
exit(1);
}
cout << "new connection was accepted.\n";
// 在epfd中注册新建立的连接
struct epoll_event event;
event.data.fd = new_fd;
event.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &event);
}
void Server::Run()
{
epfd = epoll_create(SIZE); // 创建epoll句柄
struct epoll_event event;
event.data.fd = listen_fd;
event.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event); // 注册listen_fd
while(1)
{
int nums = epoll_wait(epfd, events, EPOLLSIZE, -1);
if(nums < 0)
{
cout << "poll() error!";
exit(1);
}
if(nums == 0)
{
continue;
}
for(int i=0; i<nums; ++i) // 遍历所有就绪事件
{
int fd = events[i].data.fd;
if((fd == listen_fd) && (events[i].events & EPOLLIN))
Accept(); // 有新的客户端请求
else if(events[i].events & EPOLLIN)
Recv(fd); // 读数据
else
;
}
}
}
void Server::Recv(int fd)
{
bool close_conn = false; // 标记当前连接是否断开了
PACKET_HEAD head;
recv(fd, &head, sizeof(head), 0); // 先接受包头,即数据总长度
char* buffer = new char[head.length];
bzero(buffer, head.length);
int total = 0;
while(total < head.length)
{
int len = recv(fd, buffer + total, head.length - total, 0);
if(len < 0)
{
cout << "recv() error!";
close_conn = true;
break;
}
total = total + len;
}
if(total == head.length) // 将收到的消息原样发回给客户端
{
int ret1 = send(fd, &head, sizeof(head), 0);
int ret2 = send(fd, buffer, head.length, 0);
if(ret1 < 0 || ret2 < 0)
{
cout << "send() error!";
close_conn = true;
}
}
delete buffer;
if(close_conn) // 当前这个连接有问题,关闭它
{
close(fd);
struct epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event); // Delete一个fd
}
}
int main()
{
Server server(15000);
server.Bind();
server.Listen();
server.Run();
return 0;
}
客户端程序同上
标签:
原文地址:http://blog.csdn.net/lisonglisonglisong/article/details/51328062