码迷,mamicode.com
首页 > 其他好文 > 详细

bufferevent 与 socket

时间:2015-09-02 18:55:59      阅读:155      评论:0      收藏:0      [点我收藏+]

标签:

http://blog.sina.com.cn/s/blog_56dee71a0100qx4s.html

 

很多时候,除了响应事件之外,应用还希望做一定的数据缓冲。比如说,写入数据的时候,通常的运行模式是:

l 决定要向连接写入一些数据,把数据放入到缓冲区中

l 等待连接可以写入

l 写入尽量多的数据

l 记住写入了多少数据,如果还有更多数据要写入,等待连接再次可以写入

这种缓冲IO模式很通用,libevent为此提供了一种通用机制,即bufferevent。bufferevent由一个底层的传输端口(如套接字),一个读取缓冲区和一个写入缓冲区组成。与通常的事件在底层传输端口已经就绪,可以读取或者写入的时候执行回调不同的是,bufferevent在读取或者写入了足够量的数据之后调用用户提供的回调。

 

 

 

bufferevent 的简单范例

这里选取了 Libevent 的一个范例程序 hello-world.c 来看看 Libevent 的用法:

#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
#  include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif

// bufferevent
#include <event2/bufferevent.h>
// bufferevent 使用的 buffer
#include <event2/buffer.h>
// 连接监听器
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>

static const char MESSAGE[] = "Hello, World!\n";

static const int PORT = 9995;

// 新连接到来时的回调
static void listener_cb(struct evconnlistener *, evutil_socket_t,
    struct sockaddr *, int socklen, void *);
// 读取回调
static void conn_writecb(struct bufferevent *, void *);
// 事件回调
static void conn_eventcb(struct bufferevent *, short, void *);
// 信号回调
static void signal_cb(evutil_socket_t, short, void *);

int
main(int argc, char **argv)
{
  struct event_base *base;
  struct evconnlistener *listener;
  struct event *signal_event;

  struct sockaddr_in sin;
#ifdef WIN32
  WSADATA wsa_data;
  WSAStartup(0x0201, &wsa_data);
#endif

  // 首先构建 base
  base = event_base_new();
  if (!base) {
    fprintf(stderr, "Could not initialize libevent!\n");
    return 1;
  }

  memset(&sin, 0, sizeof(sin));
  sin.sin_family = AF_INET;
  sin.sin_port = htons(PORT);

  // 创建监听器
  listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
      LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
      (struct sockaddr*)&sin,
      sizeof(sin));

  if (!listener) {
    fprintf(stderr, "Could not create a listener!\n");
    return 1;
  }

  // 中断信号
  signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);

  if (!signal_event || event_add(signal_event, NULL)<0) {
    fprintf(stderr, "Could not create/add a signal event!\n");
    return 1;
  }

  event_base_dispatch(base);

  evconnlistener_free(listener);
  event_free(signal_event);
  event_base_free(base);

  printf("done\n");
  return 0;
}

static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
  struct event_base *base = user_data;
  struct bufferevent *bev;

  // 得到一个新的连接,通过连接 fd 构建一个 bufferevent
  bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
  if (!bev) {
    fprintf(stderr, "Error constructing bufferevent!");
    event_base_loopbreak(base);
    return;
  }
  // 设置创建的 bufferevent 的回调函数
  bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);
  bufferevent_enable(bev, EV_WRITE);
  bufferevent_disable(bev, EV_READ);

  // 写入数据到 bufferevent 中
  bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}

static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
  struct evbuffer *output = bufferevent_get_output(bev);
  if (evbuffer_get_length(output) == 0) {
    printf("flushed answer\n");
    bufferevent_free(bev);
  }
}

static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
  if (events & BEV_EVENT_EOF) {
    printf("Connection closed.\n");
  } else if (events & BEV_EVENT_ERROR) {
    printf("Got an error on the connection: %s\n",
        strerror(errno));/*XXX win32*/
  }
  /* None of the other events can happen here, since we haven‘t enabled
   * timeouts */
  bufferevent_free(bev);
}

static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{
  struct event_base *base = user_data;
  struct timeval delay = { 2, 0 };

  printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");

  // 停止事件循环
  event_base_loopexit(base, &delay);
}

研究 bufferevent 的关键代码

这里只研究基于 socket 的 bufferevent。从上面 bufferevent 的使用可以看出,有几个关键函数:

  1. 开始需要调用 bufferevent_socket_new 创建一个 bufferevent
  2. 调用 bufferevent_setcb 设置回调函数
  3. 调用 bufferevent_write 写入数据
  4. 调用 bufferevent_free 释放 bufferevent

bufferevent_socket_new 的源码以及分析如下:

// base --- 新创建的 bufferevent 关联的 base
// fd --- bufferevent 关联的文件描述符
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
  // bufferevent_private 结构体持有 bufferevent 的数据
  struct bufferevent_private *bufev_p;
  // bufev == &(bufev_p->bev);
  // struct bufferevent 中存放的是不同类型的 bufferevent 所共有的部分
  // struct bufferevent 是 struct bufferevent_private 的子集
  struct bufferevent *bufev;

  // windows 下如果启用 IOCP 则构建异步 IO bufferevent
#ifdef WIN32
  if (base && event_base_get_iocp(base))
    // 细节略
    return bufferevent_async_new(base, fd, options);
#endif

  if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
    return NULL;

  // 初始化 bufferevent_private
  // 由于 bufferevent 有不同类型,所以这里设计了 bufferevent_ops_socket
  // 对于不同类型的 bufferevent 有不同的 bufferevent_ops_socket 对象
  // bufferevent_ops_socket 包括函数指针和一些信息
  if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
            options) < 0) {
    mm_free(bufev_p);
    return NULL;
  }
  bufev = &bufev_p->bev;
  // 设置 EVBUFFER_FLAG_DRAINS_TO_FD,此选项和 evbuffer_add_file() 函数有关(详见文档)
  evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

  // 初始化 read 和 write event
  // 一个 bufferevent(一个 fd)关联两个 event 对象 ev_read 和 ev_write
  // ev_read --- socket 可读或者超时
  // ev_write --- socket 可写或者超时
  // 它们都未使用 Edge triggered 方式
  event_assign(&bufev->ev_read, bufev->ev_base, fd,
      EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
  event_assign(&bufev->ev_write, bufev->ev_base, fd,
      EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

  // 为输出缓冲区设置回调
  // 当输出缓冲区被修改时调用 bufferevent_socket_outbuf_cb 回调函数
  evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

  // 防止输入缓冲区和输出缓冲区被意外修改
  evbuffer_freeze(bufev->input, 0);
  evbuffer_freeze(bufev->output, 1);

  return bufev;
}

其中 bufferevent_init_common 函数实现为:

int
bufferevent_init_common(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
  struct bufferevent *bufev = &bufev_private->bev;

  // 创建输入缓冲区
  if (!bufev->input) {
    if ((bufev->input = evbuffer_new()) == NULL)
      return -1;
  }

  // 创建输出缓冲区
  if (!bufev->output) {
    if ((bufev->output = evbuffer_new()) == NULL) {
      evbuffer_free(bufev->input);
      return -1;
    }
  }

  // 初始化 bufferevent 的引用计数
  bufev_private->refcnt = 1;
  bufev->ev_base = base;

  /* Disable timeouts. */
  // 清理超时时间
  evutil_timerclear(&bufev->timeout_read);
  evutil_timerclear(&bufev->timeout_write);

  bufev->be_ops = ops;

  /*
   * Set to EV_WRITE so that using bufferevent_write is going to
   * trigger a callback.  Reading needs to be explicitly enabled
   * because otherwise no data will be available.
   */
  // enabled 被 bufferevent_get_enabled 函数返回
  // enabled 的值可以为 EV_WRITE EV_READ
  bufev->enabled = EV_WRITE;

  // bufferevent 相关线程初始化
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
  if (options & BEV_OPT_THREADSAFE) {
    if (bufferevent_enable_locking(bufev, NULL) < 0) {
      /* cleanup */
      evbuffer_free(bufev->input);
      evbuffer_free(bufev->output);
      bufev->input = NULL;
      bufev->output = NULL;
      return -1;
    }
  }
#endif
  // 选项正确性检查
  if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
      == BEV_OPT_UNLOCK_CALLBACKS) {
    event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
    return -1;
  }
  // defer callbacks 初始化
  if (options & BEV_OPT_DEFER_CALLBACKS) {
    if (options & BEV_OPT_UNLOCK_CALLBACKS)
      event_deferred_cb_init(&bufev_private->deferred,
          bufferevent_run_deferred_callbacks_unlocked,
          bufev_private);
    else
      event_deferred_cb_init(&bufev_private->deferred,
          bufferevent_run_deferred_callbacks_locked,
          bufev_private);
  }

  bufev_private->options = options;

  // 关联 bufferevent 和 buffer
  evbuffer_set_parent(bufev->input, bufev);
  evbuffer_set_parent(bufev->output, bufev);

  return 0;
}

bufferevent 创建成功之后,fd 上存在数据可读则调用 bufferevent_readcb

// fd 可读
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
  struct bufferevent *bufev = arg;
  struct bufferevent_private *bufev_p =
      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  struct evbuffer *input;
  int res = 0;
  short what = BEV_EVENT_READING;
  ev_ssize_t howmuch = -1, readmax=-1;

  _bufferevent_incref_and_lock(bufev);

  // 如果超时了
  if (event == EV_TIMEOUT) {
    /* Note that we only check for event==EV_TIMEOUT. If
     * event==EV_TIMEOUT|EV_READ, we can safely ignore the
     * timeout, since a read has occurred */
    what |= BEV_EVENT_TIMEOUT;
    goto error;
  }

  input = bufev->input;

  /*
   * If we have a high watermark configured then we don‘t want to
   * read more data than would make us reach the watermark.
   */
  // 是否设置了输入缓冲区的最大大小
  if (bufev->wm_read.high != 0) {
    howmuch = bufev->wm_read.high - evbuffer_get_length(input);
    /* we somehow lowered the watermark, stop reading */
    // 缓冲区中数据过多
    if (howmuch <= 0) {
      // 暂停 bufferevent 的数据读取
      // 具体的做法是移除 read event(ev_read)
      bufferevent_wm_suspend_read(bufev);
      goto done;
    }
  }
  // 获取可读最大大小
  // 和限速有关,如果不限速,则为 MAX_TO_READ_EVER(16384) 也就是 16K
  readmax = _bufferevent_get_read_max(bufev_p);
  if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
                 * uglifies this code. XXXX */
    howmuch = readmax;
  // 如果读取暂停
  if (bufev_p->read_suspended)
    goto done;

  // 输入缓冲区可读
  evbuffer_unfreeze(input, 0);
  // 读取 fd 上的数据
  res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
  // 输入缓冲区禁止读取
  evbuffer_freeze(input, 0);

  // 读取数据失败
  if (res == -1) {
    // 获取到错误
    int err = evutil_socket_geterror(fd);
    // EINTR、EAGAIN
    // Windows 下为 WSAEWOULDBLOCK、WSAEINTR
    if (EVUTIL_ERR_RW_RETRIABLE(err))
      goto reschedule;
    // 如果错误是不可重试的,报错
    /* error case */
    what |= BEV_EVENT_ERROR;
  // eof
  } else if (res == 0) {
    /* eof case */
    what |= BEV_EVENT_EOF;
  }

  if (res <= 0)
    goto error;

  _bufferevent_decrement_read_buckets(bufev_p, res);

  /* Invoke the user callback - must always be called last */
  // 判断是否需要调用回调
  if (evbuffer_get_length(input) >= bufev->wm_read.low)
    _bufferevent_run_readcb(bufev);

  goto done;

 reschedule:
  goto done;

 error:
  // 出错后暂停读取数据
  bufferevent_disable(bufev, EV_READ);
  // 通过事件回调通知出错
  _bufferevent_run_eventcb(bufev, what);

 done:
  _bufferevent_decref_and_unlock(bufev);
}

这里比较关键的函数为 evbuffer_read:

int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
  struct evbuffer_chain **chainp;
  int n;
  int result;

#ifdef USE_IOVEC_IMPL
  int nvecs, i, remaining;
#else
  struct evbuffer_chain *chain;
  unsigned char *p;
#endif

  EVBUFFER_LOCK(buf);

  // buffer 是否可读
  if (buf->freeze_end) {
    result = -1;
    goto done;
  }

  // 获取当前 socket 可读的字节数
  n = get_n_bytes_readable_on_socket(fd);
  if (n <= 0 || n > EVBUFFER_MAX_READ)
    n = EVBUFFER_MAX_READ;
  // 尽可能多的读取
  if (howmuch < 0 || howmuch > n)
    howmuch = n;

  // 一种实现
#ifdef USE_IOVEC_IMPL
  /* Since we can use iovecs, we‘re willing to use the last
   * NUM_READ_IOVEC chains. */
  // 调整缓冲区(细节略)
  if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
    result = -1;
    goto done;
  } else {
    IOV_TYPE vecs[NUM_READ_IOVEC];
#ifdef _EVBUFFER_IOVEC_IS_NATIVE
    nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
        NUM_READ_IOVEC, &chainp, 1);
#else
    /* We aren‘t using the native struct iovec.  Therefore,
       we are on win32. */
    struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
    nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,
        &chainp, 1);

    for (i=0; i < nvecs; ++i)
      WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
#endif

#ifdef WIN32
    {
      // 读取到的数据字节数
      DWORD bytesRead;
      DWORD flags=0;
      // windows 下进行 recv
      if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
        /* The read failed. It might be a close,
         * or it might be an error. */
        // 这里使用 WSARecv 时需要注意 WSAECONNABORTED 表示了连接关闭了
        if (WSAGetLastError() == WSAECONNABORTED)
          n = 0;
        else
          n = -1;
      } else
        n = bytesRead;
    }
#else
    // 非 windows 平台 read
    n = readv(fd, vecs, nvecs);
#endif
  }

  // 使用另外一种实现
#else /*!USE_IOVEC_IMPL*/
  /* If we don‘t have FIONREAD, we might waste some space here */
  /* XXX we _will_ waste some space here if there is any space left
   * over on buf->last. */
  if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
    result = -1;
    goto done;
  }

  /* We can append new data at this point */
  p = chain->buffer + chain->misalign + chain->off;

  // read
#ifndef WIN32
  n = read(fd, p, howmuch);
#else
  n = recv(fd, p, howmuch, 0);
#endif
#endif /* USE_IOVEC_IMPL */

  if (n == -1) {
    result = -1;
    goto done;
  }
  if (n == 0) {
    result = 0;
    goto done;
  }

#ifdef USE_IOVEC_IMPL
  remaining = n;
  for (i=0; i < nvecs; ++i) {
    ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);
    if (space < remaining) {
      (*chainp)->off += space;
      remaining -= (int)space;
    } else {
      (*chainp)->off += remaining;
      buf->last_with_datap = chainp;
      break;
    }
    chainp = &(*chainp)->next;
  }
#else
  chain->off += n;
  advance_last_with_data(buf);
#endif
  buf->total_len += n;
  buf->n_add_for_cb += n;

  /* Tell someone about changes in this buffer */
  // 调用回调
  evbuffer_invoke_callbacks(buf);
  result = n;
done:
  EVBUFFER_UNLOCK(buf);
  return result;
}

读完了 bufferevent_readcb 接下来再看看 bufferevent_writecb:

// fd 可写
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
  struct bufferevent *bufev = arg;
  struct bufferevent_private *bufev_p =
      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  int res = 0;
  short what = BEV_EVENT_WRITING;
  int connected = 0;
  ev_ssize_t atmost = -1;

  _bufferevent_incref_and_lock(bufev);

  // 如果超时了
  if (event == EV_TIMEOUT) {
    /* Note that we only check for event==EV_TIMEOUT. If
     * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
     * timeout, since a read has occurred */
    what |= BEV_EVENT_TIMEOUT;
    goto error;
  }
  // 正在连接
  if (bufev_p->connecting) {
    // 连接是否完成
    int c = evutil_socket_finished_connecting(fd);
    /* we need to fake the error if the connection was refused
     * immediately - usually connection to localhost on BSD */
    if (bufev_p->connection_refused) {
      bufev_p->connection_refused = 0;
      c = -1;
    }

    // 如果还没有连接完成,则不处理
    if (c == 0)
      goto done;

    // 如果连接完成
    bufev_p->connecting = 0;
    // 如果连接出错
    if (c < 0) {
      // 移除 bufferevent 的事件
      event_del(&bufev->ev_write);
      event_del(&bufev->ev_read);
      // 事件回调,告知连接出错
      _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
      goto done;
    // 如果连接成功
    } else {
      connected = 1;
#ifdef WIN32
      // 是否为异步 IO bufferevent(IOCP)
      if (BEV_IS_ASYNC(bufev)) {
        event_del(&bufev->ev_write);
        bufferevent_async_set_connected(bufev);
        _bufferevent_run_eventcb(bufev,
            BEV_EVENT_CONNECTED);
        goto done;
      }
#endif
      // 通知连接成功
      _bufferevent_run_eventcb(bufev,
          BEV_EVENT_CONNECTED);
      // 如果不可写
      if (!(bufev->enabled & EV_WRITE) ||
          bufev_p->write_suspended) {
        // 移除 ev_write
        event_del(&bufev->ev_write);
        goto done;
      }
    }
  }

  // 获取可写最大大小
  // 和限速有关,如果不限速,则为 MAX_TO_WRITE_EVER(16384) 也就是 16K
  atmost = _bufferevent_get_write_max(bufev_p);

  // 如果不可写
  if (bufev_p->write_suspended)
    goto done;

  // 如果输出缓冲区存在数据
  if (evbuffer_get_length(bufev->output)) {
    evbuffer_unfreeze(bufev->output, 1);
    // 写入尽可能多的数据到 fd
    res = evbuffer_write_atmost(bufev->output, fd, atmost);
    evbuffer_freeze(bufev->output, 1);
    if (res == -1) {
      int err = evutil_socket_geterror(fd);
      // 如果写入数据出错时可重试
      if (EVUTIL_ERR_RW_RETRIABLE(err))
        goto reschedule;
      // 真正出错则设置
      what |= BEV_EVENT_ERROR;
    // 连接断开
    } else if (res == 0) {
      /* eof case
         XXXX Actually, a 0 on write doesn‘t indicate
         an EOF. An ECONNRESET might be more typical.
       */
      what |= BEV_EVENT_EOF;
    }
    if (res <= 0)
      goto error;

    _bufferevent_decrement_write_buckets(bufev_p, res);
  }

  // 如果缓冲区所有数据都被写入完了
  if (evbuffer_get_length(bufev->output) == 0) {
    // 清除 ev_write(无需继续写入数据了)
    event_del(&bufev->ev_write);
  }

  /*
   * Invoke the user callback if our buffer is drained or below the
   * low watermark.
   */
  // 尝试调用写入回调函数
  if ((res || !connected) &&
      evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
    _bufferevent_run_writecb(bufev);
  }

  goto done;

 reschedule:
  // 如果无法写入,但是输出缓冲区已经为空时
  if (evbuffer_get_length(bufev->output) == 0) {
    // 无需继续写入数据了
    event_del(&bufev->ev_write);
  }
  goto done;

 error:
  // 出错时告知上层
  bufferevent_disable(bufev, EV_WRITE);
  _bufferevent_run_eventcb(bufev, what);

 done:
  _bufferevent_decref_and_unlock(bufev);
}

bufferevent_writecb 中比较关键的一个函数为 evbuffer_write_atmost:

int
evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
  int n = -1;

  EVBUFFER_LOCK(buffer);

  // 是否不可写
  if (buffer->freeze_start) {
    goto done;
  }

  // 写尽量多的数据
  if (howmuch < 0 || (size_t)howmuch > buffer->total_len)
    howmuch = buffer->total_len;

  // 如果有数据需要写
  if (howmuch > 0) {
    // 使用 evbuffer_write_sendfile 写数据
#ifdef USE_SENDFILE
    struct evbuffer_chain *chain = buffer->first;
    if (chain != NULL && (chain->flags & EVBUFFER_SENDFILE))
      n = evbuffer_write_sendfile(buffer, fd, howmuch);
    else {
#endif
#ifdef USE_IOVEC_IMPL
    // 使用 evbuffer_write_iovec 写数据
    n = evbuffer_write_iovec(buffer, fd, howmuch);
#elif defined(WIN32)
    /* XXX(nickm) Don‘t disable this code until we know if
     * the WSARecv code above works. */
    void *p = evbuffer_pullup(buffer, howmuch);
    // windows 下 send
    n = send(fd, p, howmuch, 0);
#else
    void *p = evbuffer_pullup(buffer, howmuch);
    // 其他平台 write
    n = write(fd, p, howmuch);
#endif
#ifdef USE_SENDFILE
    }
#endif
  }

  if (n > 0)
    // 如果写入的数据大于 0 则缓冲区对应移除相关数据
    evbuffer_drain(buffer, n);

done:
  EVBUFFER_UNLOCK(buffer);
  return (n);
}

代码读到这里,对于 bufferevent 的创建、socket 读写已经有了一定的了解,下面再看看 bufferevent_write,此函数实际上只是直接向输出缓冲区写入数据,缓冲区写入数据后,会调用回调 bufferevent_socket_outbuf_cb(创建 bufferevent 时设置的),此回调工作内容比较简单,主要就是将 ev_write 注册到 base 中去:

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
  struct bufferevent *bufev = arg;
  struct bufferevent_private *bufev_p =
      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    // evbuffer 中有数据
  if (cbinfo->n_added &&
    // bufferevent 可以写入数据
      (bufev->enabled & EV_WRITE) &&
    // 检测 ev_write 是否已经是 pending 状态或者已经被调度
    // 这里无需重复注册 ev_write
      !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
    // bufferevent 是否已经禁止写入
      !bufev_p->write_suspended) {
    /* Somebody added data to the buffer, and we would like to
     * write, and we were not writing.  So, start writing. */
    // 注册 ev_write 写入数据
    if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
        /* Should we log this? */
    }
  }
}

最后来看看释放过程:

void
bufferevent_free(struct bufferevent *bufev)
{
  BEV_LOCK(bufev);
  // 清理回调
  bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
  // 此函数似乎啥也没做
  _bufferevent_cancel_all(bufev);
  // 真正的清理发生在 bufferevent 引用计数为 0 时
  _bufferevent_decref_and_unlock(bufev);
}

int
_bufferevent_decref_and_unlock(struct bufferevent *bufev)
{
  struct bufferevent_private *bufev_private =
      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  struct bufferevent *underlying;

  EVUTIL_ASSERT(bufev_private->refcnt > 0);

  // 引用计数减 1
  if (--bufev_private->refcnt) {
    BEV_UNLOCK(bufev);
    return 0;
  }
  // 如果 bufferevent 引用技术为 0 了

  // 获取底层 bufferevent
  underlying = bufferevent_get_underlying(bufev);

  /* Clean up the shared info */
  if (bufev->be_ops->destruct)
    // 调用 be_socket_destruct
    // 清理 ev_read 和 ev_write
    // 关闭 socket
    bufev->be_ops->destruct(bufev);

  /* XXX what happens if refcnt for these buffers is > 1?
   * The buffers can share a lock with this bufferevent object,
   * but the lock might be destroyed below. */
  /* evbuffer will free the callbacks */
  // 释放缓冲区
  evbuffer_free(bufev->input);
  evbuffer_free(bufev->output);

  // 如果使用了限速,则进行相关清理
  if (bufev_private->rate_limiting) {
    if (bufev_private->rate_limiting->group)
      bufferevent_remove_from_rate_limit_group_internal(bufev,0);
    if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
      event_del(&bufev_private->rate_limiting->refill_bucket_event);
    event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
    mm_free(bufev_private->rate_limiting);
    bufev_private->rate_limiting = NULL;
  }

  event_debug_unassign(&bufev->ev_read);
  event_debug_unassign(&bufev->ev_write);

  BEV_UNLOCK(bufev);
  if (bufev_private->own_lock)
    EVTHREAD_FREE_LOCK(bufev_private->lock,
        EVTHREAD_LOCKTYPE_RECURSIVE);

  /* Free the actual allocated memory. */
  mm_free(((char*)bufev) - bufev->be_ops->mem_offset);

  /* Release the reference to underlying now that we no longer need the
   * reference to it.  We wait this long mainly in case our lock is
   * shared with underlying.
   *
   * The ‘destruct‘ function will also drop a reference to underlying
   * if BEV_OPT_CLOSE_ON_FREE is set.
   *
   * XXX Should we/can we just refcount evbuffer/bufferevent locks?
   * It would probably save us some headaches.
   */
  if (underlying)
    bufferevent_decref(underlying);

  return 1;
}

更多详细的内容还需要更进一步阅读源码。

bufferevent 与 socket

标签:

原文地址:http://www.cnblogs.com/diegodu/p/4779114.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!