码迷,mamicode.com
首页 > 其他好文 > 详细

muduo Library

时间:2015-08-11 07:04:49      阅读:223      评论:0      收藏:0      [点我收藏+]

标签:

muduo是由陈硕(http://www.cnblogs.com/Solstice)开发的一个Linux多线程网络库,采用了很多新的Linux特性(例如eventfdtimerfd)和GCC内置函数。其主要特点为:

  • 线程安全,支持多核多线程
  • 不考虑可移植性,不跨平台,只支持 Linux,不支持 Windows // 支持Windows有时候代价太大了
  • 在不增加复杂度的前提下可以支持 FreeBSD/Darwin,方便将来用 Mac 作为开发用机,但不为它做性能优化。也就是说 IO multiplexing 使用 poll epoll
  • 主要支持 x86-64,兼顾 IA32
  • 不支持 UDP,只支持 TCP
  • 不支持 IPv6,只支持 IPv4
  • 不考虑广域网应用,只考虑局域网 // 不会存在慢连接,所以即使是阻塞读也不会花去太长时间用在阻塞上面
  • 只支持一种使用模式:non-blocking IO + one event loop per thread,不考虑阻塞 IO
  • API 简单易用,只暴露具体类和标准库里的类,不使用 non-trivial templates,也不使用虚函数 // GP而非OO
  • 只满足常用需求的 90%,不面面俱到,必要的时候以 app 来适应 lib
  • 只做 library,不做成 framework
  • 争取全部代码在 5000 行以内(不含测试)
  • 以上条件都满足时,可以考虑搭配 Google Protocol Buffers RPC // RPC可以简化很多东西
  • Base Class
    • Atomic

      调用了如下GCC提供的原子操作内建函数:

      • __sync_lock_test_and_set
      • __sync_val_compare_and_swap
      • __sync_fetch_and_add
    • BlockingQueue

      线程安全的队列,内部实现为std::deque<T>

    • BoundedBlockingQueue

      BlockingQueue类似,但是内部容器基于boost::circular_buffer<T>

    • Condition

      pthread_cond的封装

    • CountDownLatch

      CountDownLatch,类似发令枪,对condition的再包装,可以保证所有线程同时启动。

    • Exception

      backtrace_symbolsbacktrace的包装类

    • Mutex

      MutexLockpthread_mutex_*的包装类

    • Singleton
      • 依旧是静态成员变量作为单例对象,但是用pthread_once保证多线程访问的唯一性;
      • ::atexit(destroy),在进程退出时销毁之。
    • Thread
      • syscall(SYS_gettid)等同于gettid
      • numCreated_,类静态成员,最后调用__sync_fetch_and_add
      • __thread关键字表示线程本地存储(TSS)
      • boost::functionboost::bind实现类似C#delegate

void*?Thread::startThread(void*?obj)

{

??Thread*?thread?=?static_cast<Thread*>(obj);

??thread->runInThread();//func_的包装,调用了func_

??return?NULL;

}

? ?

void?Thread::runInThread()

{

??tid_?=?CurrentThread::tid();

??muduo::CurrentThread::t_threadName?=?name_.c_str();

??try

??{

????func_();

????muduo::CurrentThread::t_threadName?=?"finished";

??}

?…

}

? ?

typedef?boost::function<void?()>?ThreadFunc;

Thread::Thread(const?ThreadFunc&?func,?const?string&?n):?started_(false),??pthreadId_(0),?tid_(0),

????//func_是实际上要在线程里执行的函数,以boost::function生成了一个函数对象??(functor)

????func_(func),?name_(n)

{

??numCreated_.increment();

}

  • int pthread_atfork(void (*prepare) (void), void (*parent) (void), void (*child) (void) );
    pthread_atfork() 函数声明了在调用 fork() 的线程的上下文中的 fork() 前后调用的 fork() 处理程序。
    ??? fork() 启动前调用 prepare 处理程序。
    ??? 在父进程中返回 fork() 后调用 parent 处理程序。
    ??? 在子进程中返回 fork() 后调用 child 处理程序。
    可以将任何处理程序参数都设置为 NULL。对 pthread_atfork() 进行连续调用的顺序非常重要。例如,prepare 处理程序可能会获取所有需要的互斥。然后,parent child 处理程序可能会释放互斥。获取所有需要的互斥的 prepare 处理程序可确保在对进程执行 fork 之前,所有相关的锁定都由调用 fork 函数的线程持有。此技术可防止子进程中出现死锁。
  • ThreadLocal

    依旧用pthread_get/setspecificOP:为何不用__thread关键字?)。

  • ThreadLocalSingleton

    线程单例模式,单例模板类的instance成员采用__thread关键字修饰,具有TLS属性。

  • ThreadPool

void?ThreadPool::run(const?Task&?task)

{

??//如果没有线程,直接执行task定义的函数

??if?(threads_.empty())

??{

????task();

??}

??else

??{

????MutexLockGuard?lock(mutex_);

????//加入任务队列

????queue_.push_back(task);

????cond_.notify();

??}

}

? ?

ThreadPool::Task?ThreadPool::take()

{

??MutexLockGuard?lock(mutex_);

??//?always?use?a?while-loop,?due?to?spurious?wakeup

??while?(queue_.empty()?&&?running_)

??{

????//如果没有任务,则等待

????cond_.wait();

??}

??Task?task;

??if(!queue_.empty())

??{

????task?=?queue_.front();

????queue_.pop_front();

??}

??return?task;

}

? ?

//此函数就是线程函数

void?ThreadPool::runInThread()

{

??try

??{

????while?(running_)

????{

??????????????//每个线程都从这里获取任务

??????Task?task(take());

??????if?(task)

??????{

??????????????????//执行任务

????????task();

??????}

????}

??}

??…

}

  • Net Classes
    • Buffer

      技术分享

      • Buffervector<char>实现。
      • prependable是用来在多次序列化消息后一次性在其前部写入长度之用的。
      • 分别以readIndexwriteIndex表示可读和可写的缓冲区位置。要写入x字节,则writeIndex += xreadIndex不变,此时可读区域长度为writeIndex - readIndex。如果readIndex == writeIndex,说明无数据供Upper Application读取。
      • makeSpace用于扩展或者重整整个缓冲区,其逻辑如下:

        如果writable < datalen,但是prependable+writeable >= datalen,则将readIndex挪至最前,将prependable+writeable合并得到一个足够大的缓冲区(一般来说,这种情况是由于还有尚未读取的数据,readIndex向后移动位置造成的);如果prependable+writeable < datalen,说明全部可写区域之和也不足,则vertor::resize()扩展缓冲区。

void?makeSpace(size_t?len)

{

????if?(writableBytes()?+?prependableBytes()?<?len?+?kCheapPrepend)

????{

??????//?FIXME:?move?readable?data

??????buffer_.resize(writerIndex_+len);

????}

????else

????{

??????//?move?readable?data?to?the?front,?make?space?inside?buffer

??????assert(kCheapPrepend?<?readerIndex_);

??????size_t?readable?=?readableBytes();

??????std::copy(begin()+readerIndex_,

????????????????begin()+writerIndex_,

????????????????begin()+kCheapPrepend);

??????readerIndex_?=?kCheapPrepend;

??????writerIndex_?=?readerIndex_?+?readable;

??????assert(readable?==?readableBytes());

????}

}

  • Channel

class?Channel?:?boost::noncopyable

{

public:

??typedef?boost::function<void()>?EventCallback;

??typedef?boost::function<void(Timestamp)>?ReadEventCallback;

private:

??EventLoop*?loop_;?//属于哪个reactor

??const?int??fd_;?//关联的FD

??int????????events_;?//关注事件

??int????????revents_;?//ready事件

??bool?eventHandling_;?//当前正在处理事件

??ReadEventCallback?readCallback_;

??EventCallback?writeCallback_;?//如何写数据

??EventCallback?closeCallback_;?//如何关闭链接

??EventCallback?errorCallback_;?//如何处理错误

};

如果loop有事件发生,将触发handleEvent回调:

void?Channel::handleEventWithGuard(Timestamp?receiveTime)

{

??eventHandling_?=?true;

??if?((revents_?&?POLLHUP)?&&?!(revents_?&?POLLIN))

??{

????if?(logHup_)

????{

??????LOG_WARN?<<?"Channel::handle_event()?POLLHUP";

????}

????if?(closeCallback_)?closeCallback_();

??}

? ?

??if?(revents_?&?POLLNVAL)

??{

????LOG_WARN?<<?"Channel::handle_event()?POLLNVAL";

??}

? ?

??if?(revents_?&?(POLLERR?|?POLLNVAL))

??{

????if?(errorCallback_)?errorCallback_();

??}

??if?(revents_?&?(POLLIN?|?POLLPRI?|?POLLRDHUP))

??{

????if?(readCallback_)?readCallback_(receiveTime);

??}

??if?(revents_?&?POLLOUT)

??{

????if?(writeCallback_)?writeCallback_();

??}

??eventHandling_?=?false;

}

  • EventLoop

class?EventLoop?:?boost::noncopyable

{

public:

??void?loop();

??void?quit();

? ?

??///?Runs?callback?immediately?in?the?loop?thread.

??///?It?wakes?up?the?loop,?and?run?the?cb.

??///?If?in?the?same?loop?thread,?cb?is?run?within?the?function.

??///?Safe?to?call?from?other?threads.

??void?runInLoop(const?Functor&?cb);

? ?

??///?Queues?callback?in?the?loop?thread.

??///?Runs?after?finish?pooling.

??///?Safe?to?call?from?other?threads.

??void?queueInLoop(const?Functor&?cb);

? ?

??///?Runs?callback?at?‘time‘.

??///?Safe?to?call?from?other?threads.

??TimerId?runAt(const?Timestamp&?time,?const?TimerCallback&?cb);

? ?

??///?Runs?callback?after?@c?delay?seconds.

??///?Safe?to?call?from?other?threads.

??TimerId?runAfter(double?delay,?const?TimerCallback&?cb);

? ?

??///?Runs?callback?every?@c?interval?seconds.

??///?Safe?to?call?from?other?threads.

??TimerId?runEvery(double?interval,?const?TimerCallback&?cb);

? ?

??///?Cancels?the?timer.

??///?Safe?to?call?from?other?threads.

??void?cancel(TimerId?timerId);

? ?

??//?internal?usage

??void?wakeup();

??void?updateChannel(Channel*?channel);

??void?removeChannel(Channel*?channel);

??bool?isInLoopThread()?const?{?return?threadId_?==?CurrentThread::tid();?}

private:

??void?handleRead();??//?waked?up

??void?doPendingFunctors();

??typedef?std::vector<Channel*>?ChannelList;

? ?

??bool?looping_;?/*?atomic?*/

??bool?quit_;?/*?atomic?*/

??bool?eventHandling_;?/*?atomic?*/

??bool?callingPendingFunctors_;?/*?atomic?*/

??const?pid_t?threadId_;

??Timestamp?pollReturnTime_;

??boost::scoped_ptr<Poller>?poller_;

??boost::scoped_ptr<TimerQueue>?timerQueue_;

??int?wakeupFd_;

??//?unlike?in?TimerQueue,?which?is?an?internal?class,

??//?we?don‘t?expose?Channel?to?client.

??boost::scoped_ptr<Channel>?wakeupChannel_;

??ChannelList?activeChannels_;

??Channel*?currentActiveChannel_;

??MutexLock?mutex_;

??std::vector<Functor>?pendingFunctors_;?//?@BuardedBy?mutex_

};

? ?

__thread?EventLoop*?t_loopInThisThread?=?0;

t_loopInThisThread被定义为per thread的全局变量,并在EventLoop的构造函数中初始化:

? ?

epoll默认工作方式是LT

? ?

技术分享

从这个muduo的工作模型来看,可以采用an IO thread per fd的形式处理各connection的读//encode/decode等工作,计算线程池中的线程在一个eventfd上监听,激活后就将connection作为参数与decoded packet一起传递到计算线程池中,并在计算完成后将结果直接写入IO threadfd。并采用round-robin的方式选出下一个计算线程。

不同的解决方案:实际上这些线程是可以归并的,仅仅取决于任务的性质:IO密集型或是计算密集型。限制仅仅在于:出于避免过多thread context切换造成性能下降和资源对thread数量的约束,不能采用a thread per fd的模型,而是将fd分为若干组比较均衡的分配到IO线程中。

? ?

EventLoop的跨线程激活:

EventLoop::EventLoop()

??:?wakeupFd_(createEventfd()),

????wakeupChannel_(new?Channel(this,?wakeupFd_))

{

??wakeupChannel_->setReadCallback(

??????boost::bind(&EventLoop::handleRead,?this));?//?绑定到handleRead上面了

??//?we?are?always?reading?the?wakeupfd

??wakeupChannel_->enableReading();

}

跨线程激活的函数是wakeUp

void?EventLoop::wakeup()

{

??uint64_t?one?=?1;

??ssize_t?n?=?sockets::write(wakeupFd_,?&one,?sizeof?one);?//?类似于管道直接写

}

一旦wakeup完成之后那么wakeUpFd_就是可读的,这样EventLoop就会被通知到并且立刻跳出epoll_wait开始处理。当然我们需要将这个wakeupFd_ 上面数据读出来,不然的话下一次又会被通知到,读取函数就是handleRead

void?EventLoop::handleRead()

{

??uint64_t?one?=?1;

??ssize_t?n?=?sockets::read(wakeupFd_,?&one,?sizeof?one);

}

runInLoopqueueInLoop就是跨线程任务。

void?EventLoop::runInLoop(const?Functor&?cb){

?????//如果这个函数在自己的线程调用,那么就可以立即执行

??if?(isInLoopThread()){?

????cb();

??}else{

??????????//如果是其他线程调用,那么加入到pendingFunctors里面去

?????queueInLoop(cb);

??????????//并且通知这个线程,有任务到来

?????wakeup();?

??}

}

? ?

void?EventLoop::queueInLoop(const?Functor&?cb){

??{

??MutexLockGuard?lock(mutex_);

??pendingFunctors_.push_back(cb);

??}

??/*被排上队之后如果是在自己线程并且正在执行pendingFunctors的话,那么就可以激活

??否则下一轮完全可以被排上,所以没有必要激活*/

??if?(isInLoopThread()?&&?callingPendingFunctors_){

????wakeup();?

??}

}

  • muduo的工作方式:
    • 一个主线程,处理IO相关事宜。loop放在主线程中,其pendingFunctors_成员是IO/timer相关任务的集合,包括:
      • Connector::startInLoop
      • TcpConnection::setCloseCallback
      • TcpConnection::sendInLoop
      • TcpConnection::shutdownInLoop
      • TcpConnection::connectDestroyed
      • Acceptor::listen
      • TcpConnection::connectEstablished
      • TcpServer::removeConnectionInLoop
      • TimerQueue::addTimerInLoop
      • TimerQueue::cancelInLoop
    • ThreadPool用于处理计算任务,调用栈为:ThreadPool::run(const Task& task) => queue_.push_back(task),然后在ThreadPool::runInThread会作为每个线程的runner,不停的去take()任务并执行。如果需要输出,根据对应的connection,将结果提交至EventLoop的发送队列。

      ? ?

  • 定时器

    调用栈:

    addTimer(const TimerCallback& cb,Timestamp when, double interval) => addTimerInLoop(Timer* timer) =>insert(timer)中:

typedef?std::pair<Timestamp,?Timer*>?Entry;

typedef?std::set<Entry>?TimerList;

bool?earliestChanged?=?false;

Timestamp?when?=?timer->expiration();

TimerList::iterator?it?=?timers_.begin();

if?(it?==?timers_.end()?||?when?<?it->first)

{

??earliestChanged?=?true;

}

这里的微妙之处在于:如果是第一个定时器,begin()=end(),那么earliestChanged = true;会触发resetTimerfd

void?TimerQueue::addTimerInLoop(Timer*?timer)

{

??loop_->assertInLoopThread();

??bool?earliestChanged?=?insert(timer);

? ?

??if?(earliestChanged)

??{

????//调用::timerfd_settime(timerfd,?0,?&newValue,?&oldValue)启动定时器

????resetTimerfd(timerfd_,?timer->expiration());

??}

}

当定时器触发后:

void?TimerQueue::handleRead()

{

??loop_->assertInLoopThread();

??Timestamp?now(Timestamp::now());

??readTimerfd(timerfd_,?now);

??//我们可以知道有哪些计时器超时

??std::vector<Entry>?expired?=?getExpired(now);?

??//?safe?to?callback?outside?critical?section

??for?(std::vector<Entry>::iterator?it?=?expired.begin();

??????it?!=?expired.end();?++it)

??{

?????//对于这些超时的Timer,执行run()函数,对应也就是我们一开始注册的回调函数

?????it->second->run();?

??}

??reset(expired,?now);

}

  • TcpConnection Class

    TcpConnection完成的工作就是当TCP连接建立之后处理socket的读写以及关闭。同样我们看看TcpConnection的结构

class?TcpConnection?:?boost::noncopyable,?public?boost::enable_shared_from_this<TcpConnection>

{

??public:

????///?Constructs?a?TcpConnection?with?a?connected?sockfd

????///

????///?User?should?not?create?this?object.

????TcpConnection(EventLoop*?loop,?//?建立连接需要一个Reactor

??????????????????const?string&?name,?//?连接名称

??????????????????int?sockfd,?//?连接fd

??????????????????const?InetAddress&?localAddr,?//?本地IP@

??????????????????const?InetAddress&?peerAddr);?//对端IP@

????//?called?when?TcpServer?accepts?a?new?connection

????void?connectEstablished();???//?should?be?called?only?once

????//?called?when?TcpServer?has?removed?me?from?its?map

????void?connectDestroyed();??//?should?be?called?only?once

??private:

????enum?StateE?{?kDisconnected,?kConnecting,?kConnected,?kDisconnecting?};

????void?sendInLoop(const?void*?message,?size_t?len);?//?发送消息

????void?setState(StateE?s)?{?state_?=?s;?}

? ?

????EventLoop*?loop_;

????string?name_;

????StateE?state_;??//?FIXME:?use?atomic?variable

????//?we?don‘t?expose?those?classes?to?client.

????boost::scoped_ptr<Socket>?socket_;?//?socket.

????boost::scoped_ptr<Channel>?channel_;?//?连接channel

????InetAddress?localAddr_;

????InetAddress?peerAddr_;

????ConnectionCallback?connectionCallback_;?//?连接回调,这个触发包括在连接建立和断开都会触发

????MessageCallback?messageCallback_;?//?有数据可读的回调

????WriteCompleteCallback?writeCompleteCallback_;?//?写完毕的回调

????CloseCallback?closeCallback_;?//?连接关闭回调

????Buffer?inputBuffer_;?//?数据读取buffer.

????Buffer?outputBuffer_;?//?FIXME:?use?list<Buffer>?as?output?buffer.

????boost::any?context_;?//?上下文环境

????//?FIXME:?creationTime_,?lastReceiveTime_

????//????????bytesReceived_,?bytesSent_

};

首先TcpConnection在初始化的时候会建立好channel。然后一旦TcpClient或者是TcpServer建立连接之后的话,那么调用TcpConnection::connectEstablished。这个函数内部的话就会将channel设置成为可读。一旦可读的话那么TcpConnection内部就会调用handleRead这个动作,内部托管了读取数据这个操作。 读取完毕之后然后交给MessageBack这个回调进行操作。如果需要写的话调用sendInLoop,那么会将message放在outputBuffer里面,并且设置可写。当可写的话TcpConnection内部就托管写,然后写完之后的话会发生writeCompleteCallback这个回调。托管的读写操作都是非阻塞的。如果希望断开的话调用 shutdown。解除这个连接的话那么可以调用TcpConnection::connectDestroyed,内部大致操作就是从reactor移除这个channel

TcpConnection这层并不知道一次需要读取多少个字节,这个是在上层进行消息拆分的。TcpConnection一次最多读取64K字节的内容,然后交给Upper App。后者决定这些内容是否足够,如果不够的话那么直接返回让Reactor继续等待读。 同样写的话内部也是会分多次写。这样就要求reactor内部必须使用水平触发而不是边缘触发。

  • TcpClient Class

    这个类主要包装了TcpConnector的功能。

TcpClient::TcpClient(EventLoop*?loop,

?????????????????????const?InetAddress&?serverAddr,

?????????????????????const?string&?name)

????????:?loop_(CHECK_NOTNULL(loop)),

??????????connector_(new?Connector(loop,?serverAddr)),

??????????name_(name),

??????????connectionCallback_(defaultConnectionCallback),

??????????messageCallback_(defaultMessageCallback),

??????????retry_(false),

??????????connect_(true),

??????????nextConnId_(1)

{

????connector_->setNewConnectionCallback(

????????boost::bind(&TcpClient::newConnection,?this,?_1));

????//?FIXME?setConnectFailedCallback

}

  • TcpServer Class

TcpServer::TcpServer(EventLoop*?loop,

?????????????????????const?InetAddress&?listenAddr,

?????????????????????const?string&?nameArg)

??:?loop_(CHECK_NOTNULL(loop)),

????hostport_(listenAddr.toHostPort()),

????name_(nameArg),

????acceptor_(new?Acceptor(loop,?listenAddr)),

????threadPool_(new?EventLoopThreadPool(loop)),

????connectionCallback_(defaultConnectionCallback),

????messageCallback_(defaultMessageCallback),

????started_(false),

????nextConnId_(1)

{

??acceptor_->setNewConnectionCallback(

??????boost::bind(&TcpServer::newConnection,?this,?_1,?_2));

}

同样是建立好acceptor这个对象然后设置好回调为TcpServer::newConnection,同时在外部设置好TcpConnection的各个回调。然后调用start来启动服务器,start 会调用acceptor::listen这个方法,一旦有连接建立的话那么会调用newConnection。下面是newConnection代码:

void?TcpServer::newConnection(int?sockfd,?const?InetAddress&?peerAddr)

{

????loop_->assertInLoopThread();

????EventLoop*?ioLoop?=?threadPool_->getNextLoop();

????char?buf[32];

????snprintf(buf,?sizeof?buf,?":%s#%d",?hostport_.c_str(),?nextConnId_);

????++nextConnId_;

????string?connName?=?name_?+?buf;

????//?FIXME?poll?with?zero?timeout?to?double?confirm?the?new?connection

????TcpConnectionPtr?conn(

????????new?TcpConnection(ioLoop,?connName,?sockfd,?localAddr,?peerAddr));

????connections_[connName]?=?conn;

????conn->setConnectionCallback(connectionCallback_);

????conn->setMessageCallback(messageCallback_);

????conn->setWriteCompleteCallback(writeCompleteCallback_);

????conn->setCloseCallback(

????????boost::bind(&TcpServer::removeConnection,?this,?_1));?//?FIXME:?unsafe

????ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished,?conn));

}

对于服务端来说连接都被唯一化了然后映射为字符串放在connections_这个容器内部。threadPool_->getNextLoop()可以轮询地将取出每一个线程然后将 TcpConnection::connectEstablished轮询地丢到每个线程里面去完成。存放在connections_是有原因了,每个TcpConnection有唯一一个名字,这样Server 就可以根据TcpConnection来从自己内部移除链接了。在析构函数里面可以遍历connections_内容得到所有建立的连接并且逐一释放。

muduo Library

标签:

原文地址:http://www.cnblogs.com/CodeComposer/p/4719783.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!