标签:
Because generating and reading the select() bit arrays takes time proportional to the largest fd that you provided for select(), the select() call scales terribly when the number of sockets is high.
Different operating systems have provided different replacement functions for select. These include poll(), epoll(), kqueue(), evports, and /dev/poll. All of these give better performance than select(), and all but poll() give O(1) performance for adding a socket, removing a socket, and for noticing that a socket is ready for IO.
Unfortunately, none of the efficient interfaces is a ubiquitous standard.
Linux has epoll(), the BSDs (including Darwin) have kqueue(), Solaris has evports and /dev/poll… and none of these operating systems has any of the others.
So if you want to write a portable high-performance asynchronous application, you’ll need an abstraction that wraps all of these interfaces, and provides whichever one of them is the most efficient.
And that’s what the lowest level of the Libevent API does for you. It provides a consistent interface to various select() replacements, using the most efficient version available on the computer where it’s running.
Here’s yet another version of our asynchronous ROT13 server. This time, it uses Libevent 2 instead of select(). Note that the fd_sets are gone now: instead, we associate and disassociate events with a struct event_base, which might be implemented in terms of select(), poll(), epoll(), kqueue(), etc.
贴一例代码吧
Example: A low-level ROT13 server with Libevent
1 /* For sockaddr_in */ 2 #include <netinet/in.h> 3 /* For socket functions */ 4 #include <sys/socket.h> 5 /* For fcntl */ 6 #include <fcntl.h> 7 8 #include <event2/event.h> 9 10 #include <assert.h> 11 #include <unistd.h> 12 #include <string.h> 13 #include <stdlib.h> 14 #include <stdio.h> 15 #include <errno.h> 16 17 #define MAX_LINE 16384 18 19 void do_read(evutil_socket_t fd, short events, void *arg); 20 void do_write(evutil_socket_t fd, short events, void *arg); 21 22 char 23 rot13_char(char c) 24 { 25 /* We don‘t want to use isalpha here; setting the locale would change 26 * which characters are considered alphabetical. */ 27 if ((c >= ‘a‘ && c <= ‘m‘) || (c >= ‘A‘ && c <= ‘M‘)) 28 return c + 13; 29 else if ((c >= ‘n‘ && c <= ‘z‘) || (c >= ‘N‘ && c <= ‘Z‘)) 30 return c - 13; 31 else 32 return c; 33 } 34 35 struct fd_state { 36 char buffer[MAX_LINE]; 37 size_t buffer_used; 38 39 size_t n_written; 40 size_t write_upto; 41 42 struct event *read_event; 43 struct event *write_event; 44 }; 45 46 struct fd_state * 47 alloc_fd_state(struct event_base *base, evutil_socket_t fd) 48 { 49 struct fd_state *state = malloc(sizeof(struct fd_state)); 50 if (!state) 51 return NULL; 52 state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); 53 if (!state->read_event) { 54 free(state); 55 return NULL; 56 } 57 state->write_event = 58 event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); 59 60 if (!state->write_event) { 61 event_free(state->read_event); 62 free(state); 63 return NULL; 64 } 65 66 state->buffer_used = state->n_written = state->write_upto = 0; 67 68 assert(state->write_event); 69 return state; 70 } 71 72 void 73 free_fd_state(struct fd_state *state) 74 { 75 event_free(state->read_event); 76 event_free(state->write_event); 77 free(state); 78 } 79 80 void 81 do_read(evutil_socket_t fd, short events, void *arg) 82 { 83 struct fd_state *state = arg; 84 char buf[1024]; 85 int i; 86 ssize_t result; 87 while (1) { 88 assert(state->write_event); 89 
result = recv(fd, buf, sizeof(buf), 0); 90 if (result <= 0) 91 break; 92 93 for (i=0; i < result; ++i) { 94 if (state->buffer_used < sizeof(state->buffer)) 95 state->buffer[state->buffer_used++] = rot13_char(buf[i]); 96 if (buf[i] == ‘\n‘) { 97 assert(state->write_event); 98 event_add(state->write_event, NULL); 99 state->write_upto = state->buffer_used; 100 } 101 } 102 } 103 104 if (result == 0) { 105 free_fd_state(state); 106 } else if (result < 0) { 107 if (errno == EAGAIN) // XXXX use evutil macro 108 return; 109 perror("recv"); 110 free_fd_state(state); 111 } 112 } 113 114 void 115 do_write(evutil_socket_t fd, short events, void *arg) 116 { 117 struct fd_state *state = arg; 118 119 while (state->n_written < state->write_upto) { 120 ssize_t result = send(fd, state->buffer + state->n_written, 121 state->write_upto - state->n_written, 0); 122 if (result < 0) { 123 if (errno == EAGAIN) // XXX use evutil macro 124 return; 125 free_fd_state(state); 126 return; 127 } 128 assert(result != 0); 129 130 state->n_written += result; 131 } 132 133 if (state->n_written == state->buffer_used) 134 state->n_written = state->write_upto = state->buffer_used = 1; 135 136 event_del(state->write_event); 137 } 138 139 void 140 do_accept(evutil_socket_t listener, short event, void *arg) 141 { 142 struct event_base *base = arg; 143 struct sockaddr_storage ss; 144 socklen_t slen = sizeof(ss); 145 int fd = accept(listener, (struct sockaddr*)&ss, &slen); 146 if (fd < 0) { // XXXX eagain?? 
147 perror("accept"); 148 } else if (fd > FD_SETSIZE) { 149 close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */ 150 } else { 151 struct fd_state *state; 152 evutil_make_socket_nonblocking(fd); 153 state = alloc_fd_state(base, fd); 154 assert(state); /*XXX err*/ 155 assert(state->write_event); 156 event_add(state->read_event, NULL); 157 } 158 } 159 160 void 161 run(void) 162 { 163 evutil_socket_t listener; 164 struct sockaddr_in sin; 165 struct event_base *base; 166 struct event *listener_event; 167 168 base = event_base_new(); 169 if (!base) 170 return; /*XXXerr*/ 171 172 sin.sin_family = AF_INET; 173 sin.sin_addr.s_addr = 0; 174 sin.sin_port = htons(40713); 175 176 listener = socket(AF_INET, SOCK_STREAM, 0); 177 evutil_make_socket_nonblocking(listener); 178 179 #ifndef WIN32 180 { 181 int one = 1; 182 setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); 183 } 184 #endif 185 186 if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { 187 perror("bind"); 188 return; 189 } 190 191 if (listen(listener, 16)<0) { 192 perror("listen"); 193 return; 194 } 195 196 listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); 197 /*XXX check it */ 198 event_add(listener_event, NULL); 199 200 event_base_dispatch(base); 201 } 202 203 int 204 main(int c, char **v) 205 { 206 setvbuf(stdout, NULL, _IONBF, 0); 207 208 run(); 209 return 0; 210 }
(Other things to note in the code: instead of typing the sockets as "int", we’re using the type evutil_socket_t. Instead of calling fcntl(O_NONBLOCK) to make the sockets nonblocking, we’re calling evutil_make_socket_nonblocking. These changes make our code compatible with the divergent parts of the Win32 networking API.)
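You’ve probably noticed that as our code has gotten more efficient, it has also gotten more complex: we now have to manage our own buffers for every connection. It is also not the best fit for Windows, where the fastest asynchronous IO interface (IOCP) reports when an operation has completed rather than when a socket is ready for one. Fortunately, the Libevent 2 "bufferevents" interface solves both of these issues: it makes programs much simpler to write, and provides an interface that Libevent can implement efficiently on Windows and on Unix.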
Here’s our ROT13 server one last time, using the bufferevents API.
1 /* For sockaddr_in */ 2 #include <netinet/in.h> 3 /* For socket functions */ 4 #include <sys/socket.h> 5 /* For fcntl */ 6 #include <fcntl.h> 7 8 #include <event2/event.h> 9 #include <event2/buffer.h> 10 #include <event2/bufferevent.h> 11 12 #include <assert.h> 13 #include <unistd.h> 14 #include <string.h> 15 #include <stdlib.h> 16 #include <stdio.h> 17 #include <errno.h> 18 19 #define MAX_LINE 16384 20 21 void do_read(evutil_socket_t fd, short events, void *arg); 22 void do_write(evutil_socket_t fd, short events, void *arg); 23 24 char 25 rot13_char(char c) 26 { 27 /* We don‘t want to use isalpha here; setting the locale would change 28 * which characters are considered alphabetical. */ 29 if ((c >= ‘a‘ && c <= ‘m‘) || (c >= ‘A‘ && c <= ‘M‘)) 30 return c + 13; 31 else if ((c >= ‘n‘ && c <= ‘z‘) || (c >= ‘N‘ && c <= ‘Z‘)) 32 return c - 13; 33 else 34 return c; 35 } 36 37 void 38 readcb(struct bufferevent *bev, void *ctx) 39 { 40 struct evbuffer *input, *output; 41 char *line; 42 size_t n; 43 int i; 44 input = bufferevent_get_input(bev); 45 output = bufferevent_get_output(bev); 46 47 while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { 48 for (i = 0; i < n; ++i) 49 line[i] = rot13_char(line[i]); 50 evbuffer_add(output, line, n); 51 evbuffer_add(output, "\n", 1); 52 free(line); 53 } 54 55 if (evbuffer_get_length(input) >= MAX_LINE) { 56 /* Too long; just process what there is and go on so that the buffer 57 * doesn‘t grow infinitely long. */ 58 char buf[1024]; 59 while (evbuffer_get_length(input)) { 60 int n = evbuffer_remove(input, buf, sizeof(buf)); 61 for (i = 0; i < n; ++i) 62 buf[i] = rot13_char(buf[i]); 63 evbuffer_add(output, buf, n); 64 } 65 evbuffer_add(output, "\n", 1); 66 } 67 } 68 69 void 70 errorcb(struct bufferevent *bev, short error, void *ctx) 71 { 72 if (error & BEV_EVENT_EOF) { 73 /* connection has been closed, do any clean up here */ 74 /* ... 
*/ 75 } else if (error & BEV_EVENT_ERROR) { 76 /* check errno to see what error occurred */ 77 /* ... */ 78 } else if (error & BEV_EVENT_TIMEOUT) { 79 /* must be a timeout event handle, handle it */ 80 /* ... */ 81 } 82 bufferevent_free(bev); 83 } 84 85 void 86 do_accept(evutil_socket_t listener, short event, void *arg) 87 { 88 struct event_base *base = arg; 89 struct sockaddr_storage ss; 90 socklen_t slen = sizeof(ss); 91 int fd = accept(listener, (struct sockaddr*)&ss, &slen); 92 if (fd < 0) { 93 perror("accept"); 94 } else if (fd > FD_SETSIZE) { 95 close(fd); 96 } else { 97 struct bufferevent *bev; 98 evutil_make_socket_nonblocking(fd); 99 bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); 100 bufferevent_setcb(bev, readcb, NULL, errorcb, NULL); 101 bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); 102 bufferevent_enable(bev, EV_READ|EV_WRITE); 103 } 104 } 105 106 void 107 run(void) 108 { 109 evutil_socket_t listener; 110 struct sockaddr_in sin; 111 struct event_base *base; 112 struct event *listener_event; 113 114 base = event_base_new(); 115 if (!base) 116 return; /*XXXerr*/ 117 118 sin.sin_family = AF_INET; 119 sin.sin_addr.s_addr = 0; 120 sin.sin_port = htons(40713); 121 122 listener = socket(AF_INET, SOCK_STREAM, 0); 123 evutil_make_socket_nonblocking(listener); 124 125 #ifndef WIN32 126 { 127 int one = 1; 128 setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); 129 } 130 #endif 131 132 if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { 133 perror("bind"); 134 return; 135 } 136 137 if (listen(listener, 16)<0) { 138 perror("listen"); 139 return; 140 } 141 142 listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); 143 /*XXX check it */ 144 event_add(listener_event, NULL); 145 146 event_base_dispatch(base); 147 } 148 149 int 150 main(int c, char **v) 151 { 152 setvbuf(stdout, NULL, _IONBF, 0); 153 154 run(); 155 return 0; 156 }
标签:
原文地址:http://www.cnblogs.com/xiaokuang/p/4550585.html