标签:linux统系统开发12 socket api编程3 tcp状态转换 多路io高并发select poll epoll udp组播 线程池
【本文谢绝转载原文来自http://990487026.blog.51cto.com】
Linux统系统开发12 Socket API编程3 TCP状态转换 多路IO高并发select poll epoll udp组播 线程池 TCP 11种状态理解: 1,客户端正常发起关闭请求 2,客户端与服务端同时发起关闭请求 3,FIN_WAIT1直接转变TIME_WAIT 4,客户端接收来自服务器的关闭连接请求 多路IO转接服务器: select模型 poll模型 epoll模型 udp组播模型 线程池模型
TCP 11种状态理解:
以客户端状态图为例:
客户端主动关闭,有3种情景
1,客户端正常发起关闭请求
2,客户端与服务端同时发起关闭请求
3,FIN_WAIT1直接转变TIME_WAIT
客户端被动关闭,有1种情景
4,客户端接收来自服务器的关闭连接请求
1,客户端正常发起关闭请求
标志 状态 说明 无 CLOSED 这是客户端正常未建立连接的状态 发SYN SYN_SENT 客户端发送SYN 收ACK SYN_RECV 收到服务端应答 发ACK ESTABLISHED 连接建立完成 ..................... 数据收发 发FIN FIN_WAIT1 发出连接关闭请求 收ACK FIN_WAIT2 收到服务器应答 收FIN TIME_WAIT 收到服务器请求 发ACK TIME_WAIT 回应服务器 2MSL超时后 CLOSED 连接关闭
2,客户端与服务端同时发起关闭请求
2,客户端与服务端同时发起关闭请求 标志 状态 说明 无 CLOSED 这是客户端正常未建立连接的状态 发SYN SYN_SENT 客户端发送SYN 收ACK SYN_RECV 收到服务端应答 发ACK ESTABLISHED 连接建立完成 ..................... 数据收发 收FIN CLOSING 收到服务器关闭请求 发ACK CLOSING 客户端发出应答 收ACK TIME_WAIT 收到服务器应答 2MSL超时后 CLOSED 连接关闭
3,FIN_WAIT1直接转变TIME_WAIT
标志 状态 说明 无 CLOSED 这是客户端正常未建立连接的状态 发SYN SYN_SENT 客户端发送SYN 收ACK SYN_RECV 收到服务端应答 发ACK ESTABLISHED 连接建立完成 ..................... 数据收发 发FIN FIN_WAIT1 发出连接关闭请求 收ACK&FIN TIME_WAIT 收到服务器应答 发ACK TIME_WAIT 回应服务器 2MSL超时后 CLOSED 连接关闭
4,客户端接收来自服务器的关闭连接请求
标志 状态 说明 无 CLOSED 这是客户端正常未建立连接的状态 发SYN SYN_SENT 客户端发送SYN 收ACK SYN_RECV 收到服务端应答 发ACK ESTABLISHED 连接建立完成 ..................... 数据收发 收FIN CLOSE_WAIT 收到服务器请求 发ACK CLOSE_WAIT 回应服务器 发FIN LAST_ACK 发出关闭请求 收ACK CLOSED 连接关闭
多路IO转接服务器:
select
1.select能监听的文件描述符个数受限于FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数
2.解决1024以下客户端时使用select是很合适的,但如果链接客户端过多,select采用的是轮询模型,会大大降低服务器响应效率,不应在select上投入更多精力
函数原型:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态
readfds:监控有读数据到达文件描述符集合,传入传出参数
writefds:监控写数据到达文件描述符集合,传入传出参数
exceptfds:监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数
timeout:定时阻塞监控时间,3种情况
1.NULL,永远等下去
2.设置timeval,等待固定时间
3.设置timeval里时间均为0,检查描述字后立即返回,轮询
配套API函数
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
void FD_CLR(int fd, fd_set *set); 把文件描述符集合里fd清0
int FD_ISSET(int fd, fd_set *set); 测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set); 把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set); 把文件描述符集合里所有位清0
select 轮询 多并发模型
1,依赖库
chunli@ubuntu:~/linux_c/select$ cat wrap.h /* wrap.h */ #ifndef __WRAP_H_ #define __WRAP_H_ void perr_exit(const char *s); int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr); void Bind(int fd, const struct sockaddr *sa, socklen_t salen); void Connect(int fd, const struct sockaddr *sa, socklen_t salen); void Listen(int fd, int backlog); int Socket(int family, int type, int protocol); ssize_t Read(int fd, void *ptr, size_t nbytes); ssize_t Write(int fd, const void *ptr, size_t nbytes); void Close(int fd); ssize_t Readn(int fd, void *vptr, size_t n); ssize_t Writen(int fd, const void *vptr, size_t n); ssize_t my_read(int fd, char *ptr); ssize_t Readline(int fd, void *vptr, size_t maxlen); #endif chunli@ubuntu:~/linux_c/select$ cat wrap.c /* wrap.c */ #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <unistd.h> #include <sys/socket.h> void perr_exit(const char *s) { perror(s); exit(1); } int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { int n; again: if ( (n = accept(fd, sa, salenptr)) < 0) { if ((errno == ECONNABORTED) || (errno == EINTR)) goto again; else perr_exit("accept error"); } return n; } void Bind(int fd, const struct sockaddr *sa, socklen_t salen) { if (bind(fd, sa, salen) < 0) perr_exit("bind error"); } void Connect(int fd, const struct sockaddr *sa, socklen_t salen) { if (connect(fd, sa, salen) < 0) perr_exit("connect error"); } void Listen(int fd, int backlog) { if (listen(fd, backlog) < 0) perr_exit("listen error"); } int Socket(int family, int type, int protocol) { int n; if ( (n = socket(family, type, protocol)) < 0) perr_exit("socket error"); return n; } ssize_t Read(int fd, void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = read(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } ssize_t Write(int fd, const void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = write(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } void Close(int 
fd) { if (close(fd) == -1) perr_exit("close error"); } ssize_t Readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { if (errno == EINTR) nread = 0; else return -1; } else if (nread == 0) break; nleft -= nread; ptr += nread; } return n - nleft; } ssize_t Writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft)) <= 0) { if (nwritten < 0 && errno == EINTR) nwritten = 0; else return -1; } nleft -= nwritten; ptr += nwritten; } return n; } static ssize_t my_read(int fd, char *ptr) { static int read_cnt; static char *read_ptr; static char read_buf[100]; if (read_cnt <= 0) { again: if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) { if (errno == EINTR) goto again; return -1; } else if (read_cnt == 0) return 0; read_ptr = read_buf; } read_cnt--; *ptr = *read_ptr++; return 1; } ssize_t Readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { if ( (rc = my_read(fd, &c)) == 1) { *ptr++ = c; if (c == ‘\n‘) break; } else if (rc == 0) { *ptr = 0; return n - 1; } else return -1; } *ptr = 0; return n; } chunli@ubuntu:~/linux_c/select$
服务端程序:
chunli@ubuntu:~/linux_c/select$ cat server.c /* server.c */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <netinet/in.h> #include <ctype.h> #include <unistd.h> #include <arpa/inet.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8000 int main(void) { int i, maxi, maxfd, listenfd, connfd, sockfd; int nready, client[FD_SETSIZE]; /* FD_SETSIZE 默认为 1024 */ ssize_t n; fd_set rset, allset; char buf[MAXLINE]; char str[INET_ADDRSTRLEN]; /* #define INET_ADDRSTRLEN 16 */ socklen_t cliaddr_len; struct sockaddr_in cliaddr, servaddr; listenfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); Listen(listenfd, 20); /* 默认最大128 */ maxfd = listenfd; /* 初始化 */ maxi = -1; /* client[]的下标 */ for (i = 0; i < FD_SETSIZE; i++) { client[i] = -1; /* 用-1初始化client[] */ } FD_ZERO(&allset); FD_SET(listenfd, &allset); /* 构造select监控文件描述符集 */ while(1) { rset = allset; /* 每次循环时都从新设置select监控信号集 */ nready = select(maxfd+1, &rset, NULL, NULL, NULL); if (nready < 0) { perr_exit("select error"); } if (FD_ISSET(listenfd, &rset)) /* new client connection */ { cliaddr_len = sizeof(cliaddr); connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len); printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port)); for (i = 0; i < FD_SETSIZE; i++) { if (client[i] < 0) { client[i] = connfd; /* 保存accept返回的文件描述符到client[]里 */ break; } } /* 达到select能监控的文件个数上限 1024 */ if (i == FD_SETSIZE) { fputs("too many clients\n", stderr); exit(1); } FD_SET(connfd, &allset); /* 添加一个新的文件描述符到监控信号集里 */ if (connfd > maxfd) { maxfd = connfd; /* select第一个参数需要 */ } if (i > maxi) { maxi = i; /* 更新client[]最大下标值 */ } if (--nready == 0) { continue; /* 如果没有更多的就绪文件描述符继续回到上面select阻塞监听,负责处理未处理完的就绪文件描述符 */ } } for (i = 0; i <= maxi; i++) { /* 
检测哪个clients 有数据就绪 */ if ( (sockfd = client[i]) < 0) { continue; } if (FD_ISSET(sockfd, &rset)) { if ( (n = Read(sockfd, buf, MAXLINE)) == 0) { /* 当client关闭链接时,服务器端也关闭对应链接 */ Close(sockfd); FD_CLR(sockfd, &allset); /* 解除select监控此文件描述符 */ client[i] = -1; } else { int j; for (j = 0; j < n; j++) { buf[j] = toupper(buf[j]); } Write(sockfd, buf, n); } if (--nready == 0) { break; } } } } close(listenfd); return 0; }
客户端程序:
chunli@ubuntu:~/linux_c/select$ cat client.c /* client.c */ #include <stdio.h> #include <string.h> #include <unistd.h> #include <netinet/in.h> #include <arpa/inet.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8000 int main(int argc, char *argv[]) { struct sockaddr_in servaddr; char buf[MAXLINE]; int sockfd, n; sockfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT); Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); while (fgets(buf, MAXLINE, stdin) != NULL) { Write(sockfd, buf, strlen(buf)); n = Read(sockfd, buf, MAXLINE); if (n == 0) printf("the other side has been closed.\n"); else Write(STDOUT_FILENO, buf, n); } Close(sockfd); return 0; } chunli@ubuntu:~/linux_c/select$
服务器程序编译运行: chunli@ubuntu:~/linux_c/select$ gcc -Wall -o server server.c wrap.c && ./server 局域网找几个机器跑一下: chunli@ubuntu:~/linux_c/select$ scp client chunli@11.11.11.8:~ chunli@ubuntu:~/linux_c/select$ ssh chunli@11.11.11.8 chunli@ubuntu14:~$ ./client hh HH kkkkk KKKKK 多开几个客户端: chunli@ubuntu:~/linux_c/select$ gcc -Wall -o client client.c wrap.c && ./client kk KK 服务端输出: chunli@ubuntu:~/linux_c/select$ gcc -Wall -o server server.c wrap.c && ./server received from 127.0.0.1 at PORT 52536 received from 11.11.11.8 at PORT 47032 服务器只有一个进程: chunli@ubuntu:~$ ps aux | grep server chunli 7004 0.0 0.0 4360 664 pts/8 S+ 21:21 0:00 ./server chunli@ubuntu:~$ ps ajx | grep server 4718 7004 7004 4718 pts/8 7004 S+ 1000 0:00 ./server chunli@ubuntu:~$ ps -eLf| grep server chunli 7004 4718 7004 0 1 21:21 pts/8 00:00:00 ./server pselect比select多了一个信号屏蔽字
select帮助文档
chunli@ubuntu:~/linux_c/select$ man select SYNOPSIS /* According to POSIX.1-2001, POSIX.1-2008 */ #include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set); #include <sys/select.h> int pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask); Feature Test Macro Requirements for glibc (see feature_test_macros(7)): pselect(): _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
poll模型,
poll模型,建议3000以下个客户端并发请求:
相当于谁有问题谁举手,然后轮询看看是谁有问题.
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 监控的事件 */
short revents; /* 监控事件中满足条件返回的事件 */
};
POLLIN普通或带外优先数据可读,即POLLRDNORM | POLLRDBAND
POLLRDNORM-数据可读
POLLRDBAND-优先级带数据可读
POLLPRI 高优先级可读数据
POLLOUT普通或带外数据可写
POLLWRNORM-数据可写
POLLWRBAND-优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件
nfds 监控数组中有多少文件描述符需要被监控
timeout 毫秒级等待
-1:阻塞等,#define INFTIM -1 Linux中没有定义此宏
0:立即返回,不阻塞进程
>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值
如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0。
ppoll GNU定义了ppoll(非POSIX标准),可以支持设置信号屏蔽字,大家可参考poll模型自行实现C/S
int ppoll(struct pollfd *fds, nfds_t nfds,const struct timespec *timeout_ts, const sigset_t *sigmask);
poll服务端模型:
1,依赖库
chunli@ubuntu:~/linux_c/poll$ cat wrap.h /* wrap.h */ #ifndef __WRAP_H_ #define __WRAP_H_ void perr_exit(const char *s); int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr); void Bind(int fd, const struct sockaddr *sa, socklen_t salen); void Connect(int fd, const struct sockaddr *sa, socklen_t salen); void Listen(int fd, int backlog); int Socket(int family, int type, int protocol); ssize_t Read(int fd, void *ptr, size_t nbytes); ssize_t Write(int fd, const void *ptr, size_t nbytes); void Close(int fd); ssize_t Readn(int fd, void *vptr, size_t n); ssize_t Writen(int fd, const void *vptr, size_t n); ssize_t my_read(int fd, char *ptr); ssize_t Readline(int fd, void *vptr, size_t maxlen); #endif chunli@ubuntu:~/linux_c/poll$ chunli@ubuntu:~/linux_c/poll$ cat wrap.c /* wrap.c */ #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <unistd.h> #include <sys/socket.h> void perr_exit(const char *s) { perror(s); exit(1); } int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { int n; again: if ( (n = accept(fd, sa, salenptr)) < 0) { if ((errno == ECONNABORTED) || (errno == EINTR)) goto again; else perr_exit("accept error"); } return n; } void Bind(int fd, const struct sockaddr *sa, socklen_t salen) { if (bind(fd, sa, salen) < 0) perr_exit("bind error"); } void Connect(int fd, const struct sockaddr *sa, socklen_t salen) { if (connect(fd, sa, salen) < 0) perr_exit("connect error"); } void Listen(int fd, int backlog) { if (listen(fd, backlog) < 0) perr_exit("listen error"); } int Socket(int family, int type, int protocol) { int n; if ( (n = socket(family, type, protocol)) < 0) perr_exit("socket error"); return n; } ssize_t Read(int fd, void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = read(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } ssize_t Write(int fd, const void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = write(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } 
return n; } void Close(int fd) { if (close(fd) == -1) perr_exit("close error"); } ssize_t Readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { if (errno == EINTR) nread = 0; else return -1; } else if (nread == 0) break; nleft -= nread; ptr += nread; } return n - nleft; } ssize_t Writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft)) <= 0) { if (nwritten < 0 && errno == EINTR) nwritten = 0; else return -1; } nleft -= nwritten; ptr += nwritten; } return n; } static ssize_t my_read(int fd, char *ptr) { static int read_cnt; static char *read_ptr; static char read_buf[100]; if (read_cnt <= 0) { again: if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) { if (errno == EINTR) goto again; return -1; } else if (read_cnt == 0) return 0; read_ptr = read_buf; } read_cnt--; *ptr = *read_ptr++; return 1; } ssize_t Readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { if ( (rc = my_read(fd, &c)) == 1) { *ptr++ = c; if (c == ‘\n‘) break; } else if (rc == 0) { *ptr = 0; return n - 1; } else return -1; } *ptr = 0; return n; } chunli@ubuntu:~/linux_c/poll$
服务端程序:
chunli@ubuntu:~/linux_c/poll$ cat server.c /* server.c */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <poll.h> #include <errno.h> #include <ctype.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8000 #define OPEN_MAX 1024 int main(int argc, char *argv[]) { int i, j, maxi, listenfd, connfd, sockfd; int nready; ssize_t n; char buf[MAXLINE], str[INET_ADDRSTRLEN]; socklen_t clilen; struct pollfd client[OPEN_MAX]; struct sockaddr_in cliaddr, servaddr; listenfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); Listen(listenfd, 20); client[0].fd = listenfd; client[0].events = POLLRDNORM; /* listenfd监听普通读事件 */ for (i = 1; i < OPEN_MAX; i++) { client[i].fd = -1; /* 用-1初始化client[]里剩下元素 */ } maxi = 0; /* client[]数组有效元素中最大元素下标 */ while (1) { nready = poll(client, maxi+1, -1); /* 阻塞 */ if (client[0].revents & POLLRDNORM) /* 有客户端链接请求 */ { clilen = sizeof(cliaddr); connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen); printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port)); for (i = 1; i < OPEN_MAX; i++) { if (client[i].fd < 0) { client[i].fd = connfd; /* 找到client[]中空闲的位置,存放accept返回的connfd */ break; } } if (i == OPEN_MAX) { perr_exit("too many clients"); } client[i].events = POLLRDNORM; /* 设置刚刚返回的connfd,监控读事件 */ if (i > maxi) { maxi = i; /* 更新client[]中最大元素下标 */ } if (--nready <= 0) { continue; /* 没有更多就绪事件时,继续回到poll阻塞 */ } } for (i = 1; i <= maxi; i++) { /* 检测client[] */ if ( (sockfd = client[i].fd) < 0) { continue; } if (client[i].revents & (POLLRDNORM | POLLERR)) { if ( (n = Read(sockfd, buf, MAXLINE)) < 0) { if (errno == ECONNRESET) /* 当收到 RST标志时 */ { /* connection reset by client */ printf("client[%d] aborted connection\n", 
i); Close(sockfd); client[i].fd = -1; } else { perr_exit("read error"); } } else if (n == 0) { /* connection closed by client */ printf("client[%d] closed connection\n", i); Close(sockfd); client[i].fd = -1; } else { for (j = 0; j < n; j++) buf[j] = toupper(buf[j]); Writen(sockfd, buf, n); } if (--nready <= 0) { break; /* no more readable descriptors */ } } } } return 0; } chunli@ubuntu:~/linux_c/poll$
客户端程序:
chunli@ubuntu:~/linux_c/poll$ cat client.c /* client.c */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <netinet/in.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8000 int main(int argc, char *argv[]) { if(argc < 2 ) { printf("input ip address!\n"); exit(1); } char ipaddr[16]; //存储main的IP参数 strncpy(ipaddr,argv[1],16); struct sockaddr_in servaddr; char buf[MAXLINE]; int sockfd, n; sockfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, ipaddr, &servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT); Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); while (fgets(buf, MAXLINE, stdin) != NULL) { Write(sockfd, buf, strlen(buf)); n = Read(sockfd, buf, MAXLINE); if (n == 0) { printf("the other side has been closed.\n"); } else { Write(STDOUT_FILENO, buf, n); } } Close(sockfd); return 0; } chunli@ubuntu:~/linux_c/poll$
编译运行: 服务端编译运行: chunli@ubuntu:~/linux_c/poll$ gcc -Wall -o server server.c wrap.c && ./server 局域网内机子运行测试: chunli@ubuntu:~/linux_c/poll$ gcc -Wall -o client client.c wrap.c && ./client 11.11.11.3 客户端1 : chunli@ubuntu:~/linux_c/poll$ gcc -Wall -o client client.c wrap.c && ./client 11.11.11.3 haha HAHA kill KILL 客户端2: chunli@ubuntu:~/linux_c/poll$ ./client 11.11.11.3 kk KK uuuuuuuu UUUUUUUU sdadasdsadsafsdgdfg SDADASDSADSAFSDGDFG 客户端3: chunli@ubuntu:~/linux_c/poll$ scp client chunli@11.11.11.8:~ chunli@ubuntu:~/linux_c/poll$ ssh chunli@11.11.11.8 chunli@ubuntu14:~$ ./client 11.11.11.3 linux LINUX hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH jajajajjajaj JAJAJAJJAJAJ 服务端输出: chunli@ubuntu:~/linux_c/poll$ gcc -Wall -o server server.c wrap.c && ./server received from 11.11.11.3 at PORT 45464 received from 11.11.11.3 at PORT 45466 received from 11.11.11.8 at PORT 42765 可以看出,没用多进程 也没用多线程 chunli@ubuntu:~$ ps ajx | grep ./server 4965 6819 6819 4965 pts/9 6819 S+ 1000 0:00 ./server chunli@ubuntu:~$ ps -eLf | grep ./server chunli 6819 4965 6819 0 1 10:52 pts/9 00:00:00 ./server 关闭所有客户端后看服务器的显示: chunli@ubuntu:~/linux_c/poll$ gcc -Wall -o server server.c wrap.c && ./server received from 11.11.11.3 at PORT 45464 received from 11.11.11.3 at PORT 45466 received from 11.11.11.8 at PORT 42765 client[3] closed connection client[2] closed connection client[1] closed connection
epoll模型
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点
原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。目前epoll是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
一个进程打开大数目的socket描述符
如何突破打开文件最大个数:
查看机器最多能创建多少个文件描述符: chunli@ubuntu:~$ cat /proc/sys/fs/file-max 95260 chunli@ubuntu:~$ 查看系统默认限制: chunli@ubuntu:~$ ulimit -a | grep "open files" open files (-n) 1024 如何突破打开文件最大个数: 修改文件描述符配置文件,添加两行: chunli@ubuntu:~$ cat /etc/security/limits.conf * soft nofile 65536 * hard nofile 100000 # End of file chunli@ubuntu:~$ sudo reboot chunli@ubuntu:~$ sudo -s root@ubuntu:~# ulimit -n 100000 root@ubuntu:~# ulimit -a | grep "open files" open files (-n) 100000 调整内存看看: chunli@ubuntu:~$ free -m total used free shared buff/cache available Mem: 4866 176 4416 9 272 4427 Swap: 1021 0 1021 chunli@ubuntu:~$ cat /proc/sys/fs/file-max 492067
epoll API
1.创建一个epoll句柄,参数size用来告诉内核监听的文件描述符个数,跟内存大小有关
int epoll_create(int size)
size:告诉内核监听的数目
----------------------------------------------------------
2.控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epfd:为epoll_create的句柄
op:表示动作,用3个宏来表示:
EPOLL_CTL_ADD(注册新的fd到epfd),
EPOLL_CTL_MOD(修改已经注册的fd的监听事件),
EPOLL_CTL_DEL(从epfd删除一个fd);
event:告诉内核需要监听的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT:表示对应的文件描述符可以写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
----------------------------------------------------------------------
3.等待所监控文件描述符上有事件的产生,类似于select()调用。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
events:用来从内核得到事件的集合,
maxevents:告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,
timeout:是超时时间
-1:阻塞
0:立即返回,非阻塞
>0:指定毫秒
返回值:成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
epoll服务器模型
1,依赖库
chunli@ubuntu:~/linux_c/epoll$ cat wrap.h /* wrap.h */ #ifndef __WRAP_H_ #define __WRAP_H_ void perr_exit(const char *s); int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr); void Bind(int fd, const struct sockaddr *sa, socklen_t salen); void Connect(int fd, const struct sockaddr *sa, socklen_t salen); void Listen(int fd, int backlog); int Socket(int family, int type, int protocol); ssize_t Read(int fd, void *ptr, size_t nbytes); ssize_t Write(int fd, const void *ptr, size_t nbytes); void Close(int fd); ssize_t Readn(int fd, void *vptr, size_t n); ssize_t Writen(int fd, const void *vptr, size_t n); ssize_t my_read(int fd, char *ptr); ssize_t Readline(int fd, void *vptr, size_t maxlen); #endif chunli@ubuntu:~/linux_c/epoll$ cat wrap.c /* wrap.c */ #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <unistd.h> #include <sys/socket.h> void perr_exit(const char *s) { perror(s); exit(1); } int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { int n; again: if ( (n = accept(fd, sa, salenptr)) < 0) { if ((errno == ECONNABORTED) || (errno == EINTR)) goto again; else perr_exit("accept error"); } return n; } void Bind(int fd, const struct sockaddr *sa, socklen_t salen) { if (bind(fd, sa, salen) < 0) perr_exit("bind error"); } void Connect(int fd, const struct sockaddr *sa, socklen_t salen) { if (connect(fd, sa, salen) < 0) perr_exit("connect error"); } void Listen(int fd, int backlog) { if (listen(fd, backlog) < 0) perr_exit("listen error"); } int Socket(int family, int type, int protocol) { int n; if ( (n = socket(family, type, protocol)) < 0) perr_exit("socket error"); return n; } ssize_t Read(int fd, void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = read(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } ssize_t Write(int fd, const void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = write(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } void Close(int 
fd) { if (close(fd) == -1) perr_exit("close error"); } ssize_t Readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { if (errno == EINTR) nread = 0; else return -1; } else if (nread == 0) break; nleft -= nread; ptr += nread; } return n - nleft; } ssize_t Writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft)) <= 0) { if (nwritten < 0 && errno == EINTR) nwritten = 0; else return -1; } nleft -= nwritten; ptr += nwritten; } return n; } static ssize_t my_read(int fd, char *ptr) { static int read_cnt; static char *read_ptr; static char read_buf[100]; if (read_cnt <= 0) { again: if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) { if (errno == EINTR) goto again; return -1; } else if (read_cnt == 0) return 0; read_ptr = read_buf; } read_cnt--; *ptr = *read_ptr++; return 1; } ssize_t Readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { if ( (rc = my_read(fd, &c)) == 1) { *ptr++ = c; if (c == ‘\n‘) break; } else if (rc == 0) { *ptr = 0; return n - 1; } else return -1; } *ptr = 0; return n; } chunli@ubuntu:~/linux_c/epoll$
服务端程序:
chunli@ubuntu:~/linux_c/epoll$ cat server.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <ctype.h> #include <unistd.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <errno.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8000 #define OPEN_MAX 1024 int main(void) { int i, j, maxi, listenfd, connfd, sockfd; int nready, efd, res; ssize_t n; char buf[MAXLINE], str[INET_ADDRSTRLEN]; socklen_t clilen; int client[OPEN_MAX]; struct sockaddr_in cliaddr, servaddr; struct epoll_event tep, ep[OPEN_MAX]; listenfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); Bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)); Listen(listenfd, 20); for (i = 0; i < OPEN_MAX; i++) { client[i] = -1; } maxi = -1; efd = epoll_create(OPEN_MAX); if (efd == -1) { perr_exit("epoll_create"); } tep.events = EPOLLIN; tep.data.fd = listenfd; res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep); if (res == -1) { perr_exit("epoll_ctl"); } while (1) { nready = epoll_wait(efd, ep, OPEN_MAX, -1); /* 阻塞监听 */ if (nready == -1) { perr_exit("epoll_wait"); } for (i = 0; i < nready; i++) { if (!(ep[i].events & EPOLLIN)) { continue; } if (ep[i].data.fd == listenfd) { clilen = sizeof(cliaddr); connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen); printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port)); for (j = 0; j < OPEN_MAX; j++) { if (client[j] < 0) { client[j] = connfd; /* save descriptor */ break; } } if (j == OPEN_MAX) { perr_exit("too many clients"); } if (j > maxi) { maxi = j; /* max index in client[] array */ } tep.events = EPOLLIN; tep.data.fd = connfd; res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep); if (res == -1) { perr_exit("epoll_ctl"); } } else { sockfd = ep[i].data.fd; n = Read(sockfd, buf, MAXLINE); if (n == 0) 
{ for (j = 0; j <= maxi; j++) { if (client[j] == sockfd) { client[j] = -1; break; } } res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL); if (res == -1) { perr_exit("epoll_ctl"); } Close(sockfd); printf("client[%d] closed connection\n", j); } else { for (j = 0; j < n; j++) { buf[j] = toupper(buf[j]); } Writen(sockfd, buf, n); } } } } close(listenfd); close(efd); return 0; } chunli@ubuntu:~/linux_c/epoll$
客户端程序:
chunli@ubuntu:~/linux_c/epoll$ cat client.c /* client.c */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <netinet/in.h> #include <arpa/inet.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8000 int main(int argc, char *argv[]) { if(argc < 2) { printf("input ip address!\n"); exit(1); } struct sockaddr_in servaddr; char buf[MAXLINE]; int sockfd, n; sockfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT); Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); while (fgets(buf, MAXLINE, stdin) != NULL) { Write(sockfd, buf, strlen(buf)); n = Read(sockfd, buf, MAXLINE); if (n == 0) { printf("the other side has been closed.\n"); } else { Write(STDOUT_FILENO, buf, n); } } Close(sockfd); return 0; } chunli@ubuntu:~/linux_c/epoll$
服务器程序编译执行: chunli@ubuntu:~/linux_c/epoll$ gcc -Wall -o server wrap.c server.c && ./server 客户端编译执行: chunli@ubuntu:~/linux_c/epoll$ gcc -Wall -o client client.c wrap.c && ./client 11.11.11.3 dfhssjdsfhocad DFHSSJDSFHOCAD qqqqq QQQQQ 开启客户端2: chunli@ubuntu14:~$ ./client 11.11.11.3 fsdfs FSDFS dasd DASD 开启客户端3: chunli@ubuntu14:~$ ./client input ip address! chunli@ubuntu14:~$ ./client 11.11.11.3 sdfkjnsdf SDFKJNSDF 服务端输出: chunli@ubuntu:~/linux_c/epoll$ gcc -Wall -o server wrap.c server.c && ./server received from 127.0.0.1 at PORT 46726 received from 11.11.11.8 at PORT 42766 received from 11.11.11.8 at PORT 42767 服务端仅仅是一个单线程: chunli@ubuntu:~/linux_c/epoll$ ps aux | grep ./server chunli 5163 0.0 0.0 4360 652 pts/9 S+ 13:57 0:00 ./server chunli@ubuntu:~/linux_c/epoll$ ps -eLf | grep ./server chunli 5163 4673 5163 0 1 13:57 pts/9 00:00:00 ./server 查看连接状态:2个局域网机器,1个localhost chunli@ubuntu:~/linux_c/epoll$ netstat -ant | grep 8000 tcp 0 0 0.0.0.0:8000 0.0.0.0:* LISTEN tcp 0 0 127.0.0.1:8000 127.0.0.1:46730 ESTABLISHED tcp 0 0 127.0.0.1:46730 127.0.0.1:8000 ESTABLISHED tcp 0 0 11.11.11.3:8000 11.11.11.8:42766 ESTABLISHED tcp 0 0 11.11.11.3:8000 11.11.11.8:42768 ESTABLISHED
udp组播模型:
组播组可以是永久的也可以是临时的。组播组地址中,有一部分由官方分配的,称为永久组播组。永久组播组保持不变的是它的ip地址,组中的成员构成可以发生变化。永久组播组中成员的数量都可以是任意的,甚至可以为零。那些没有保留下来供永久组播组使用的ip组播地址,可以被临时组播组利用。
224.0.0.0~224.0.0.255为预留的组播地址(永久组地址),地址224.0.0.0保留不做分配,其它地址供路由协议使用;
224.0.1.0~224.0.1.255是公用组播地址,可以用于Internet;
224.0.2.0~238.255.255.255为用户可用的组播地址(临时组地址),全网范围内有效;
239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效。
服务端程序:
chunli@ubuntu:~/linux_c/udp_group$ cat udp_group_server.c
/* udp_group_server.c -- send every line typed on stdin to a UDP multicast group */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <net/if.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <ctype.h>

#define SERVER_PORT 8000
#define CLIENT_PORT 9000
#define GROUP "239.0.0.2"

int main(void)
{
    int sockfd;
    struct sockaddr_in serveraddr;
    struct sockaddr_in clientaddr;
    char buf[1024];
    struct ip_mreqn group;

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);            /* UDP socket */
    if (sockfd < 0) {
        perror("socket");
        exit(1);
    }

    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);     /* any local address */
    serveraddr.sin_port = htons(SERVER_PORT);
    if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind");
        exit(1);
    }

    /* Choose the outgoing interface for multicast datagrams. */
    memset(&group, 0, sizeof(group));
    inet_pton(AF_INET, GROUP, &group.imr_multiaddr);    /* group address */
    inet_pton(AF_INET, "0.0.0.0", &group.imr_address);  /* any local address */
    group.imr_ifindex = if_nametoindex("ens32");        /* interface index; 0 if the name is unknown */
    if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, &group, sizeof(group)) < 0)
        perror("setsockopt(IP_MULTICAST_IF)");          /* report, then keep the default interface */

    /* Destination: the multicast group on the port the clients bind to. */
    memset(&clientaddr, 0, sizeof(clientaddr));
    clientaddr.sin_family = AF_INET;
    inet_pton(AF_INET, GROUP, &clientaddr.sin_addr.s_addr);
    clientaddr.sin_port = htons(CLIENT_PORT);

    /* Stop at EOF instead of resending a stale buffer forever. */
    while (fgets(buf, sizeof(buf), stdin) != NULL) {
        if (sendto(sockfd, buf, strlen(buf), 0,
                   (struct sockaddr *)&clientaddr, sizeof(clientaddr)) < 0)
            perror("sendto");
    }

    close(sockfd);
    return 0;
}
chunli@ubuntu:~/linux_c/udp_group$
客户端程序:
chunli@ubuntu:~/linux_c/udp_group$ cat udp_group_client.c
/* udp_group_client.c -- join a UDP multicast group and print every datagram received */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <net/if.h>
#include <arpa/inet.h>

#define CLIENT_PORT 9000
#define GROUP "239.0.0.2"

int main(int argc, char **argv)
{
    struct sockaddr_in localaddr;
    int confd;
    ssize_t len;
    char buf[128];
    struct ip_mreqn group;              /* multicast membership request */

    (void)argc;
    (void)argv;

    confd = socket(AF_INET, SOCK_DGRAM, 0);
    if (confd < 0) {
        perror("socket");
        exit(1);
    }

    memset(&localaddr, 0, sizeof(localaddr));
    localaddr.sin_family = AF_INET;
    inet_pton(AF_INET, "0.0.0.0", &localaddr.sin_addr.s_addr);
    localaddr.sin_port = htons(CLIENT_PORT);
    if (bind(confd, (struct sockaddr *)&localaddr, sizeof(localaddr)) < 0) {
        perror("bind");
        exit(1);
    }

    memset(&group, 0, sizeof(group));
    inet_pton(AF_INET, GROUP, &group.imr_multiaddr);    /* group address */
    inet_pton(AF_INET, "0.0.0.0", &group.imr_address);  /* any local address */
    group.imr_ifindex = if_nametoindex("ens32");        /* interface index; 0 if the name is unknown */
    /* Without a successful join nothing would ever be received, so fail loudly. */
    if (setsockopt(confd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) < 0) {
        perror("setsockopt(IP_ADD_MEMBERSHIP)");
        exit(1);
    }

    while (1) {
        len = recvfrom(confd, buf, sizeof(buf), 0, NULL, NULL);
        if (len < 0) {
            if (errno == EINTR)
                continue;               /* interrupted by a signal: just retry */
            perror("recvfrom");
            break;
        }
        if (write(STDOUT_FILENO, buf, (size_t)len) < 0)
            perror("write");
    }

    close(confd);
    return 0;
}
chunli@ubuntu:~/linux_c/udp_group$
服务器编译运行: chunli@ubuntu:~/linux_c/udp_group$ gcc -o server udp_group_server.c &&./server kkk kdskfvskd dsfds 客户端1 : chunli@ubuntu:~/linux_c/udp_group$ gcc -o client udp_group_client.c && ./client kkk kdskfvskd dsfds 客户端2: chunli@ubuntu14:~$ ./client kkk kdskfvskd dsfds
本地socket unix domain socket
参考:http://blog.csdn.net/bingqingsuimeng/article/details/8470029
服务器端的步骤如下:
1. socket: 建立一个socket
2. bind: 将这个socket绑定在某个文件上(AF_UNIX)或某个端口上(AF_INET),我们会分别介绍这两种。
3. listen: 开始监听
4. accept: 如果监听到客户端连接,则调用accept接收这个连接并同时新建一个socket来和客户进行通信
5. read/write:读取或发送数据到客户端
6. close: 通信完成后关闭socket
客户端的步骤如下:
1. socket: 建立一个socket
2. connect: 主动连接服务器端的某个文件(AF_UNIX)或某个端口(AF_INET)
3. read/write:如果服务器同意连接(accept),则读取或发送数据到服务器端
4. close: 通信完成后关闭socket
例子实现的功能是客户端发送一个字符到服务器,服务器将这个字符+1后送回客户端,客户端再把它打印出来
====================================================
服务端程序:
/*
 * server.c
 *
 * Unix domain (AF_UNIX) stream server. For every client it reads one
 * character, increments it and sends it back, then closes the connection.
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

/* Filesystem path of the listening socket (relative to the working dir). */
#define SOCKET_PATH "server_socket"

/**
 * Entry point of the echo-plus-one server.
 *
 * @return EXIT_FAILURE if the listening socket cannot be set up; otherwise
 *         the accept loop runs until the process is terminated.
 */
int main(void)
{
    struct sockaddr_un server_addr;
    int server_sockfd;

    /* Remove a stale socket file from a previous run. The result is
       ignored on purpose: the file usually does not exist. */
    unlink(SOCKET_PATH);

    server_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_sockfd < 0) {
        perror("socket");
        return EXIT_FAILURE;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sun_family = AF_UNIX;
    snprintf(server_addr.sun_path, sizeof(server_addr.sun_path),
             "%s", SOCKET_PATH);

    /* Bind the socket to the local file. */
    if (bind(server_sockfd, (struct sockaddr *)&server_addr,
             sizeof(server_addr)) < 0) {
        perror("bind");
        close(server_sockfd);
        return EXIT_FAILURE;
    }

    if (listen(server_sockfd, 5) < 0) {
        perror("listen");
        close(server_sockfd);
        return EXIT_FAILURE;
    }

    while (1) {
        struct sockaddr_un client_addr;
        /* accept() overwrites the length, so reset it on every pass. */
        socklen_t len = sizeof(client_addr);
        int client_sockfd;
        ssize_t n;
        char ch;

        printf("server waiting:\n");

        client_sockfd = accept(server_sockfd,
                               (struct sockaddr *)&client_addr, &len);
        if (client_sockfd < 0) {
            perror("accept");
            continue;
        }

        /* Exchange data; only answer when a byte was actually received,
           so an uninitialised ch is never printed or sent. */
        n = read(client_sockfd, &ch, 1);
        if (n == 1) {
            printf("get char from client: %c\n", ch);
            ++ch;
            if (write(client_sockfd, &ch, 1) != 1) {
                perror("write");
            }
        } else if (n < 0) {
            perror("read");
        }

        close(client_sockfd);
    }

    return 0;
}
客户端程序:
/*
 * client.c
 *
 * Unix domain (AF_UNIX) stream client. Sends the character 'A' to the
 * server listening on "server_socket" and prints the character it gets
 * back.
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

/* Filesystem path of the server's socket (relative to the working dir). */
#define SOCKET_PATH "server_socket"

/**
 * Entry point of the client.
 *
 * @return 0 on success; exits with status 1 if the socket cannot be
 *         created or connected, or if the exchange with the server fails.
 */
int main(void)
{
    struct sockaddr_un address;
    int sockfd;
    int result;
    char ch = 'A';   /* plain ASCII quotes: the typographic ones do not compile */

    sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket");
        exit(1);
    }

    memset(&address, 0, sizeof(address));
    address.sun_family = AF_UNIX;
    snprintf(address.sun_path, sizeof(address.sun_path), "%s", SOCKET_PATH);

    /* Connect to the server. */
    result = connect(sockfd, (struct sockaddr *)&address, sizeof(address));
    if (result == -1) {
        perror("connect failed: ");
        close(sockfd);
        exit(1);
    }

    /* Exchange data: one byte out, one byte back. */
    if (write(sockfd, &ch, 1) != 1) {
        perror("write");
        close(sockfd);
        exit(1);
    }
    if (read(sockfd, &ch, 1) != 1) {
        /* Covers both a read error and the server closing early. */
        fprintf(stderr, "read: no reply from server\n");
        close(sockfd);
        exit(1);
    }
    printf("get char from server: %c\n", ch);

    close(sockfd);
    return 0;
}
1,服务端编译执行: chunli@ubuntu:~/linux_c/unix_socket_2$ gcc -Wall -o server server.c && ./server server waiting: get char from client: A server waiting: 2,客户端编译执行: chunli@ubuntu:~/linux_c/unix_socket_2$ gcc -Wall -o client client.c && ./client get char from server: B
线程池模型:
threadpool库下载:
https://codeload.github.com/mbrossard/threadpool/zip/master
threadpool库源代码:
/*
 * Copyright (c) 2016, Mathias Brossard <mathias@brossard.org>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @file threadpool.h
 * @brief Public interface of the thread pool.
 */

/**
 * Upper bounds accepted by threadpool_create().
 * Increase these constants at your own risk: large values might slow
 * down your system.
 */
#define MAX_THREADS 64
#define MAX_QUEUE 65536

/** Opaque pool handle; the structure is defined by the implementation. */
typedef struct threadpool_t threadpool_t;

/** Negative error codes returned by the pool functions. */
typedef enum {
    threadpool_invalid        = -1,
    threadpool_lock_failure   = -2,
    threadpool_queue_full     = -3,
    threadpool_shutdown       = -4,
    threadpool_thread_failure = -5
} threadpool_error_t;

/** Flag values understood by threadpool_destroy(). */
typedef enum {
    threadpool_graceful = 1
} threadpool_destroy_flags_t;

/**
 * @function threadpool_create
 * @brief Creates a threadpool_t object.
 * @param thread_count Number of worker threads.
 * @param queue_size   Size of the task queue.
 * @param flags        Unused parameter.
 * @return a newly created thread pool, or NULL on failure.
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);

/**
 * @function threadpool_add
 * @brief Adds a new task to the queue of a thread pool.
 * @param pool    Thread pool to which the task is added.
 * @param routine Pointer to the function that will perform the task.
 * @param arg     Argument to be passed to that function.
 * @param flags   Unused parameter.
 * @return 0 if all goes well, a negative value in case of error
 *         (@see threadpool_error_t for the codes).
 */
int threadpool_add(threadpool_t *pool, void (*routine)(void *),
                   void *arg, int flags);

/**
 * @function threadpool_destroy
 * @brief Stops and destroys a thread pool.
 * @param pool  Thread pool to destroy.
 * @param flags Flags for shutdown.
 *
 * Known values for flags are 0 (default) and threadpool_graceful, in
 * which case the thread pool doesn't accept any new tasks but processes
 * all pending tasks before shutdown.
 */
int threadpool_destroy(threadpool_t *pool, int flags);

#ifdef __cplusplus
}
#endif

#endif /* _THREADPOOL_H_ */
chunli@ubuntu:~/linux_c/thread_pool$ cat threadpool.c /* * Copyright (c) 2016, Mathias Brossard <mathias@brossard.org>. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /** * @file threadpool.c * @brief Threadpool implementation file */ #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include "threadpool.h" typedef enum { immediate_shutdown = 1, graceful_shutdown = 2 } threadpool_shutdown_t; /** * @struct threadpool_task * @brief the work struct * * @var function Pointer to the function that will perform the task. * @var argument Argument to be passed to the function. 
*/ typedef struct { void (*function)(void *); void *argument; } threadpool_task_t; /** * @struct threadpool * @brief The threadpool struct * * @var notify Condition variable to notify worker threads. * @var threads Array containing worker threads ID. * @var thread_count Number of threads * @var queue Array containing the task queue. * @var queue_size Size of the task queue. * @var head Index of the first element. * @var tail Index of the next element. * @var count Number of pending tasks * @var shutdown Flag indicating if the pool is shutting down * @var started Number of started threads */ struct threadpool_t { pthread_mutex_t lock; pthread_cond_t notify; pthread_t *threads; threadpool_task_t *queue; int thread_count; int queue_size; int head; int tail; int count; int shutdown; int started; }; /** * @function void *threadpool_thread(void *threadpool) * @brief the worker thread * @param threadpool the pool which own the thread */ static void *threadpool_thread(void *threadpool); int threadpool_free(threadpool_t *pool); threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) { if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { return NULL; } threadpool_t *pool; int i; (void) flags; if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { goto err; } /* Initialize */ pool->thread_count = 0; pool->queue_size = queue_size; pool->head = pool->tail = pool->count = 0; pool->shutdown = pool->started = 0; /* Allocate thread and task queue */ pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); pool->queue = (threadpool_task_t *)malloc (sizeof(threadpool_task_t) * queue_size); /* Initialize mutex and conditional variable first */ if((pthread_mutex_init(&(pool->lock), NULL) != 0) || (pthread_cond_init(&(pool->notify), NULL) != 0) || (pool->threads == NULL) || (pool->queue == NULL)) { goto err; } /* Start worker threads */ for(i = 0; i < thread_count; i++) { 
if(pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0) { threadpool_destroy(pool, 0); return NULL; } pool->thread_count++; pool->started++; } return pool; err: if(pool) { threadpool_free(pool); } return NULL; } int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags) { int err = 0; int next; (void) flags; if(pool == NULL || function == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } next = (pool->tail + 1) % pool->queue_size; do { /* Are we full ? */ if(pool->count == pool->queue_size) { err = threadpool_queue_full; break; } /* Are we shutting down ? */ if(pool->shutdown) { err = threadpool_shutdown; break; } /* Add task to queue */ pool->queue[pool->tail].function = function; pool->queue[pool->tail].argument = argument; pool->tail = next; pool->count += 1; /* pthread_cond_broadcast */ if(pthread_cond_signal(&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } } while(0); if(pthread_mutex_unlock(&pool->lock) != 0) { err = threadpool_lock_failure; } return err; } int threadpool_destroy(threadpool_t *pool, int flags) { int i, err = 0; if(pool == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } do { /* Already shutting down */ if(pool->shutdown) { err = threadpool_shutdown; break; } pool->shutdown = (flags & threadpool_graceful) ? 
graceful_shutdown : immediate_shutdown; /* Wake up all worker threads */ if((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } /* Join all worker thread */ for(i = 0; i < pool->thread_count; i++) { if(pthread_join(pool->threads[i], NULL) != 0) { err = threadpool_thread_failure; } } } while(0); /* Only if everything went well do we deallocate the pool */ if(!err) { threadpool_free(pool); } return err; } int threadpool_free(threadpool_t *pool) { if(pool == NULL || pool->started > 0) { return -1; } /* Did we manage to allocate ? */ if(pool->threads) { free(pool->threads); free(pool->queue); /* Because we allocate pool->threads after initializing the mutex and condition variable, we‘re sure they‘re initialized. Let‘s lock the mutex just in case. */ pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->notify)); } free(pool); return 0; } static void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task; for(;;) { /* Lock must be taken to wait on conditional variable */ pthread_mutex_lock(&(pool->lock)); /* Wait on condition variable, check for spurious wakeups. When returning from pthread_cond_wait(), we own the lock. */ while((pool->count == 0) && (!pool->shutdown)) { pthread_cond_wait(&(pool->notify), &(pool->lock)); } if((pool->shutdown == immediate_shutdown) || ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) { break; } /* Grab our task */ task.function = pool->queue[pool->head].function; task.argument = pool->queue[pool->head].argument; pool->head = (pool->head + 1) % pool->queue_size; pool->count -= 1; /* Unlock */ pthread_mutex_unlock(&(pool->lock)); /* Get to work */ (*(task.function))(task.argument); } pool->started--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); return(NULL); } chunli@ubuntu:~/linux_c/thread_pool$
测试程序:
chunli@ubuntu:~/linux_c/thread_pool$ cat main.c #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include "threadpool.h" void *process(void *arg) { printf("thread %x workoing on task %d\n",(unsigned int )pthread_self(),*(int *)arg); sleep(1); printf("task %d is end \n",*(int*)arg); return NULL; } int main(void) { threadpool_t *thp =threadpool_create(1,100,12); printf("pool inited\n"); int *num = (int *)malloc(sizeof(int*) * 20); int i ; for(i = 0;i<10;i++) { num[i] = i; printf("add task %d\n",i); threadpool_add(thp,(void*)process,(void*)&num[i],1); } sleep(10); threadpool_destroy(thp,1); return 0; } chunli@ubuntu:~/linux_c/thread_pool$
chunli@ubuntu:~/linux_c/thread_pool$ gcc main.c threadpool.c -lpthread && ./a.out pool inited add task 0 add task 1 add task 2 add task 3 add task 4 add task 5 add task 6 add task 7 add task 8 add task 9 thread 9b9a7700 workoing on task 0 task 0 is end thread 9b9a7700 workoing on task 1 task 1 is end thread 9b9a7700 workoing on task 2 task 2 is end thread 9b9a7700 workoing on task 3 task 3 is end thread 9b9a7700 workoing on task 4 task 4 is end thread 9b9a7700 workoing on task 5 task 5 is end thread 9b9a7700 workoing on task 6 task 6 is end thread 9b9a7700 workoing on task 7 task 7 is end thread 9b9a7700 workoing on task 8 task 8 is end thread 9b9a7700 workoing on task 9 task 9 is end chunli@ubuntu:~/linux_c/thread_pool$
本文出自 “魂斗罗” 博客,谢绝转载!
Linux统系统开发12 Socket API编程3 TCP状态转换 多路IO高并发select poll epoll udp组播 线程池
标签:linux统系统开发12 socket api编程3 tcp状态转换 多路io高并发select poll epoll udp组播 线程池
原文地址:http://990487026.blog.51cto.com/10133282/1841695