标签:笔记 queue epo handle 函数 version pre 自己 base
int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0
本文主要是分析代码,方便自己日后查阅.
=========================================
在上一篇中讲到io_thread_t的线程循环函数实际上调用的,是根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t)的成员函数loop().
怎样确定选用哪种I/O多路复用,由一些预编译宏确定,请看poller.hpp头文件.
本文是在windows平台下进行分析. windows下选用的是select_t,并不是iocp.
1. I/O线程
io_thread_t有三个成员变量:
// I/O thread accesses incoming commands via this mailbox.
mailbox_t mailbox; //接收命令消息的邮箱,mailbox相关资料会在后面的文章展开介绍. 当需要和io_thread_t通信时,给它的邮箱发一个command_t命令
// Handle associated with mailbox‘ file descriptor.
poller_t::handle_t mailbox_handle; //与邮箱绑定的句柄
// I/O multiplexing is performed using a poller object.
poller_t *poller; //选用的i/o多路复用
io_thread_t这个类的功能很简洁,主要操作有: 线程开启,线程结束,处理在mailbox里的命令队列的消息(in_events函数).而mailbox里有消息待处理,是通过mailbox的fd状态可读来进行通知的,这个fd就是io_thread_t的成员变量mailbox_handle.
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);//初始化时加进去poller_t的可读fd集合了.
}
2. poller_t
poller_t实际上是一个typedef的类型:
typedef select_t poller_t;
typedef epoll_t poller_t;
...
它根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t).本文只分析select_t.
就select_t而言,在windows和linux平台下,套接字集合的管理有所不同,但原理也差不多. 在select_t里有两个平台无关的通用的结构体:
// Internal state.
struct fds_set_t //select_t对感兴趣的fd的事件集合管理
{
fds_set_t ();
fds_set_t (const fds_set_t& other_);
fds_set_t& operator=(const fds_set_t& other_);
// Convinient method to descriptor from all sets.
void remove_fd (const fd_t& fd_);
fd_set read;
fd_set write;
fd_set error;
};
struct fd_entry_t //与fd对应的事件处理对象(可理解成实现了i_poll_events相关接口并需要在io线程处理事件的对象)
{
fd_t fd;
zmq::i_poll_events* events;
};
typedef std::vector<fd_entry_t> fd_entries_t;
#if defined ZMQ_HAVE_WINDOWS
......
#else
fd_entries_t fd_entries; //fd对应的事件对象集合
fds_set_t fds_set; //select系统调用需要的所有感兴趣的fd集合
fd_t maxfd; //select系统调用需要的最大fd值
bool retired; //是否需要移除fd对应的事件对象,如果为true,则从fd_entries删除fd所对应的事件对象
#endif
poller_t还有一个thread_t成员变量worker,它才是系统线程的包装 (io_thread_t.poller->worker).worker线程开启后,实际执行的就是poller_t:loop()函数.
// Handle of the physical thread doing the I/O work.
thread_t worker;
poller_t对某些事件对象(实现了i_poll_events:in_events接口)感兴趣,就以fd为key,加进去集合fd_entries_t.同时把fd加到fds_set.error集合里,监听fd的错误事件.
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
fd_entry_t fd_entry;
fd_entry.fd = fd_;
fd_entry.events = events_;
#if defined ZMQ_HAVE_WINDOWS
......
#else
fd_entries.push_back (fd_entry);
FD_SET (fd_, &fds_set.error);
if (fd_ > maxfd)
maxfd = fd_;
#endif
adjust_load (1); //fd数量调整,原子增减操作
return fd_;
}
需要注意的是,add_fd并没有把fd放进fds_set.read和fds_set.write,也就是说add_fd加进去的fd并不能被select监听到读写事件.
但是删除fd时,会把fd相关的信息都从poller_t里移除掉.
void zmq::select_t::rm_fd (handle_t handle_)
{
#if defined ZMQ_HAVE_WINDOWS
......
#else
fd_entries_t::iterator fd_entry_it;
for (fd_entry_it = fd_entries.begin ();
fd_entry_it != fd_entries.end (); ++fd_entry_it)
if (fd_entry_it->fd == handle_) //遍历集合找到目标元素
break;
zmq_assert (fd_entry_it != fd_entries.end ());
fd_entry_it->fd = retired_fd; //标记设置为移除,注意找到目标元素后并没有立即从vector里remove掉,而是标记retired为true,在select系统调用完成后统一移除.
fds_set.remove_fd (handle_); //从select集合里去掉
if (handle_ == maxfd) { //更新最大的fd值
maxfd = retired_fd;
for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end ();
++fd_entry_it)
if (fd_entry_it->fd > maxfd)
maxfd = fd_entry_it->fd;
}
retired = true; //标记为需要删除
#endif
adjust_load (-1); //fd数量调整
}
poller_t对fd的读写监听是通过这几个函数来操作的:
void set_pollin (handle_t handle_); //监听fd可读状态
void reset_pollin (handle_t handle_); //移除fd监听可读
void set_pollout (handle_t handle_); //监听fd可写状态
void reset_pollout (handle_t handle_); //移除fd监听可写
poller_t继承自poller_base_t,含有定时器集合:
// Clock instance private to this I/O thread.
clock_t clock;
// List of active timers.
struct timer_info_t
{
zmq::i_poll_events *sink;
int id;
};
typedef std::multimap <uint64_t, timer_info_t> timers_t;
timers_t timers;
定时器集合timers_t是用std:multimap容器,能保证timer的重复键值并有序.处理timer事件也很简洁,从最小时间值的元素开始与当前时间戳比较一下,大于当前时间就是timer时间到来,此时执行timer事件处理,处理完后从定时器集合移除.
uint64_t zmq::poller_base_t::execute_timers ()
{
// Fast track.
if (timers.empty ())
return 0;
// Get the current time.
uint64_t current = clock.now_ms ();
// Execute the timers that are already due.
timers_t::iterator it = timers.begin ();
while (it != timers.end ()) {
if (it->first > current)
return it->first - current;
// Trigger the timer.
it->second.sink->timer_event (it->second.id);
// Remove it from the list of active timers.
timers_t::iterator o = it;
++it;
timers.erase (o);
}
// There are no more timers.
return 0;
}
3. I/O线程的循环函数
循环里做了三件事情:
1.执行已注册的定时器
2.对fds_set的read/write/error的fd集合进行select,并处理各个fd发生的事件.
3.从事件集合fd_entries里移除已经标记为retired_fd的事件对象.
void zmq::select_t::loop ()
{
while (!stopping) {
// Execute any due timers.
int timeout = (int) execute_timers ();
int rc = 0;
#if defined ZMQ_HAVE_WINDOWS
......
#else
fds_set_t local_fds_set = fds_set;
rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write,
&local_fds_set.error, timeout ? &tv : NULL);
if (rc == -1) {
errno_assert (errno == EINTR);
continue;
}
// Size is cached to avoid iteration through just added descriptors.
for (fd_entries_t::size_type i = 0, size = fd_entries.size (); i < size && rc > 0; ++i) {
fd_entry_t& fd_entry = fd_entries [i];
......
if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) {
fd_entry.events->in_event ();
--rc;
}
......
if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) {
fd_entry.events->out_event ();
--rc;
}
......
if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) {
fd_entry.events->in_event ();
--rc;
}
}
if (retired) { //等待select返回并处理完fd的事件后,再统一从fd_entries集合里移除标记为retired_fd的元素
retired = false;
fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (),
is_retired_fd), fd_entries.end ());
}
#endif
}
}
标签:笔记 queue epo handle 函数 version pre 自己 base
原文地址:http://www.cnblogs.com/wqchen/p/6930751.html