0 设计
EventLoop 类,主要是用来管理一个进程中的channel、计时器、以及epoll 的类。
每个类只有一个,(因为如果有两个,那么一个eventloop 在loop()的时候,另一个eventloop 得不到执行)。
每一个channel或者说是每一个文件描述符都必须属于eventloop,换句话说,每一个文件描述符属于一个线程。(这是必然的,因为eventloop在代表了一个线程)
eventloop 类的主要工作,就是wait epoll,然后执行channel 的事件处理函数。
1 源码
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H
#include <vector>
#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <muduo/base/Mutex.h>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>
namespace muduo
{
namespace net
{
class Channel;
class Poller;
class TimerQueue;
// 事件处理类
class EventLoop : boost::noncopyable
{
public:
typedef boost::function<void()> Functor;
EventLoop();
~EventLoop();
// 里面一个while循环,直接开始工作.
void loop();
// 别的线程调用eventloop 的quit,设置 loop() 中循环的判断条件
// 从而在下一次循环的时候停止
void quit();
// 返回上一次 epoll 的返回时间
Timestamp pollReturnTime() const { return pollReturnTime_; }
int64_t iteration() const { return iteration_; }
// 添加函数对象到eventloop线程中执行。
void runInLoop(const Functor &cb);
// 当调用函数添加函数对象到eventloop中执行的时候
// 需要判断是否是在eventloop 线程中.如果是在当前eventloop 的线程中
// 那么直接执行,否则,就添加到队列中,然后唤醒epoll
void queueInLoop(const Functor &cb);
// 返回当前 等待执行函数对象的队列长度
size_t queueSize() const;
// 下面三个函数,是用来添加计时器对象的
TimerId runAt(const Timestamp &time, const TimerCallback &cb);
TimerId runAfter(double delay, const TimerCallback &cb);
TimerId runEvery(double interval, const TimerCallback &cb);
void cancel(TimerId timerId);
// wakeup函数用来显示的唤醒 epoll ,主要是在 eventloop执行了quit
// 还有添加了函数对象到队列中的时候 主动的唤醒 epoll
// 能唤醒的原因在于,eventloop 对象持有一个 eventfd,注册在了epoll中
// 然后需要唤醒的时候,直接 eventfd 中写数据即可.
void wakeup();
// 这两个函数被channel 的update 和remove 调用
// 然后,这俩函数由去调用epoll 的对应接口
void updateChannel(Channel *channel);
void removeChannel(Channel *channel);
// 用来判断一个 epoll 是否持有 这个 channel
// 调用的是 epoll 的对象
bool hasChannel(Channel *channel);
// 用来判断 绑定在eventloop 中的channel 的运行是否在 本eventloop 线程中
// 或是其他操作,需要属于本eventloop 线程
void assertInLoopThread()
{
if (!isInLoopThread())
{
abortNotInLoopThread();
}
}
// 被上面的函数调用,就是简单的判断下初始化eventloop 的线程是不是当前运行函数的线程
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
// eventHandling() 用来判断是否是在已就绪channel的事件处理函数。
// 因为channel的处理只能是在 eventloop 线程中,也就是说在 loop 处理的时候
// 如果这个时候 remove channel 那么要删除的channel 就是当前正在处理channel
bool eventHandling() const { return eventHandling_; }
// 没被用到,貌似
void setContext(const boost::any &context)
{
context_ = context;
}
const boost::any &getContext() const
{
return context_;
}
boost::any *getMutableContext()
{
return &context_;
}
static EventLoop *getEventLoopOfCurrentThread();
private:
void abortNotInLoopThread();
// 这个函数是在,eventloop 主动唤醒 epoll 的时候处理eventfd 读事件的函数
void handleRead(); // waked up
// 执行队列中的函数对象
void doPendingFunctors();
// 用来打印本次已就绪channel 发生的对应的时间,也就是去调用了channel 的tostring
void printActiveChannels() const; // DEBUG
typedef std::vector<Channel *> ChannelList;
bool looping_;
bool quit_;
bool eventHandling_;
bool callingPendingFunctors_;
// iteration_ 是poller 被唤醒的次数
int64_t iteration_;
// 初始化时候,本eventloop 所在的线程
const pid_t threadId_;
// 上一次epoll被唤醒的时间戳
Timestamp pollReturnTime_;
// 具体的epoll ,实现上可是用poll的,因此是一个继承体系
boost::scoped_ptr<Poller> poller_;
// 计时器队列.
boost::scoped_ptr<TimerQueue> timerQueue_;
// eventfd,是eventloop主动唤醒epoll的fd
int wakeupFd_;
// 是分装eventfd的channel
boost::scoped_ptr<Channel> wakeupChannel_;
boost::any context_;
// 这个容器被epoll 填充。装了所有已就绪的channel
ChannelList activeChannels_;
// 当前正在处理的 channel
Channel *currentActiveChannel_;
mutable MutexLock mutex_;
// 添加到eventloop 中需要执行的函数对象,就在这里。
std::vector<Functor> pendingFunctors_;
};
}
}
#endif // MUDUO_NET_EVENTLOOP_H
实现
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>
#include <muduo/net/SocketsOps.h>
#include <muduo/net/TimerQueue.h>
#include <boost/bind.hpp>
#include <signal.h>
#include <sys/eventfd.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
// 当定义一个命名空间时,可以忽略这个命名空间的名称:
// 编译器在内部会为这个命名空间生成一个唯一的名字,而且还会为这个匿名的命名空间生成一条using指令。
// 命名空间都是具有external 连接属性的,只是匿名的命名空间产生的__UNIQUE_NAME__在别的文件中无法得到,这个唯一的名字是不可见的.
namespace
{
// 使用了 __thread 每个线程保存自己的eventloop指针
__thread EventLoop *t_loopInThisThread = 0;
// epoll 的超时时间
const int kPollTimeMs = 10000;
// 是eventloop 的eventfd ,用来唤醒 epoll
// 唤醒的时机在前面说了
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_SYSERR << "Failed in eventfd";
// 如果创建失败,那么直接结束程序,没有必要执行下去了
abort();
}
return evtfd;
}
// 这是两条预处理执行令,表示发生 -Wold-style-cast 的时候忽略
// 然后到下一条.
#pragma GCC diagnostic ignored "-Wold-style-cast"
// 这个对象只是用一次。
// 是为了防止管道破裂的。因此在构造的时候,直接将SIGPIPE设置为忽略
// 并且初始化的时候就执行了
class IgnoreSigPipe
{
public:
IgnoreSigPipe()
{
::signal(SIGPIPE, SIG_IGN);
// LOG_TRACE << "Ignore SIGPIPE";
}
};
// 到这里的时候开启.
#pragma GCC diagnostic error "-Wold-style-cast"
IgnoreSigPipe initObj;
}
// 返回本线程的 eventloop 对象
EventLoop *EventLoop::getEventLoopOfCurrentThread()
{
return t_loopInThisThread;
}
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)), // 使用一个工厂模式返回poll
timerQueue_(new TimerQueue(this)), // 和计时器有关的
wakeupFd_(createEventfd()), // 创建eventloop 主动唤醒 epoll 的channel
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
// 如果当前线程已经有一个eventloop 了那么就退出啊。 LOG_FATAL 应该会直接停了。
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
// 析构函数
EventLoop::~EventLoop()
{
LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
<< " destructs in thread " << CurrentThread::tid();
// 先将 wakechannel 注销掉,然后直接关闭就行了。
// 因为也没什么需要回收的资源,都是使用指针指针.
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = NULL;
}
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
LOG_TRACE << "EventLoop " << this << " start looping";
// 使用 quit 判断是否应该继续循环
while (!quit_)
{
// activeChannels_ 用来填充就绪channel的
activeChannels_.clear();
// 传入了 activeChannels_ 的地址,由poll 填充已就绪的channel 到activeChannels_中
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
// iteration_ 是poller 被唤醒的次数
++iteration_;
// 根据日志级别,决定是否打印出所有的channel 的事件
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// 接下来遍历所有的 activeChannels_ 中的元素,去执行它们的
// handleEvent() 实际的执行channel 的事件处理函数
// 这里面可能有 TimerQueue 的channel ,用来处理已经超时的计时器
// 这些计时器,被 TimerQueue 通过 add 添加函数对象的形式添加到了队列中?或是直接执行了。
// 想一下,应该是直接执行了,因为是在本线程中。
// 嗯在for中直接执行了每一个超时的计时器
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
// 这里保存一次 currentActiveChannel_ 的意义是
// 只是在删除额时候判断,删除的是否是这个channel、
// 还有一个没看懂
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
// 执行add 到eventloop 的函数对象。
doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
// 退出。
void EventLoop::quit()
{
quit_ = true;
// 推出的时候,直接wakeup 唤醒 epoll
if (!isInLoopThread())
{
wakeup();
}
}
void EventLoop::runInLoop(const Functor &cb)
{
// 添加函数对象,如果是在本线程中就直接执行,否则就添加到队列中
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(cb);
}
}
// 添加到队列中
void EventLoop::queueInLoop(const Functor &cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
// 如果不是在本队列中,或是在执行添加的函数对象
// 那么本次添加的就已经不能在本次epoll唤醒的时候调用了
// 因此这里直接wakeup。那么epoll 的wait直接被唤醒了
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
size_t EventLoop::queueSize() const
{
MutexLockGuard lock(mutex_);
return pendingFunctors_.size();
}
// 下面三个函数用来添加计时器的
TimerId EventLoop::runAt(const Timestamp &time, const TimerCallback &cb)
{
return timerQueue_->addTimer(cb, time, 0.0);
}
TimerId EventLoop::runAfter(double delay, const TimerCallback &cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, cb);
}
TimerId EventLoop::runEvery(double interval, const TimerCallback &cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(cb, time, interval);
}
// 取消计时器。
void EventLoop::cancel(TimerId timerId)
{
return timerQueue_->cancel(timerId);
}
// channel 调用的updatechannel,被转发了
void EventLoop::updateChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
// 同上
void EventLoop::removeChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
// 这里使用了 eventHandling_ ,如果是正在处理channel 的事件函数,
// 此时又要移除channel,那么要溢出的channel 必须是本channel 或是不在 activechannels中的channel
// 否则报错.这里为什么要这样处理? 一般来说都是本channel移除自己.
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
}
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
return poller_->hasChannel(channel);
}
// 当不是在eventloop 的线程执行 eventloop 的相关操作,就会调用该函数.end
// 打印日志,然后退出
void EventLoop::abortNotInLoopThread()
{
LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
<< " was created in threadId_ = " << threadId_
<< ", current thread id = " << CurrentThread::tid();
}
// 主动唤醒的函数,向eventfd 中写入数据。
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
// 主动唤醒epoll 后,需要处理eventfd 的读事件,因此读出来就行了
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}
// 执行添加到eventloop 的函数对象。
// 这里没有使用拷贝,而是通过交换 vector 的方式。
// 更加效率
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
// 打印所有就绪channel 事件的函数
void EventLoop::printActiveChannels() const
{
for (ChannelList::const_iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
const Channel *ch = *it;
LOG_TRACE << "{" << ch->reventsToString() << "} ";
}
}
2 什么时候需要runinloop()
eventloop所在线程是实际进行IO操作的线程。
虽然,eventloop 被绑定在每一个channel,但是实际上channel 又被关联在各种其他的类中,比如TcpConnection 中,当TcpConnection在别的线程中,需要发送数据的时候,那么会调用send,此时TcpConnection判断运行函数的线程是否是关联的channel绑定的evenloop,如果是,那么直接执行,如果不是,那么runinloop(),通过bind的方式传入。因为send需要传入数据,而在bind中绑定数据即可。
对于这点,后面再添加。
因为涉及到一个什么时候析构的问题。