标签:
做Linux网络开发,一般绕不开标题中几种网络编程模型。网上已有很多写的不错的分析文章,它们的基本论点是差不多的。但是我觉得他们讲的还不够详细,在一些关键论点上缺乏数据支持。所以我决定好好研究这几个模型。(转载请指明出于breaksoftware的csdn博客)
在研究这些模型前,我决定按如下步骤去做:
err = init_print_thread(); if (err < 0) { perror("create print thread error"); exit(EXIT_FAILURE); }init_print_thread函数将被各个模型使用,wait_print_thread是用于等待该打印结果的线程退出。由于我并不准备让这个线程退出,所以wait_print_thread往往用来阻塞主线程。
pthread_t g_print_thread; int init_print_thread() { return pthread_create(&g_print_thread, NULL, print_count, NULL); } void wait_print_thread() { pthread_join(g_print_thread, NULL); }print_count函数是用于线程执行的实体,它每隔一秒钟打印一条记录
static int g_request_count = 0; static int g_server_suc = 0; static int g_client_suc = 0; static int g_read_suc = 0; static int g_write_suc = 0; static int g_server_fai = 0; static int g_client_fai = 0; static int g_read_fai = 0; static int g_write_fai = 0; void* print_count(void* arg) { struct timeval cur_time; int index = 0; fprintf(stderr, "index\tseconds_micro_seconds\tac\tst\tsr\tsw\tft\tfr\tfw\n"); while (1) { sleep(1); gettimeofday(&cur_time, NULL); fprintf(stderr, "%d\t%ld\t%d\t%d\t%d\t%d\t%d\t%d\t%d\n", index, cur_time.tv_sec * 1000000 + cur_time.tv_usec, g_request_count, g_server_suc > g_client_suc ? g_server_suc : g_client_suc, g_read_suc, g_write_suc, g_server_fai > g_client_fai ? g_server_fai : g_client_fai, g_read_fai, g_write_fai); index++; } }上述各数据的定义如下:
listen_sock = make_socket(0);我们对make_socket传入了参数0,是因为我们不要求创建的监听Socket具有异步属性。
int make_socket(int asyn) { int listen_sock = -1; int rc = -1; int on = 1; struct sockaddr_in name; listen_sock = socket(AF_INET, SOCK_STREAM, 0); if (listen_sock < 0) { perror("create socket error"); exit(EXIT_FAILURE); } rc = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)); if (rc < 0) { perror("setsockopt error"); exit(EXIT_FAILURE); } if (asyn) { rc = ioctl(listen_sock, FIONBIO, (char*)&on); if (rc < 0) { perror("ioctl failed"); exit(EXIT_FAILURE); } } name.sin_family = AF_INET; name.sin_port = htons(PORT); name.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(listen_sock, (struct sockaddr*)&name, sizeof(name)) < 0) { perror("bind error"); exit(EXIT_FAILURE); } return listen_sock; }这个函数中我们使用了socket函数创建了一个TCP的Socket。并使用bind函数将该socket绑定到本机特定的端口上。
if (listen(listen_sock, SOMAXCONN) < 0) { perror("listen error"); exit(EXIT_FAILURE); }SOMAXCONN是可以同时处理的最大连接数,它是一个系统宏。在我系统上它的值是128。
while (1) { int new_sock; new_sock = accept(listen_sock, NULL, NULL); if (new_sock < 0) { perror("accept error"); exit(EXIT_FAILURE); }通过accept我们将获得接入的socket。如果socket值合法,我们则需要让接受的请求数自增1
request_add(1);request_add函数将在之后不同模型以及测试程序中被调用,而且会是在不同的线程中调用。于是这儿就引入一个多线程的问题。我并不打算使用锁等方法,而是利用简单的原子操作来实现。
void request_add(int count) { __sync_fetch_and_add(&g_request_count, count); }由于我们设计的朴素模式是一个同步过程,所以接入的socket不是异步的。当一些特殊情况发生时,之后的读取socket内容的行为或者往socket中写入内容的行为可能会卡住。这样将导致整个服务都卡住,这是我们不希望看到的。于是我们需要对该同步socket设置操作超时属性。
set_block_filedes_timeout(new_sock);
void set_block_filedes_timeout(int filedes) { struct timeval tv_out, tv_in; tv_in.tv_sec = READ_TIMEOUT_S; tv_in.tv_usec = READ_TIMEOUT_US; if (setsockopt(filedes, SOL_SOCKET, SO_RCVTIMEO, &tv_in, sizeof(tv_in)) < 0) { perror("set rcv timeout error"); exit(EXIT_FAILURE); } tv_out.tv_sec = WRITE_TIMEOUT_S; tv_out.tv_usec = WRITE_TIMEOUT_US; if (setsockopt(filedes, SOL_SOCKET, SO_SNDTIMEO, &tv_out, sizeof(tv_out)) < 0) { perror("set rcv timeout error"); exit(EXIT_FAILURE); } }这儿要说明下,我在网上看过很多人提问说通过上述方法设置超时属性无效。其实是他们犯了一个错误,就是将socket设置为异步属性。如果socket既设置为异步属性,又设置了超时,socket当然是按异步特点去执行的,超时设置也就无效了。
while (nbytes > 0) { nbytes = recv(filedes, buffer, sizeof(buffer) - 1, 0); if (nbytes > 0) { total_length_recv += nbytes; } //buffer[nbytes] = 0; //fprintf(stderr, "%s", buffer); }这段服务器read操作考虑到了一次性可能读不完全部数据的问题。但是如果客户端发送完数据后,服务器第一次recv可以把全部数据读取出来了。由于读取的数据大于0,于是再次进入读取操作,这个时候,客户端已经处于读取服务器返回的阶段。由于socket是同步的,且未设置超时,导致服务器一直卡在再次读取的操作中,这样就发生了“死锁”。其实这个过程非常有意思,当我们对一段不健壮的代码进行加固时,往往会掉到另外一个坑里。但是只要我们努力的从坑里跳出来,就会豁然开朗且认识到很多别人忽视的问题。
if (0 == server_read(new_sock)) { server_write(new_sock); } close(new_sock);server_read方法在底层调用了read_data方法,read_data方法是我们整个代码的两个关键行为之一
int is_nonblock(int fd) { int flags = fcntl(fd, F_GETFL); if (flags == -1) { perror("get fd flags error"); exit(EXIT_FAILURE); } return (flags & O_NONBLOCK) ? 1 : 0; } int read_data(int filedes, int from_server) { char buffer[MAXMSG]; int nbytes; int total_len_recv; int wait_count = 0; int rec_suc = 0; total_len_recv = 0; while (1) { nbytes = recv(filedes, buffer, sizeof(buffer) - 1, 0); if (nbytes < 0) { if (is_nonblock(filedes)) { if (EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno) { if (wait_count < WAIT_COUNT_MAX) { wait_count++; usleep(wait_count); continue; } } } break; } if (nbytes == 0) { //fprintf(stderr, "read end\n"); break; } else if (nbytes > 0) { total_len_recv += nbytes; //buffer[nbytes] = 0; //fprintf(stderr, "%s", buffer); } if ((from_server && is_server_recv_finish(total_len_recv)) || (!from_server && is_client_recv_finish(total_len_recv))) { rec_suc = 1; break; } }
if (from_server) { if (rec_suc) { __sync_fetch_and_add(&g_read_suc, 1); return 0; } else { __sync_fetch_and_add(&g_read_fai, 1); __sync_fetch_and_add(&g_server_fai, 1); return -1; } } else { if (rec_suc) { __sync_fetch_and_add(&g_read_suc, 1); __sync_fetch_and_add(&g_client_suc, 1); return 0; } else { __sync_fetch_and_add(&g_read_fai, 1); __sync_fetch_and_add(&g_client_fai, 1); return -1; } } }如果读取操作成功,则进行发送操作。server_write方法在底层调用了write_data方法
int write_data(int filedes, int from_server) { int nbytes; int total_len_send; int wait_count = 0; int index; int send_suc = 0; total_len_send = 0; index = 0; while (1) { if (from_server) { nbytes = send(filedes, get_server_send_ptr(index), get_server_send_len(index), 0); } else { nbytes = send(filedes, get_client_send_ptr(index), get_client_send_len(index), 0); } if (nbytes < 0) { if (is_nonblock(filedes)) { if (EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno) { if (wait_count < WAIT_COUNT_MAX) { wait_count++; usleep(wait_count); continue; } } } break; } else if (nbytes == 0) { break; } else if (nbytes > 0) { total_len_send += nbytes; } if ((from_server && is_server_send_finish(total_len_send)) ||(!from_server && is_client_send_finish(total_len_send))){ send_suc = 1; break; } }其实现和read_data思路一致,也考虑到一次性写不完的情况和同步异步socket问题。写入操作完成后再去统计相关行为
if (from_server) { if (send_suc) { __sync_fetch_and_add(&g_write_suc, 1); __sync_fetch_and_add(&g_server_suc, 1); return 0; } else { __sync_fetch_and_add(&g_write_fai, 1); __sync_fetch_and_add(&g_server_fai, 1); return -1; } } else { if (send_suc) { __sync_fetch_and_add(&g_write_suc, 1); return 0; } else { __sync_fetch_and_add(&g_write_fai, 1); __sync_fetch_and_add(&g_client_fai, 1); return -1; } } }最后我们讲下测试程序的实现。为了便于测试,我要求测试程序可以接受至少2个参数,第一个参数是用于标识启动多少个线程发送请求;第二个参数用于指定线程中等待多少毫秒发送一次请求;第三个参数是可选的,标识一共发送多少次请求。这样我们可以通过这些参数控制测试程序的行为
#define MAXREQUESTCOUNT 100000 static int g_total = 0; static int g_max_total = 0; void* send_data(void* arg) { int wait_time; int client_sock; wait_time = *(int*)arg; while (__sync_fetch_and_add(&g_total, 1) < g_max_total) { usleep(wait_time); client_sock = make_client_socket(); connect_server(client_sock); request_add(1); set_block_filedes_timeout(client_sock); if (0 == client_write(client_sock)) { client_read(client_sock); } close(client_sock); client_sock = 0; } } int main(int argc, char* argv[]) { int thread_count; int index; int err; int wait_time; pthread_t thread_id; if (argc < 3) { fprintf(stderr, "error! example: client 10 50\n"); return 0; } err = init_print_thread(); if (err < 0) { perror("create print thread error"); exit(EXIT_FAILURE); } thread_count = atoi(argv[1]); wait_time = atoi(argv[2]); g_max_total = MAXREQUESTCOUNT; if (argc > 3) { g_max_total = atoi(argv[3]); } for (index = 0; index < thread_count; index++) { err = pthread_create(&thread_id, NULL, send_data, &wait_time); if (err != 0) { perror("can't create send thread"); exit(EXIT_FAILURE); } } wait_print_thread(); return 0; }线程中,首先通过make_client_socket创建socket并绑定到本地端口上
int make_client_socket() { int client_sock = -1; struct sockaddr_in client_addr; client_sock = socket(AF_INET, SOCK_STREAM, 0); if (client_sock < 0) { perror("create socket error"); exit(EXIT_FAILURE); } bzero(&client_addr, sizeof(client_addr)); client_addr.sin_family = AF_INET; client_addr.sin_addr.s_addr = htons(INADDR_ANY); client_addr.sin_port = htons(0); if (bind(client_sock, (struct sockaddr*)&client_addr, sizeof(client_addr)) < 0) { perror("bind error"); exit(EXIT_FAILURE); } return client_sock; }然后通过connect_server连接服务器
void connect_server(int client_sock) { struct sockaddr_in server_addr; bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; if (inet_aton("127.0.0.1", &server_addr.sin_addr) == 0) { perror("set server ip error"); exit(EXIT_FAILURE); } server_addr.sin_port = htons(PORT); if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) { perror("client connect server error"); exit(EXIT_FAILURE); } }最后通过client_write和client_read和服务器通信。这两个函数都是调用上面介绍的write_data和read_data,所以没什么好讲的。
int client_read(int filedes) { return read_data(filedes, 0); } int client_write(int filedes) { return write_data(filedes, 0); }我们启动一千个线程,发送30万次请求。看看朴素模型的处理能力。
朴素、Select、Poll和Epoll网络编程模型实现和分析——朴素模型
标签:
原文地址:http://blog.csdn.net/breaksoftware/article/details/51597255