标签:
看了一些网上使用bufferevent的例子,一般都是一个连接对应一个bufferevent,连接accept的时候,给bufferevent设置上fd和对应的回调,在连接断开或者发生错误的时候,将bufferevent释放掉。
昨天在使用bufferevent的时候,突发奇想,准备了一个connection对象池,每个connection关联一个bufferevent,然后多线程处理connection的数据收发,每个线程一个connection链表。在connection出错或者关闭的时候,清空connection的相关状态,重新放入队列中。
1.每次有新的连接被accept后取出队列里空闲的connection,设置相关fd和回掉
1 bool Connection::SetupConnection(evutil_socket_t fd) 2 { 3 m_fd = fd; 4 evutil_make_socket_nonblocking(fd); 5 bufferevent_setcb(m_be, DoRead, NULL, DoError, this); 6 bufferevent_setfd(m_be, fd); 7 if (bufferevent_enable(m_be, EV_READ|EV_WRITE) != -1) 8 { 9 m_bUsed = true; 10 } 11 return m_bUsed; 12 }
2.在event_cb中,如果收到EOF或者Error的时候,关闭connection
1 void Connection::DoEvent(struct bufferevent *bev, short what, void *ctx) 2 { 3 Connection *pConn = (Connection*)ctx; 4 if(what & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) 5 { 6 pConn->CloseConnection(); 7 } 8 ...... 9 }
void Connection::CloseConnection() { //因为CloseConnection是在libevent线程的Event回掉中调用的,直接 //bufferevent_setfd就会自动把之前的event给delete掉 //bufferevent_disable(m_be, EV_READ | EV_WRITE); bufferevent_setfd(m_be, -1); evutil_closesocket(m_fd); m_fd = -1; //当bufferevent_setfd设置fd是-1的时候,并不会将write和read的event //加入到event_base,如果event_base中监听的event数量为0时候, //event_base_loop会退出,导致libevent线程结束,所以这里重新手动 //开启write,read事件 bufferevent_enable(m_be, EV_READ|EV_WRITE) ; m_bUsed = false; }
3.在有新的客户端accept的时候,重新调用SetupConnection建立新的connection连接。
总所周知,bufferevent有个管理收发数据的利器evbuffer,一般发送数据都是调用bufferevent_write,将数据写入发送的evbuffer里,在socket可以发送数据的时候再发送数据,read也是这样,将socket中的数据先读取到接收的evbuffer里,在调用read_cb通知用户数据可读。那么,问题来了,在CloseConnection里调用的一系列bufferevent接口中,这两个evbuffer的数据并没有清空,这样就会造成在重新SetupConnection的时候,bufferevent会有一部分之前的fd残留下来的数据,导致新的fd在处理接收和发送数据的时候被干扰。
第一反应肯定是将evbuffer里的数据清空掉,于是乎就想到evbuffer_drain接口来清理数据。清理接收evbuffer的时候,没问题,而且一般在socket断开或者出错后,也会一次性将接收evbuffer里的数据都读取出来处理的。但是在清理发送evbuffer的时候,发现evbuffer_drain失败,于是乎翻了下libevent的源码:
int evbuffer_drain(struct evbuffer *buf, size_t len) { struct evbuffer_chain *chain, *next; size_t remaining, old_len; int result = 0; EVBUFFER_LOCK(buf); old_len = buf->total_len; if (old_len == 0) goto done; //在这个地方,直接goto done了 if (buf->freeze_start) { result = -1; goto done; } .... }
buf->freeze_start为1,直接结束了,并没有清空evbuffer。参考了一下http://blog.csdn.net/zhwei_87/article/details/43304383的源码分析后发现bufferevent的发送evbuffer只有在真正发送数据的时候才会unfreeze start,其他时间都是freeze的(所以只允许你将新数据写入到发送evbuffer里,因为写是写到end的,drain是由libevent自己处理的),这也很好理解,因为发送的evbuffer里的数据是不允许用户自己随意drain start的(头部的数据是要发送的),关于start跟end,是因为evbuffer是一个链表结构来存放发送数据的。代码如下,这个函数是libevent真正用来发送数据的回掉,用户的回掉在这个函数最后才会执行。
static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) { struct bufferevent *bufev = arg; struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); int res = 0; short what = BEV_EVENT_WRITING; int connected = 0; ev_ssize_t atmost = -1; _bufferevent_incref_and_lock(bufev); if (event == EV_TIMEOUT) { /* Note that we only check for event==EV_TIMEOUT. If * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the * timeout, since a read has occurred */ what |= BEV_EVENT_TIMEOUT; goto error; } if (bufev_p->connecting) { int c = evutil_socket_finished_connecting(fd); /* we need to fake the error if the connection was refused * immediately - usually connection to localhost on BSD */ if (bufev_p->connection_refused) { bufev_p->connection_refused = 0; c = -1; } if (c == 0) goto done; bufev_p->connecting = 0; if (c < 0) { event_del(&bufev->ev_write); event_del(&bufev->ev_read); _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR); goto done; } else { connected = 1; #ifdef WIN32 if (BEV_IS_ASYNC(bufev)) { event_del(&bufev->ev_write); bufferevent_async_set_connected(bufev); _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); goto done; } #endif _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); if (!(bufev->enabled & EV_WRITE) || bufev_p->write_suspended) { event_del(&bufev->ev_write); goto done; } } } atmost = _bufferevent_get_write_max(bufev_p); if (bufev_p->write_suspended) goto done; //这个地方才会unfree掉,并且发送数据 if (evbuffer_get_length(bufev->output)) { evbuffer_unfreeze(bufev->output, 1); res = evbuffer_write_atmost(bufev->output, fd, atmost); evbuffer_freeze(bufev->output, 1); if (res == -1) { int err = evutil_socket_geterror(fd); if (EVUTIL_ERR_RW_RETRIABLE(err)) goto reschedule; what |= BEV_EVENT_ERROR; } else if (res == 0) { /* eof case XXXX Actually, a 0 on write doesn‘t indicate an EOF. An ECONNRESET might be more typical. */ what |= BEV_EVENT_EOF; } if (res <= 0) goto error; _bufferevent_decrement_write_buckets(bufev_p, res); } if (evbuffer_get_length(bufev->output) == 0) { event_del(&bufev->ev_write); } /* * Invoke the user callback if our buffer is drained or below the * low watermark. */ if ((res || !connected) && evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { _bufferevent_run_writecb(bufev); } goto done; reschedule: if (evbuffer_get_length(bufev->output) == 0) { event_del(&bufev->ev_write); } goto done; error: bufferevent_disable(bufev, EV_WRITE); _bufferevent_run_eventcb(bufev, what); done: _bufferevent_decref_and_unlock(bufev); }
所以,如果需要清空bufferevent的发送evbuffer里的数据,必须手动调用evbuffer_unfreeze将start unfreeze掉后才能drain。当然,一般bufferevent_write都是在其他线程中做的,所以要确保drain的时候没有线程在进行bufferevent_write,我这里先不处理多线程write这块了,或者直接在SetupConnection的时候去drain。
void Connection::CloseConnection() { //需要先unfreeze output evbuffer evbuffer_unfreeze(bufferevent_get_output(m_be), 1); if (evbuffer_drain(bufferevent_get_input(m_be), evbuffer_get_length(bufferevent_get_input(m_be)))) { ... } size_t length = evbuffer_get_length(bufferevent_get_output(m_be)); if (evbuffer_drain(bufferevent_get_output(m_be), length)) { ... } bufferevent_setfd(m_be, -1); evutil_closesocket(m_fd); m_fd = -1; bufferevent_enable(m_be, EV_READ|EV_WRITE) ; m_bUsed = false; }
这样bufferevent就可以重复使用了,不知道这样是不是正确的做法,有点麻烦,还是重新分配bufferevent来得方便感觉。
标签:
原文地址:http://www.cnblogs.com/fgokey/p/5123805.html