标签:char ati func 一个 连接 conf 循环 abs lease
前段时间使用libevent网络库实现了一个游戏服务器引擎,在此记录下其中遇到的一个问题。我在设计服务器上选择把逻辑和网络分线程,线程之间通信使用队列。但是这样做会有个问题,当逻辑线程想要主动的发一个数据包的时候,需要一种唤醒网络线程的机制。由于对libevent的api不熟悉,起初我是自己实现这个功能的。实现确实是不复杂,但是缺违背了我的初心:只写简单必要的代码,保证尽可能少的bug。直到后来和同事探讨了一番,才发现原来libevent是有对此做支持的,但是具体怎么做,文档里面没有详细的说,因此同事也说不出个所以然。鉴于此情况,我决定,把libevent中与此相关的源码粗略的过一遍,以求能弄明白以下两件事:
(1)与跨线程唤醒事件等待相关的api有哪些,以及如何使用?
(2)这些api背后到底做了哪些工作?
相关API,及用法
和我起初想得不一样,libevent相关的api很简单并且只有一个:
/*call event_use_pthreads() if use posix threads.*/ evthread_use_windows_threads(); struct event_base* ev_base = event_base_new();
需要注意的是函数evthread_use_windows_threads的调用必须在初始化event_base之前。在此之后,逻辑线程执行bufferevent_write的时候,libevent会自动唤醒网络线程的事件循环,并执行发送数据。
隐藏在API背后的逻辑
先看看evthread_use_windows_threads函数做了什么?
int evthread_use_windows_threads(void) {
...
evthread_set_lock_callbacks(&cbs);
evthread_set_id_callback(evthread_win32_get_id);
evthread_set_condition_callbacks(&cond_cbs);
return 0;
}
通过调用evthread_use_windows_threads,我们设置了一些回调函数,包括设置了libevent获取线程id的回调函数evthread_id_fn_。
看看初始化事件循环的函数event_base_new做了什么:
// event.c
struct event_base *
event_base_new(void) {
...
base = event_base_new_with_config(cfg);
}
struct event_base *
event_base_new_with_config(const struct event_config *cfg) {
...
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (EVTHREAD_LOCKING_ENABLED() &&
(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
EVTHREAD_ALLOC_COND(base->current_event_cond);
r = evthread_make_base_notifiable(base);
if (r<0) {
event_warnx("%s: Unable to make base notifiable.", __func__);
event_base_free(base);
return NULL;
}
}
#endif
...
}
int
evthread_make_base_notifiable(struct event_base *base) {
int r;
if (!base)
return -1;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
r = evthread_make_base_notifiable_nolock_(base);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return r;
}
static int
evthread_make_base_notifiable_nolock_(struct event_base *base) {
...
if (evutil_make_internal_pipe_(base->th_notify_fd) == 0) {
notify = evthread_notify_base_default;
cb = evthread_notify_drain_default;
} else {
return -1;
}
base->th_notify_fn = notify;
}
/* Internal function: Set fd[0] and fd[1] to a pair of fds such that writes on
* fd[1] get read from fd[0]. Make both fds nonblocking and close-on-exec.
* Return 0 on success, -1 on failure.
*/
int
evutil_make_internal_pipe_(evutil_socket_t fd[2]) {
}
它通过如下调用:
event_base_new-->event_base_new_with_config-->evthread_make_base_notifiable-->evthread_make_base_notifiable_nolock_-->evutil_make_internal_pipe_
最后通过evutil_socketpair构造了两个本地的互相连接的socket(windows环境下,用此来模拟pipe)。
唤醒相关的核心逻辑在evthread_notify_base_default函数中:
static int evthread_notify_base_default(struct event_base *base) {
char buf[1];
int r;
buf[0] = (char) 0;
#ifdef _WIN32
r = send(base->th_notify_fd[1], buf, 1, 0);
#else
r = write(base->th_notify_fd[1], buf, 1);
#endif
return (r < 0 && ! EVUTIL_ERR_IS_EAGAIN(errno)) ? -1 : 0;
}
可以看出,在windows下libevent的唤醒机制实际也是self pipe trick,只不过它通过构造一对socket来模拟pipe,当需要唤醒的时候,它就往其中一个socket写入1个字节。
再去看看bufferevent_write:
// bufferevent.c
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) {
if (evbuffer_add(bufev->output, data, size) == -1)
return (-1);
return 0;
}
// buffer.c
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) {
...
evbuffer_invoke_callbacks_(buf);
}
它会触发一系列列回调函数,而这些回调函数在创建bufferevent的时候被指定:
//bufferevent_sock.c
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) {
...
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
}
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) {
...
if (cbinfo->n_added &&
(bufev->enabled & EV_WRITE) &&
!event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
!bufev_p->write_suspended) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
if (bufferevent_add_event_(&bufev->ev_write, &bufev->timeout_write) == -1) {
/* Should we log this? */
}
}
}
//bufferevent.c
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv) {
if (!evutil_timerisset(tv))
return event_add(ev, NULL);
else
return event_add(ev, tv);
}
//event.c
int
event_add(struct event *ev, const struct timeval *tv) {
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
res = event_add_nolock_(ev, tv, 0);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}
int
event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute) {
...
/* if we are not in the right thread, we need to wake up the loop */
//如果在构造event_base之前调用了evthread_use_windows_threads,EVBASE_NEED_NOTIFY此时会返回true,否则为false。
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
}
由代码可知,在往bufferevent写数据后执行的回调函数中,就有唤醒网络线程逻辑(evthread_notify_base)。那为什么还需要手动调用evthread_use_windows_threads函数呢?
这里再说一下:
#define EVBASE_NEED_NOTIFY(base) ((base)->running_loop && ((base)->th_owner_id != evthreadimpl_get_id_()))
unsigned long
evthreadimpl_get_id_() {
return evthread_id_fn_ ? evthread_id_fn_() : 1;
}
之前说过,当调用evthread_use_windows_threads,设置了libevent获取线程id的回调函数evthread_id_fn_。也正因为此,才会去跑下去执行evthread_notify_base函数:
static int
evthread_notify_base(struct event_base *base) {
EVENT_BASE_ASSERT_LOCKED(base);
if (!base->th_notify_fn)
return -1;
if (base->is_notify_pending)
return 0;
base->is_notify_pending = 1;
return base->th_notify_fn(base);
}
所以,当我们在逻辑线程调用bufferevent_write尝试发送一段数据的时候,它会依据以下的调用,通知网络线程:
bufferevent_write-->evbuffer_add-->evbuffer_invoke_callbacks_-->bufferevent_socket_evtbuf_cb_-->bufferevent_add_event_-->event_add-->event_add_nolock_-->evthread_notify_base
以上便是libevent跨线程唤醒的逻辑。
标签:char ati func 一个 连接 conf 循环 abs lease
原文地址:http://www.cnblogs.com/adinosaur/p/7113631.html