码迷,mamicode.com
首页 > 其他好文 > 详细

muduo源码-EventLoop.h

时间:2018-03-27 14:37:44      阅读:178      评论:0      收藏:0      [点我收藏+]

标签:add   handle   lock   ror   context   on()   print   构造   llb   

0 设计

EventLoop 类,主要是用来管理一个进程中的channel、计时器、以及epoll 的类。

每个类只有一个,(因为如果有两个,那么一个eventloop 在loop()的时候,另一个eventloop 得不到执行)。

每一个channel或者说是每一个文件描述符都必须属于eventloop,换句话说,每一个文件描述符属于一个线程。(这是必然的,因为eventloop在代表了一个线程)

eventloop 类的主要工作,就是wait epoll,然后执行channel 的事件处理函数。

 

1 源码

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <vector>

#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

#include <muduo/base/Mutex.h>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>

namespace muduo
{
namespace net
{

class Channel;
class Poller;
class TimerQueue;

// 事件处理类
class EventLoop : boost::noncopyable
{
  public:
    typedef boost::function<void()> Functor;

    EventLoop();
    ~EventLoop();

    // 里面一个while循环,直接开始工作.
    void loop();

    // 别的线程调用eventloop 的quit,设置 loop() 中循环的判断条件
    // 从而在下一次循环的时候停止
    void quit();

    // 返回上一次 epoll 的返回时间
    Timestamp pollReturnTime() const { return pollReturnTime_; }

    int64_t iteration() const { return iteration_; }

    // 添加函数对象到eventloop线程中执行。
    void runInLoop(const Functor &cb);

    // 当调用函数添加函数对象到eventloop中执行的时候
    // 需要判断是否是在eventloop 线程中.如果是在当前eventloop 的线程中
    // 那么直接执行,否则,就添加到队列中,然后唤醒epoll
    void queueInLoop(const Functor &cb);

    // 返回当前 等待执行函数对象的队列长度
    size_t queueSize() const;

    // 下面三个函数,是用来添加计时器对象的
    TimerId runAt(const Timestamp &time, const TimerCallback &cb);

    TimerId runAfter(double delay, const TimerCallback &cb);

    TimerId runEvery(double interval, const TimerCallback &cb);

    void cancel(TimerId timerId);

    // wakeup函数用来显示的唤醒 epoll ,主要是在 eventloop执行了quit
    // 还有添加了函数对象到队列中的时候 主动的唤醒 epoll
    // 能唤醒的原因在于,eventloop 对象持有一个 eventfd,注册在了epoll中
    // 然后需要唤醒的时候,直接 eventfd 中写数据即可.
    void wakeup();

    // 这两个函数被channel 的update 和remove 调用
    // 然后,这俩函数由去调用epoll 的对应接口
    void updateChannel(Channel *channel);
    void removeChannel(Channel *channel);

    // 用来判断一个 epoll 是否持有 这个 channel
    // 调用的是 epoll 的对象
    bool hasChannel(Channel *channel);

    // 用来判断 绑定在eventloop 中的channel 的运行是否在 本eventloop 线程中
    // 或是其他操作,需要属于本eventloop 线程
    void assertInLoopThread()
    {
        if (!isInLoopThread())
        {
            abortNotInLoopThread();
        }
    }

    // 被上面的函数调用,就是简单的判断下初始化eventloop 的线程是不是当前运行函数的线程
    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

    // eventHandling() 用来判断是否是在已就绪channel的事件处理函数。
    // 因为channel的处理只能是在 eventloop 线程中,也就是说在 loop 处理的时候
    // 如果这个时候 remove channel 那么要删除的channel 就是当前正在处理channel
    bool eventHandling() const { return eventHandling_; }

    // 没被用到,貌似
    void setContext(const boost::any &context)
    {
        context_ = context;
    }

    const boost::any &getContext() const
    {
        return context_;
    }

    boost::any *getMutableContext()
    {
        return &context_;
    }

    static EventLoop *getEventLoopOfCurrentThread();

  private:
    void abortNotInLoopThread();
    // 这个函数是在,eventloop 主动唤醒 epoll 的时候处理eventfd 读事件的函数
    void handleRead(); // waked up
    // 执行队列中的函数对象
    void doPendingFunctors();
    // 用来打印本次已就绪channel 发生的对应的时间,也就是去调用了channel 的tostring
    void printActiveChannels() const; // DEBUG

    typedef std::vector<Channel *> ChannelList;

    bool looping_;
    bool quit_;
    bool eventHandling_;
    bool callingPendingFunctors_;
    // iteration_ 是poller 被唤醒的次数
    int64_t iteration_;
    // 初始化时候,本eventloop 所在的线程
    const pid_t threadId_;
    // 上一次epoll被唤醒的时间戳
    Timestamp pollReturnTime_;
    // 具体的epoll ,实现上可是用poll的,因此是一个继承体系
    boost::scoped_ptr<Poller> poller_;
    // 计时器队列.
    boost::scoped_ptr<TimerQueue> timerQueue_;
    // eventfd,是eventloop主动唤醒epoll的fd
    int wakeupFd_;
    // 是分装eventfd的channel
    boost::scoped_ptr<Channel> wakeupChannel_;
    boost::any context_;

    // 这个容器被epoll 填充。装了所有已就绪的channel
    ChannelList activeChannels_;
    // 当前正在处理的 channel
    Channel *currentActiveChannel_;

    mutable MutexLock mutex_;
    // 添加到eventloop 中需要执行的函数对象,就在这里。
    std::vector<Functor> pendingFunctors_;
};
}
}
#endif // MUDUO_NET_EVENTLOOP_H

 

实现

#include <muduo/net/EventLoop.h>

#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>
#include <muduo/net/SocketsOps.h>
#include <muduo/net/TimerQueue.h>

#include <boost/bind.hpp>

#include <signal.h>
#include <sys/eventfd.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

// 当定义一个命名空间时,可以忽略这个命名空间的名称:
//  编译器在内部会为这个命名空间生成一个唯一的名字,而且还会为这个匿名的命名空间生成一条using指令。
// 命名空间都是具有external 连接属性的,只是匿名的命名空间产生的__UNIQUE_NAME__在别的文件中无法得到,这个唯一的名字是不可见的.
namespace
{
// 使用了 __thread 每个线程保存自己的eventloop指针
__thread EventLoop *t_loopInThisThread = 0;

// epoll 的超时时间
const int kPollTimeMs = 10000;

// 是eventloop 的eventfd ,用来唤醒 epoll
// 唤醒的时机在前面说了
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_SYSERR << "Failed in eventfd";
        // 如果创建失败,那么直接结束程序,没有必要执行下去了
        abort();
    }
    return evtfd;
}

// 这是两条预处理执行令,表示发生 -Wold-style-cast 的时候忽略
// 然后到下一条.
#pragma GCC diagnostic ignored "-Wold-style-cast"

// 这个对象只是用一次。
// 是为了防止管道破裂的。因此在构造的时候,直接将SIGPIPE设置为忽略
// 并且初始化的时候就执行了
class IgnoreSigPipe
{
  public:
    IgnoreSigPipe()
    {
        ::signal(SIGPIPE, SIG_IGN);
        // LOG_TRACE << "Ignore SIGPIPE";
    }
};
// 到这里的时候开启.
#pragma GCC diagnostic error "-Wold-style-cast"

IgnoreSigPipe initObj;
}

// 返回本线程的 eventloop 对象
EventLoop *EventLoop::getEventLoopOfCurrentThread()
{
    return t_loopInThisThread;
}

EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      eventHandling_(false),
      callingPendingFunctors_(false),
      iteration_(0),
      threadId_(CurrentThread::tid()),
      poller_(Poller::newDefaultPoller(this)), // 使用一个工厂模式返回poll
      timerQueue_(new TimerQueue(this)),       // 和计时器有关的
      wakeupFd_(createEventfd()),              // 创建eventloop 主动唤醒 epoll 的channel
      wakeupChannel_(new Channel(this, wakeupFd_)),
      currentActiveChannel_(NULL)
{
    LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
    //   如果当前线程已经有一个eventloop 了那么就退出啊。 LOG_FATAL 应该会直接停了。
    if (t_loopInThisThread)
    {
        LOG_FATAL << "Another EventLoop " << t_loopInThisThread
                  << " exists in this thread " << threadId_;
    }
    else
    {
        t_loopInThisThread = this;
    }
    wakeupChannel_->setReadCallback(
        boost::bind(&EventLoop::handleRead, this));
    // we are always reading the wakeupfd
    wakeupChannel_->enableReading();
}

// 析构函数
EventLoop::~EventLoop()
{
    LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
              << " destructs in thread " << CurrentThread::tid();
    // 先将 wakechannel 注销掉,然后直接关闭就行了。
    // 因为也没什么需要回收的资源,都是使用指针指针.
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = NULL;
}

void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false; 
    LOG_TRACE << "EventLoop " << this << " start looping";

    // 使用 quit 判断是否应该继续循环
    while (!quit_)
    {
        // activeChannels_ 用来填充就绪channel的
        activeChannels_.clear();
        // 传入了 activeChannels_ 的地址,由poll 填充已就绪的channel 到activeChannels_中
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        // iteration_ 是poller 被唤醒的次数
        ++iteration_;

        // 根据日志级别,决定是否打印出所有的channel 的事件
        if (Logger::logLevel() <= Logger::TRACE)
        {
            printActiveChannels();
        }

        // 接下来遍历所有的 activeChannels_ 中的元素,去执行它们的
        // handleEvent() 实际的执行channel 的事件处理函数
        // 这里面可能有 TimerQueue 的channel ,用来处理已经超时的计时器
        // 这些计时器,被 TimerQueue 通过 add 添加函数对象的形式添加到了队列中?或是直接执行了。
        // 想一下,应该是直接执行了,因为是在本线程中。
        // 嗯在for中直接执行了每一个超时的计时器
        eventHandling_ = true;
        for (ChannelList::iterator it = activeChannels_.begin();
             it != activeChannels_.end(); ++it)
        {
            // 这里保存一次  currentActiveChannel_ 的意义是
            // 只是在删除额时候判断,删除的是否是这个channel、
            // 还有一个没看懂
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;

        // 执行add 到eventloop 的函数对象。
        doPendingFunctors();
    }

    LOG_TRACE << "EventLoop " << this << " stop looping";
    looping_ = false;
}

// 退出。
void EventLoop::quit()
{
    quit_ = true;

    // 推出的时候,直接wakeup 唤醒 epoll
    if (!isInLoopThread())
    {
        wakeup();
    }
}

void EventLoop::runInLoop(const Functor &cb)
{
    // 添加函数对象,如果是在本线程中就直接执行,否则就添加到队列中
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(cb);
    }
}

// 添加到队列中
void EventLoop::queueInLoop(const Functor &cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(cb);
    }

    // 如果不是在本队列中,或是在执行添加的函数对象
    // 那么本次添加的就已经不能在本次epoll唤醒的时候调用了
    // 因此这里直接wakeup。那么epoll 的wait直接被唤醒了
    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}

size_t EventLoop::queueSize() const
{
    MutexLockGuard lock(mutex_);
    return pendingFunctors_.size();
}
// 下面三个函数用来添加计时器的
TimerId EventLoop::runAt(const Timestamp &time, const TimerCallback &cb)
{
    return timerQueue_->addTimer(cb, time, 0.0);
}

TimerId EventLoop::runAfter(double delay, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), delay));
    return runAt(time, cb);
}

TimerId EventLoop::runEvery(double interval, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), interval));
    return timerQueue_->addTimer(cb, time, interval);
}

// 取消计时器。
void EventLoop::cancel(TimerId timerId)
{
    return timerQueue_->cancel(timerId);
}

// channel 调用的updatechannel,被转发了
void EventLoop::updateChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    poller_->updateChannel(channel);
}

// 同上
void EventLoop::removeChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    // 这里使用了 eventHandling_ ,如果是正在处理channel 的事件函数,
    // 此时又要移除channel,那么要溢出的channel 必须是本channel 或是不在 activechannels中的channel
    // 否则报错.这里为什么要这样处理? 一般来说都是本channel移除自己.
    if (eventHandling_)
    {
        assert(currentActiveChannel_ == channel ||
               std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
    }
    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    return poller_->hasChannel(channel);
}

// 当不是在eventloop 的线程执行 eventloop 的相关操作,就会调用该函数.end
// 打印日志,然后退出
void EventLoop::abortNotInLoopThread()
{
    LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
              << " was created in threadId_ = " << threadId_
              << ", current thread id = " << CurrentThread::tid();
}

// 主动唤醒的函数,向eventfd 中写入数据。
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
    }
}

// 主动唤醒epoll 后,需要处理eventfd 的读事件,因此读出来就行了
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
    }
}

// 执行添加到eventloop 的函数对象。
// 这里没有使用拷贝,而是通过交换 vector 的方式。
// 更加效率
void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        MutexLockGuard lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }
    callingPendingFunctors_ = false;
}

// 打印所有就绪channel 事件的函数
void EventLoop::printActiveChannels() const
{
    for (ChannelList::const_iterator it = activeChannels_.begin();
         it != activeChannels_.end(); ++it)
    {
        const Channel *ch = *it;
        LOG_TRACE << "{" << ch->reventsToString() << "} ";
    }
}

 

2 什么时候需要runinloop()

eventloop所在线程是实际进行IO操作的线程。

虽然,eventloop 被绑定在每一个channel,但是实际上channel 又被关联在各种其他的类中,比如TcpConnection 中,当TcpConnection在别的线程中,需要发送数据的时候,那么会调用send,此时TcpConnection判断运行函数的线程是否是关联的channel绑定的evenloop,如果是,那么直接执行,如果不是,那么runinloop(),通过bind的方式传入。因为send需要传入数据,而在bind中绑定数据即可。

对于这点,后面再添加。

因为涉及到一个什么时候析构的问题。

muduo源码-EventLoop.h

标签:add   handle   lock   ror   context   on()   print   构造   llb   

原文地址:https://www.cnblogs.com/perfy576/p/8656746.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!