class Epoller;
class Event;
class TimerQueue;

/// Single-threaded event loop built from an Epoller, a TimerQueue and an
/// eventfd-backed wakeup Event.
///
/// The loop is bound to the thread that constructs it (_tid). Functors
/// submitted with runInLoop()/queueInLoop() are stored in _functorLists under
/// _mutex and executed by the loop thread.
class EventLoop final
{
public:
    /// Unit of work executed on the loop thread.
    using Functor = std::function<void()>;
    /// Callback invoked when a timer fires.
    using TimerCallback = std::function<void()>;

    EventLoop();
    ~EventLoop();

    // Non-copyable.
    EventLoop(const EventLoop&) = delete;
    EventLoop& operator=(const EventLoop&) = delete;

    /// True when the calling thread is the thread that created this loop.
    bool isInThreadLoop();
    /// Asserts isInThreadLoop().
    void assertInThreadLoop();

    /// Runs the poll / pending-functor cycle until stop() clears the flag.
    void loop();
    /// Requests loop() to return.
    void stop();

    /// Forwarded to the Epoller; loop thread only.
    void updateEvent(Event* event);
    /// Forwarded to the Epoller; loop thread only.
    void removeEvent(Event* event);

    /// Runs cb immediately on the loop thread, otherwise queues it.
    void runInLoop(const Functor& cb);
    /// Appends cb to the pending list to be run by the loop thread.
    void queueInLoop(const Functor& cb);

    /// Registers cb with the TimerQueue as a non-repeating timer request
    /// (addTimer is called with `false` as its last argument).
    void addSingleTimer(const TimerCallback& cb, uint32_t interval);

private:
    void _handleRead();         // drains the wakeup eventfd
    void _wakeup();             // writes to the wakeup eventfd
    void _doPendingFunctors();  // runs everything in _functorLists

    // NOTE: declaration order below is also the initialization order used by
    // the constructor; do not reorder.
    int _wakeupFd;
    std::unique_ptr<Event> _wakeupEvent;
    bool _loop;
    std::unique_ptr<Epoller> _epoller;
    std::unique_ptr<TimerQueue> _timerQueue;
    std::vector<Functor> _functorLists;  // guarded by _mutex
    pid_t _tid;
    Base::Mutex _mutex;
};
说明几点:
namespace { const int loopSeconds = 10; int createEventFd() { int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (fd < 0) { LOG_SYSERR << "eventfd system error"; } return fd; } } EventLoop::EventLoop(): _wakeupFd(createEventFd()), _wakeupEvent(new Event(_wakeupFd, this)), _loop(false), _epoller(new Epoller(this)), _timerQueue(new TimerQueue(this)), _tid(CurrentThread::tid()), _mutex() { assert(_wakeupFd >= 0); assert(!_loop); assertInThreadLoop(); LOG_TRACE << "eventfd fd: " << _wakeupFd; LOG_INFO << "Thread tid: " << _tid << " run this EventLoop: " << this; _wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this)); _wakeupEvent->enableReading(); }
(1)_wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this)); 为_wakeupFd(eventfd)上的读事件设置回调函数_handleRead;该回调仅仅读取_wakeup()写入的计数值,从而清除eventfd的可读状态,不做其他处理;
// Closes the wakeup eventfd if one was successfully created.
EventLoop::~EventLoop()
{
    if (_wakeupFd < 0)
    {
        return;  // createEventFd() failed; there is nothing to close
    }
    ::close(_wakeupFd);
}
// Hands `event` to Epoller::updateEvent. Must be called on the loop thread
// (asserted).
void EventLoop::updateEvent(Event* event)
{
    assertInThreadLoop();
    Epoller& poller = *_epoller;
    poller.updateEvent(event);
}

// Hands `event` to Epoller::removeEvent. Must be called on the loop thread
// (asserted). Per the original author's note this is only meant to be invoked
// from the close-handling path (_handclose).
void EventLoop::removeEvent(Event* event)
{
    assertInThreadLoop();
    Epoller& poller = *_epoller;
    poller.removeEvent(event);
}
void EventLoop::runInLoop(const Functor& cb) //another thread can invoke { if (isInThreadLoop()) { if (cb) cb(); } else { queueInLoop(cb); } } void EventLoop::queueInLoop(const Functor& cb) //another thread can invoke { { MutexLockGuard lock(_mutex); _functorLists.push_back(cb); } if (!isInThreadLoop()) { _wakeup(); } } void EventLoop::_wakeup() //other thread wake up loop thread { uint64_t value = 1; auto s = ::write(_wakeupFd, &value, sizeof(uint64_t)); if (s != sizeof(uint64_t)) LOG_SYSERR << "write system error"; } void EventLoop::_handleRead() { assertInThreadLoop(); uint64_t value; auto s = ::read(_wakeupFd, &value, sizeof(uint64_t)); //when value is non-zero, when write more times, it only read one time by add the read value if (s != sizeof(uint64_t)) LOG_SYSERR << "read system error"; LOG_TRACE << "eventfd read times: " << value; }
(1)用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保证线程安全,runInLoop会间接调用queueInLoop;queueInLoop把Functor放入队列后,当提交者不是IO线程时,会通过_wakeup()向_wakeupFd写入数据(_wakeupEvent负责监听它的可读事件),从而唤醒IO线程,使其及时处理Functor;
void EventLoop::_doPendingFunctors() { assertInThreadLoop(); std::vector<Functor> _functors; { MutexLockGuard lock(_mutex); _functors.swap( _functorLists); } for (auto& cb : _functors) { if (cb) cb(); } }
bool EventLoop::isInThreadLoop() { return _tid == CurrentThread::tid(); } void EventLoop::assertInThreadLoop() { //LOG_TRACE << "Current tid: " << CurrentThread::tid() << " , loop tid: " << _tid; assert(isInThreadLoop()); } void EventLoop::loop() //only invoke by loop thread { assertInThreadLoop(); assert(!_loop); _loop = true; while (_loop) { int seconds = _timerQueue->minUpdateSeconds() - TimeStamp::now().secondsSinceEpoch(); if (seconds <= 0) { seconds = loopSeconds; } _epoller->pollAndHandleEvent(seconds); _doPendingFunctors(); } } void EventLoop::stop() { _loop = false; }说明几点:
(1)loop函数中,使用_epoller->pollAndHandleEvent(seconds)来处理epoll监听的事件,其中seconds为需要阻塞的时间,一般是_timerQueue中最先到期的Timer的到期时间(绝对时间)减去当前的时间;但是如果该差值不大于0(例如_timerQueue中没有Timer),那么seconds取周期性的时间loopSeconds(10秒);每次pollAndHandleEvent返回后,再调用_doPendingFunctors()执行已提交的Functor;
void EventLoop::addSingleTimer(const TimerCallback& cb, uint32_t interval) { _timerQueue->addTimer(cb, interval, false); }
class EventLoop;

/// Fixed-size pool of worker threads, each running its own EventLoop.
///
/// start() spawns _loopNums threads and blocks on _countDownLatch until every
/// thread has published its loop in _loops; getNextLoop() then hands the
/// loops out in round-robin order.
class EventLoopPool final
{
public:
    explicit EventLoopPool(size_t loopNums);
    ~EventLoopPool();

    // Non-copyable.
    EventLoopPool(const EventLoopPool&) = delete;
    EventLoopPool& operator=(const EventLoopPool&) = delete;

    /// Spawns the worker threads and waits until all loops are registered.
    void start();
    /// Stops every loop, joins every thread and clears both containers.
    void stop();

    /// Returns the next loop in round-robin order.
    EventLoop* getNextLoop();

private:
    /// Thread body: creates an EventLoop, registers it and runs it.
    void _run();

    // NOTE: declaration order is also the initialization order.
    size_t _loopNums;
    bool _running;
    size_t _curIndex;
    std::vector<EventLoop*> _loops;  // non-owning; pushed under _mutex by _run()
    std::vector<std::shared_ptr<Base::Thread>> _threads;
    Base::Mutex _mutex;
    Base::CountDownLatch _countDownLatch;
};
说明几点:
// Prepares a pool for `loopNums` loops (must be > 0, asserted). No thread is
// created until start() is called.
EventLoopPool::EventLoopPool(size_t loopNums)
    : _loopNums(loopNums),
      _running(false),
      _curIndex(0),
      _mutex(),
      _countDownLatch(loopNums)
{
    assert(_loopNums > 0);

    // Pre-size both containers to the final number of workers.
    _loops.reserve(loopNums);
    _threads.reserve(loopNums);
}

// Stops the pool if it is still running.
EventLoopPool::~EventLoopPool()
{
    if (!_running)
    {
        return;
    }
    stop();
}
void EventLoopPool::start() { assert(!_running); _running = true; for (size_t i = 0; i < _loopNums; ++i) { std::shared_ptr<Thread> thread(new Thread(std::bind(&EventLoopPool::_run, this))); //impotant _threads.push_back(thread); thread->start(); } LOG_TRACE << "wait , Current tid: " << CurrentThread::tid(); _countDownLatch.wait(); } void EventLoopPool::stop() { assert(_running); _running = false; for (auto& loop : _loops) { loop->stop(); } for (auto& thread : _threads) { thread->join(); } _loops.clear(); _threads.clear(); }
// Returns the next loop in round-robin order, or nullptr when no loop is
// registered (pool not started yet, or already stopped).
//
// The index wraps on the number of loops actually registered rather than on
// the configured _loopNums, so the lookup can never run past the end of
// _loops.
EventLoop* EventLoopPool::getNextLoop()
{
    if (_loops.empty())
    {
        return nullptr;
    }

    _curIndex %= _loops.size();
    EventLoop* loop = _loops[_curIndex];
    ++_curIndex;
    return loop;
}
void EventLoopPool::_run() { EventLoop loop; //一个loop std::function<void()> fun = std::bind(&EventLoop::loop, &loop); { MutexLockGuard lock(_mutex); _loops.push_back(&loop); //放入到loop缓冲池中 } LOG_TRACE << "countDown , Current tid: " << CurrentThread::tid(); _countDownLatch.countDown(); //loop should in the _loops, otherwise getNextLoop will have error if (fun) { while (_running) fun(); } }