码迷,mamicode.com
首页 > 其他好文 > 详细

【网络组件】事件循环

时间:2015-03-31 18:14:35      阅读:140      评论:0      收藏:0      [点我收藏+]

标签:

   本节研究事件循环EventLoop以及EventLoopPool,并给出C++实现;


线程模型

本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:

技术分享



事件循环

(1)每一个IO线程一个EventLoop对象,并拥有它;

(2)在loop循环中,将会执行_epoller->pollAndHandleEvent(seconds)来处理到期的事件,以及会执行_doPendingFunctors()来处理提交给本IO线程的回调函数Functor,这些回调函数不应该阻塞当前IO线程,否则会造成当前IO线程无法及时处理到来的IO事件;


事件循环时序图如下:

技术分享



EventLoop

EventLoop声明

class Epoller;
class Event;
class TimerQueue;

class EventLoop final
{
public:
  EventLoop(const EventLoop&) = delete;
  EventLoop& operator=(const EventLoop&) = delete;

  EventLoop();

  ~EventLoop();
  bool isInThreadLoop();

  void assertInThreadLoop();

  void loop();
  
  void stop();

  void updateEvent(Event* event);

  void removeEvent(Event* event);

  typedef std::function<void()> Functor;
  void runInLoop(const Functor& cb);

  void queueInLoop(const Functor& cb);

  typedef std::function<void()> TimerCallback;
  void addSingleTimer(const TimerCallback& cb, uint32_t interval);

private:
  void _handleRead();
  void _wakeup();
  void _doPendingFunctors();

  int _wakeupFd;

  std::unique_ptr<Event> _wakeupEvent;

  bool _loop;

  std::unique_ptr<Epoller> _epoller;
  std::unique_ptr<TimerQueue> _timerQueue;

  std::vector<Functor> _functorLists;

  pid_t _tid;
  Base::Mutex _mutex;
};
说明几点:

(1)updateEvent,removeEvent来使用epoller来更新或删除相关事件;

(2)_doPendingFunctors为用户提交的需要处理的回调函数Functor;用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,来唤醒当前的IO线程来及时处理Functor;

(3)_loop变量控制loop循环的退出,通过stop()函数,可以让loop循环退出;stop()主要由其他线程调用;

(4)_epoller,_timerQueue,_wakeupEvent都是EventLoop的成员,生命周期由其控制;


构造函数

namespace
{
const int loopSeconds = 10;

int createEventFd()
{
  int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (fd < 0)
    {
      LOG_SYSERR << "eventfd system error";
    }

  return fd;
}

}

EventLoop::EventLoop():
    _wakeupFd(createEventFd()),
    _wakeupEvent(new Event(_wakeupFd, this)),
    _loop(false),
    _epoller(new Epoller(this)),
    _timerQueue(new TimerQueue(this)),
    _tid(CurrentThread::tid()),
    _mutex()
{
  assert(_wakeupFd >= 0);
  assert(!_loop);
  assertInThreadLoop();
  
  LOG_TRACE << "eventfd fd: " << _wakeupFd;
  LOG_INFO << "Thread tid: " << _tid << " run this EventLoop: " << this;
  
  _wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this));
  _wakeupEvent->enableReading();
}

说明几点:

(1)_wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this));设置读事件的处理回调函数,仅仅会读取_wakeup()写入的数值;

(2)_wakeupEvent->enableReading()会将_wakeupFd加入到本epoll的监听事件中去;

(3)assertInThreadLoop();保证执行的线程为拥有EventLoop的IO线程;


析构函数

EventLoop::~EventLoop()
{
  if (_wakeupFd >= 0)
    ::close(_wakeupFd);
}

Event操作

void EventLoop::updateEvent(Event* event)
{
  assertInThreadLoop();
  _epoller->updateEvent(event);
}

void EventLoop::removeEvent(Event* event)  //only invoke by _handclose
{
  assertInThreadLoop();
  _epoller->removeEvent(event);
}


提交Functor

void EventLoop::runInLoop(const Functor& cb)    //another thread can invoke
{
  if (isInThreadLoop())
    {
      if (cb)
        cb();
    }
  else
    {
      queueInLoop(cb);
    }
}

void EventLoop::queueInLoop(const Functor& cb)  //another thread can invoke
{
  {
    MutexLockGuard lock(_mutex);
    _functorLists.push_back(cb);
  }

  if (!isInThreadLoop())
    {
      _wakeup();
    }
}

void EventLoop::_wakeup()         //other thread wake up loop thread
{ 
   uint64_t value = 1;
   auto s = ::write(_wakeupFd, &value, sizeof(uint64_t));
   
   if (s != sizeof(uint64_t))
      LOG_SYSERR << "write system error";
}

void EventLoop::_handleRead()
{
  assertInThreadLoop();
  
  uint64_t value;
  auto s = ::read(_wakeupFd, &value, sizeof(uint64_t));   
  //when value is non-zero, when write more times, it only read one time by add the read value
  
 if (s != sizeof(uint64_t))
     LOG_SYSERR << "read system error"; 
     
  LOG_TRACE << "eventfd read times: " << value;
}

说明几点:

(1)用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,若不是当前IO线程的提交,将使用 _wakeup()来唤醒当前的IO线程来,及时处理Functor;

(2)_handleRead()为_wakeupEvent读事件的处理回调函数;

(3)queueInLoop要保证线程安全,因此加入到_functorLists,使用互斥量来保护临界区;


执行Functor

void EventLoop::_doPendingFunctors()
{
  assertInThreadLoop();
  
  std::vector<Functor> _functors;
  {
    MutexLockGuard lock(_mutex);
    _functors.swap( _functorLists);
  }

  for (auto& cb : _functors)
    {
      if (cb)
        cb();
    }
}

说明几点:

(1)使用_functors栈对象来和当前的_functorLists交换,减小操作_functorLists的时间,这样不会过多的阻塞其他线程向_functorLists提交Functor回调


loop函数

bool EventLoop::isInThreadLoop()
{
  return _tid == CurrentThread::tid();
}

void EventLoop::assertInThreadLoop()
{
  //LOG_TRACE << "Current tid: " << CurrentThread::tid() << " , loop tid: " << _tid;
  assert(isInThreadLoop());
}

void EventLoop::loop()    //only invoke by loop thread 
{
  assertInThreadLoop();
  
  assert(!_loop);
  _loop = true;
  while (_loop)
    {
      int seconds = _timerQueue->minUpdateSeconds() - TimeStamp::now().secondsSinceEpoch();
      if (seconds <= 0) {
        seconds = loopSeconds;
      }
      
      _epoller->pollAndHandleEvent(seconds);

      _doPendingFunctors();
    }
}

void EventLoop::stop()
{
  _loop = false;
}
说明几点:

(1)loop函数中,使用 _epoller->pollAndHandleEvent(seconds)来处理epoll监听的事件,使用 _epoller->pollAndHandleEvent(seconds),其中seconds为需要阻塞的时间,一般是_timerQueue到期的第一个Timer的时间(绝对时间)减去当前的时间,但是如果_timerQueue没有Timer,那么seconds为周期性的时间loopSeconds

(2)_doPendingFunctors()为执行用户提交的Functor;


提交非周期性定时函数

void EventLoop::addSingleTimer(const TimerCallback& cb, uint32_t interval)
{
  _timerQueue->addTimer(cb, interval, false);
}


EventLoopPool

每一个IO线程都拥有一个EventLoop对象,当主线程派发socket连接时,将使用轮转法从EventLoopPool获得某IO线程的EventLoop对象,该socket连接后面事件的监听将会由该IO线程负责;


EventLoopPool声明

class EventLoop;

class EventLoopPool final
{
public:
  EventLoopPool(const EventLoopPool&) = delete;
  EventLoopPool& operator=(const EventLoopPool&) = delete;

  explicit EventLoopPool(size_t loopNums);

  ~EventLoopPool();

  void start();

  void stop();

  EventLoop* getNextLoop();

private:
  void _run();

  size_t _loopNums;
  bool _running;

  size_t _curIndex;

  std::vector<EventLoop*> _loops;
  std::vector<std::shared_ptr<Base::Thread>> _threads;

  Base::Mutex _mutex;
  Base::CountDownLatch _countDownLatch;
};
说明几点:

(1)master线程调用getNextLoop()可按照轮转法来获得EventLoop对象;

(2)CountDownLatch是倒计时,当所有的IO线程初始化好EventLoop对象后,master线程才会继续执行,防止master线程已经需要派发连接socket,而getNextLoop()时,对应的IO线程尚初始化好EventLoop对象(尚未放入_loops)而造成错误;


构造与析构函数

EventLoopPool::EventLoopPool(size_t loopNums) :
    _loopNums(loopNums),
    _running(false),
    _curIndex(0),
    _mutex(),
    _countDownLatch(loopNums)
{
  assert(_loopNums > 0);
  _loops.reserve(loopNums);
  _threads.reserve(loopNums);
}

EventLoopPool::~EventLoopPool()
{
  if (_running)
    stop();
}

启动和停止IO线程

void EventLoopPool::start()
{
  assert(!_running);

  _running = true;
  for (size_t i = 0; i < _loopNums; ++i)
    {
      std::shared_ptr<Thread> thread(new Thread(std::bind(&EventLoopPool::_run, this)));    //impotant
      _threads.push_back(thread);

      thread->start();
    }

  LOG_TRACE << "wait , Current tid: " << CurrentThread::tid();
  _countDownLatch.wait();
}

void  EventLoopPool::stop()
{
  assert(_running);
  _running = false;

for (auto& loop : _loops)
    {
      loop->stop();
    }
    
for (auto& thread : _threads)
    {
      thread->join();
    }

  _loops.clear();
  _threads.clear();
}

说明几点:

(1)在master线程start后,产生多个IO线程后,将会执行_countDownLatch.wait()来等待所有的IO线程初始化好EventLoop对象,各个IO线程将会调用_countDownLatch.countDown(); 

(2)在stop函数中,首先会停止各个EventLoop继续loop,此后这些线程将会退出,此时使用thread->join()来依次等待各个IO线程结束;


轮转法派发EventLoop

EventLoop* EventLoopPool::getNextLoop()
{
  _curIndex %= _loopNums;

  EventLoop* loop = _loops[_curIndex];
  ++_curIndex;

  return loop;
}


生成EventLoop对象

void EventLoopPool::_run()
{
  EventLoop loop;     //一个loop
  std::function<void()> fun = std::bind(&EventLoop::loop, &loop);
  {
    MutexLockGuard lock(_mutex);
    _loops.push_back(&loop);        //放入到loop缓冲池中
  }

  LOG_TRACE << "countDown , Current tid: " << CurrentThread::tid();
  _countDownLatch.countDown();    //loop should in the _loops, otherwise getNextLoop will have error

  if (fun) {
    while (_running)
       fun();
  }
}

说明几点:

(1)EventLoop为栈对象,当加入到_loops后说明EventLoop已经完整初始化,此时调用_countDownLatch.countDown(),倒计时计数器将会减1;


【网络组件】事件循环

标签:

原文地址:http://blog.csdn.net/skyuppour/article/details/44781029

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!