muduo is a Linux multithreaded network library written by Chen Shuo (http://www.cnblogs.com/Solstice). It makes use of several newer Linux facilities (e.g. eventfd, timerfd) and GCC builtin functions. Its main building blocks are walked through below, class by class.
AtomicIntegerT
An atomic integer template. It relies on GCC's atomic operation builtins (the __sync_* intrinsic family):
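A minimal sketch built on three of those intrinsics; __sync_val_compare_and_swap, __sync_fetch_and_add and __sync_lock_test_and_set are real GCC builtins, while the class shape below mirrors the interface the description implies and is reconstructed rather than copied from muduo's Atomic.h:

// Sketch of an atomic integer on top of GCC __sync builtins.
template<typename T>
class AtomicIntegerT
{
 public:
  AtomicIntegerT() : value_(0) {}
  // a CAS with equal old/new values acts as an atomic load
  T get() { return __sync_val_compare_and_swap(&value_, T(0), T(0)); }
  // fetch-and-add: returns the value before the addition
  T getAndAdd(T x) { return __sync_fetch_and_add(&value_, x); }
  T addAndGet(T x) { return getAndAdd(x) + x; }
  T incrementAndGet() { return addAndGet(1); }
  void increment() { incrementAndGet(); }
  // atomically replace the value, returning the old one
  T getAndSet(T newValue) { return __sync_lock_test_and_set(&value_, newValue); }
 private:
  volatile T value_;
};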
BlockingQueue
A thread-safe queue, implemented internally on std::deque<T>.
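A minimal sketch of the idea, using raw pthread primitives in place of muduo's MutexLock/Condition wrappers (the class below is illustrative, not muduo's code):

#include <deque>
#include <pthread.h>

template<typename T>
class BlockingQueue
{
 public:
  BlockingQueue()
  {
    pthread_mutex_init(&mutex_, NULL);
    pthread_cond_init(&notEmpty_, NULL);
  }
  ~BlockingQueue()
  {
    pthread_mutex_destroy(&mutex_);
    pthread_cond_destroy(&notEmpty_);
  }
  void put(const T& x)
  {
    pthread_mutex_lock(&mutex_);
    queue_.push_back(x);
    pthread_cond_signal(&notEmpty_);   // wake one waiting consumer
    pthread_mutex_unlock(&mutex_);
  }
  T take()
  {
    pthread_mutex_lock(&mutex_);
    while (queue_.empty())             // loop guards against spurious wakeups
      pthread_cond_wait(&notEmpty_, &mutex_);
    T front(queue_.front());
    queue_.pop_front();
    pthread_mutex_unlock(&mutex_);
    return front;
  }
 private:
  pthread_mutex_t mutex_;
  pthread_cond_t notEmpty_;
  std::deque<T> queue_;
};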
BoundedBlockingQueue
Similar to BlockingQueue, but the internal container is a boost::circular_buffer<T>, giving the queue a fixed capacity.
Condition
A wrapper over pthread_cond_*.
CountDownLatch
Like a starting gun: a further wrapper over Condition that can hold a group of threads back and release them all at once. A sketch follows.
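A pthread-based sketch of the pattern, illustrative rather than muduo's exact class: the starter thread constructs the latch with a count of 1, every worker calls wait() before doing real work, and a single countDown() releases them all simultaneously.

#include <pthread.h>

class CountDownLatch
{
 public:
  explicit CountDownLatch(int count) : count_(count)
  {
    pthread_mutex_init(&mutex_, NULL);
    pthread_cond_init(&cond_, NULL);
  }
  void wait()            // block until the count reaches zero
  {
    pthread_mutex_lock(&mutex_);
    while (count_ > 0)
      pthread_cond_wait(&cond_, &mutex_);
    pthread_mutex_unlock(&mutex_);
  }
  void countDown()       // decrement; broadcast when it hits zero
  {
    pthread_mutex_lock(&mutex_);
    if (--count_ == 0)
      pthread_cond_broadcast(&cond_);
    pthread_mutex_unlock(&mutex_);
  }
 private:
  pthread_mutex_t mutex_;
  pthread_cond_t cond_;
  int count_;
};

// Usage: CountDownLatch latch(1); each worker calls latch.wait() first,
// and the starter calls latch.countDown() to release them together.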
Exception
A wrapper class around backtrace() and backtrace_symbols(), used to capture the call stack.
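The two libc calls behind it, in a minimal sketch (fillStackTrace is an illustrative name; an exception class can record the result in its constructor):

#include <execinfo.h>  // backtrace, backtrace_symbols
#include <stdlib.h>    // free
#include <string>

// Capture the current call stack as one string.
std::string fillStackTrace()
{
  std::string stack;
  void* frames[200];
  int n = ::backtrace(frames, 200);                  // raw return addresses
  char** symbols = ::backtrace_symbols(frames, n);   // printable frame strings
  if (symbols)
  {
    for (int i = 0; i < n; ++i)
    {
      stack.append(symbols[i]);
      stack.push_back('\n');
    }
    free(symbols);   // backtrace_symbols mallocs the whole array
  }
  return stack;
}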
MutexLock: a wrapper class over pthread_mutex_*.
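Its RAII companion, MutexLockGuard, appears throughout the code below; a sketch of the idea:

// Lock in the constructor, unlock in the destructor, so every scope
// exit (including via exception) releases the mutex.
class MutexLockGuard
{
 public:
  explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex)
  { mutex_.lock(); }
  ~MutexLockGuard()
  { mutex_.unlock(); }
 private:
  MutexLock& mutex_;
};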
Thread
void* Thread::startThread(void* obj)
{
Thread* thread = static_cast<Thread*>(obj);
thread->runInThread(); // wrapper around func_: this is what invokes func_
return NULL;
}
void Thread::runInThread()
{
tid_ = CurrentThread::tid();
muduo::CurrentThread::t_threadName = name_.c_str();
try
{
func_();
muduo::CurrentThread::t_threadName = "finished";
}
…
}
typedef boost::function<void ()> ThreadFunc;
Thread::Thread(const ThreadFunc& func, const string& n)
: started_(false),
pthreadId_(0),
tid_(0),
// func_ is the function actually run inside the thread, stored as a boost::function functor
func_(func),
name_(n)
{
numCreated_.increment();
}
ThreadLocal
Still uses pthread_getspecific/pthread_setspecific. (OP: why not the __thread keyword? Most likely because __thread can only hold POD data: it cannot store an object with a non-trivial constructor, and it offers no destructor hook, whereas pthread_key_create registers a per-thread destructor callback.)
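A sketch of the mechanism, illustrative and close in spirit to muduo's ThreadLocal<T>:

#include <pthread.h>

template<typename T>
class ThreadLocal
{
 public:
  ThreadLocal()
  {
    // the destructor callback runs at each thread's exit for that
    // thread's value -- this is what __thread cannot provide
    pthread_key_create(&key_, &ThreadLocal::destructor);
  }
  ~ThreadLocal()
  {
    pthread_key_delete(key_);
  }
  T& value()
  {
    T* v = static_cast<T*>(pthread_getspecific(key_));
    if (!v)
    {
      v = new T();                      // lazily create this thread's copy
      pthread_setspecific(key_, v);
    }
    return *v;
  }
 private:
  static void destructor(void* x) { delete static_cast<T*>(x); }
  pthread_key_t key_;
};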
ThreadLocalSingleton
A per-thread singleton: the singleton template's instance member is declared with the __thread keyword, giving it thread-local storage.
ThreadPool
void ThreadPool::run(const Task& task)
{
// no worker threads: run the task directly in the calling thread
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
// append to the task queue and wake one worker
queue_.push_back(task);
cond_.notify();
}
}
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)
{
// nothing queued: block until a task arrives or the pool stops
cond_.wait();
}
Task task;
if(!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
}
return task;
}
// this is each worker thread's main function
void ThreadPool::runInThread()
{
try
{
while (running_)
{
// every worker blocks in take() for its next task
Task task(take());
if (task)
{
// run the task
task();
}
}
}
…
}
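The piece not shown above is start(), which spawns the workers, each running the runInThread() loop; roughly the following, assuming threads_ is a boost::ptr_vector<muduo::Thread> as in muduo (reconstructed, details may differ from the source):

void ThreadPool::start(int numThreads)
{
  assert(threads_.empty());
  running_ = true;
  threads_.reserve(numThreads);
  for (int i = 0; i < numThreads; ++i)
  {
    // each worker runs the runInThread() loop shown above
    threads_.push_back(new muduo::Thread(
        boost::bind(&ThreadPool::runInThread, this)));
    threads_[i].start();
  }
}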
Buffer
A Buffer is one contiguous block laid out as [prependable | readable | writable]: [0, readerIndex) is prependable space, [readerIndex, writerIndex) holds not-yet-read data, and [writerIndex, size) is writable. If writable < datalen but prependable + writable >= datalen, the unread data is moved to the front, resetting readerIndex and merging the prependable and writable regions into one span large enough for the append (this situation normally arises because readerIndex has drifted forward past data that was already consumed). If prependable + writable < datalen, even the combined free space is insufficient, and the buffer is grown with vector::resize():
void makeSpace(size_t len)
{
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
// FIXME: move readable data
buffer_.resize(writerIndex_+len);
}
else
{
// move readable data to the front, make space inside buffer
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
std::copy(begin()+readerIndex_,
begin()+writerIndex_,
begin()+kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}
Channel
class Channel : boost::noncopyable
{
public:
typedef boost::function<void()> EventCallback;
typedef boost::function<void(Timestamp)> ReadEventCallback;
private:
EventLoop* loop_; // the owning reactor (EventLoop)
const int fd_; // the fd this channel watches
int events_; // events of interest
int revents_; // ready events returned by the poller
bool eventHandling_; // true while events are being handled
ReadEventCallback readCallback_;
EventCallback writeCallback_; // how to write data
EventCallback closeCallback_; // how to close the connection
EventCallback errorCallback_; // how to handle errors
};
When the loop reports events on this channel's fd, the handleEvent callback fires:
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
EventLoop
class EventLoop : boost::noncopyable
{
public:
void loop();
void quit();
/// Runs callback immediately in the loop thread.
/// It wakes up the loop, and run the cb.
/// If in the same loop thread, cb is run within the function.
/// Safe to call from other threads.
void runInLoop(const Functor& cb);
/// Queues callback in the loop thread.
/// Runs after the current poll returns.
/// Safe to call from other threads.
void queueInLoop(const Functor& cb);
/// Runs callback at 'time'.
/// Safe to call from other threads.
TimerId runAt(const Timestamp& time, const TimerCallback& cb);
/// Runs callback after @c delay seconds.
/// Safe to call from other threads.
TimerId runAfter(double delay, const TimerCallback& cb);
/// Runs callback every @c interval seconds.
/// Safe to call from other threads.
TimerId runEvery(double interval, const TimerCallback& cb);
/// Cancels the timer.
/// Safe to call from other threads.
void cancel(TimerId timerId);
// internal usage
void wakeup();
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
private:
void handleRead(); // drains wakeupFd_ after a wakeup()
void doPendingFunctors();
typedef std::vector<Channel*> ChannelList;
bool looping_; /* atomic */
bool quit_; /* atomic */
bool eventHandling_; /* atomic */
bool callingPendingFunctors_; /* atomic */
const pid_t threadId_;
Timestamp pollReturnTime_;
boost::scoped_ptr<Poller> poller_;
boost::scoped_ptr<TimerQueue> timerQueue_;
int wakeupFd_;
// unlike in TimerQueue, which is an internal class,
// we don't expose Channel to client.
boost::scoped_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
Channel* currentActiveChannel_;
MutexLock mutex_;
std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
};
__thread EventLoop* t_loopInThisThread = 0;
t_loopInThisThread is defined as a per-thread global (one copy per thread, via __thread) and is initialized in EventLoop's constructor:
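The constructor enforces one loop per thread, roughly:

EventLoop::EventLoop()
  : threadId_(CurrentThread::tid())
    // ... other members elided ...
{
  if (t_loopInThisThread)
  {
    // creating a second EventLoop in the same thread is a fatal error
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
}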
epoll's default working mode is level-triggered (LT), and that is the mode muduo relies on.
Given muduo's working model, one option is an IO thread per fd: each connection's read/write/encode/decode work happens on its IO thread; the threads of a compute pool listen on an eventfd, and once woken, the connection is passed, together with the decoded packet, into the compute pool; when the computation finishes, the result is written straight back to the IO thread's fd. The next compute thread is chosen round-robin.
An alternative view: these threads can in fact be merged; what matters is only whether the workload is IO-bound or compute-bound. The real constraints are that, to avoid the performance loss of excessive thread context switches and to respect resource limits on thread count, a thread-per-fd model is off the table; instead the fds are split into several roughly balanced groups, each assigned to one IO thread.
Waking an EventLoop from another thread:
EventLoop::EventLoop()
: wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_))
{
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this)); // bound to EventLoop::handleRead
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
? ?
The function that performs the cross-thread wakeup is wakeup():
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); // a single 8-byte write, much like writing into a pipe
}
? ?
Once wakeup() has written, wakeupFd_ becomes readable, so the EventLoop is notified and drops out of epoll_wait immediately to start processing. The data must then be read back out of wakeupFd_, otherwise (being level-triggered) the loop would be notified again on the next iteration; that read is handleRead:
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
}
? ?
runInLoop and queueInLoop are how work is posted to a loop across threads.
void EventLoop::runInLoop(const Functor& cb){
// called on the loop's own thread: run the callback right away
if (isInLoopThread()){
cb();
}else{
// called from another thread: queue it; queueInLoop
// wakes the loop so the task is noticed promptly
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(const Functor& cb){
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
/* Wake the loop if (a) this is another thread, since the loop may be
blocked in epoll_wait, or (b) the loop thread is currently draining
pendingFunctors_, because the drain runs over a swapped-out copy and
would otherwise miss this functor until the next poll returns. In the
remaining case (loop thread, not currently draining), doPendingFunctors()
will run before the loop blocks again, so no wakeup is needed. */
if (!isInLoopThread() || callingPendingFunctors_){
wakeup();
}
}
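The callingPendingFunctors_ test makes sense once you see doPendingFunctors(), which the loop runs at the end of each iteration; muduo's implementation is essentially this swap trick:

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;
  {
    MutexLockGuard lock(mutex_);
    // swap, so the lock is not held while the callbacks run
    functors.swap(pendingFunctors_);
  }
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}

Because the queue is swapped out before the callbacks run, a functor queued by one of those callbacks lands in the now-empty pendingFunctors_ and would sit there until the next poll returned; that is why queueInLoop calls wakeup() in that case.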
TimerQueue
Call chain:
addTimer(const TimerCallback& cb, Timestamp when, double interval) =>
addTimerInLoop(Timer* timer) =>
insert(timer), in which:
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
bool earliestChanged = false;
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
if (it == timers_.end() || when < it->first)
{
earliestChanged = true;
}
The subtle point here: for the very first timer, begin() == end(), so earliestChanged = true, which triggers resetTimerfd:
void TimerQueue::addTimerInLoop(Timer* timer)
{
loop_->assertInLoopThread();
bool earliestChanged = insert(timer);
? ?
if (earliestChanged)
{
// arms the timer with ::timerfd_settime(timerfd, 0, &newValue, &oldValue)
resetTimerfd(timerfd_, timer->expiration());
}
}
When the timerfd fires:
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now);
// find out which timers have expired
std::vector<Entry> expired = getExpired(now);
// safe to callback outside critical section
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it)
{
// for each expired Timer, run() invokes the callback registered via addTimer
it->second->run();
}
reset(expired, now);
}
TcpConnection
TcpConnection handles the reading, writing, and closing of the socket once the TCP connection is established. Its structure:
class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop* loop, // the reactor this connection runs in
const string& name, // connection name
int sockfd, // the connected fd
const InetAddress& localAddr, // local address
const InetAddress& peerAddr); // peer address
// called when TcpServer accepts a new connection
void connectEstablished(); // should be called only once
// called when TcpServer has removed me from its map
void connectDestroyed(); // should be called only once
private:
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
void sendInLoop(const void* message, size_t len); // send data
void setState(StateE s) { state_ = s; }
EventLoop* loop_;
string name_;
StateE state_; // FIXME: use atomic variable
// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_; // the socket
boost::scoped_ptr<Channel> channel_; // the channel for this connection's fd
InetAddress localAddr_;
InetAddress peerAddr_;
ConnectionCallback connectionCallback_; // fires on both connection establishment and teardown
MessageCallback messageCallback_; // fires when data has been read
WriteCompleteCallback writeCompleteCallback_; // fires when a write completes
CloseCallback closeCallback_; // fires when the connection closes
Buffer inputBuffer_; // buffer for incoming data
Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
boost::any context_; // user-attached context
// FIXME: creationTime_, lastReceiveTime_
// bytesReceived_, bytesSent_
};
TcpConnection sets up its channel at construction. Once TcpClient or TcpServer has established a connection, it calls TcpConnection::connectEstablished, which enables reading on the channel. From then on, whenever the fd becomes readable, TcpConnection's internal handleRead performs the actual read, and the received data is handed to the messageCallback_. To send, sendInLoop appends the message to outputBuffer_ and enables writing on the channel; TcpConnection then performs the writes itself and fires writeCompleteCallback_ once everything has been flushed. All of this managed IO is non-blocking. To close the sending side, call shutdown; to tear the connection down, call TcpConnection::connectDestroyed, which essentially removes the channel from the reactor.
The TcpConnection layer has no idea how many bytes make up one message; framing is done by the layer above. A single read pulls in at most about 64KB, which is handed to the upper application; if that is not enough to form a complete message, the application simply returns and lets the reactor keep waiting for more data. Writes are likewise split across multiple rounds. This is precisely why the reactor must use level-triggered rather than edge-triggered mode.
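The read path of that description corresponds to TcpConnection::handleRead; muduo's version is roughly the following (reconstructed, details may differ from the source):

void TcpConnection::handleRead(Timestamp receiveTime)
{
  int savedErrno = 0;
  // Buffer::readFd uses readv with a 64KB stack buffer as overflow space,
  // so one call reads at most ~64KB -- the limit mentioned above
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    // hand whatever has arrived to the upper layer for framing
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();     // peer closed the connection
  }
  else
  {
    errno = savedErrno;
    handleError();
  }
}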
TcpClient
This class mainly wraps the functionality of Connector:
TcpClient::TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& name)
: loop_(CHECK_NOTNULL(loop)),
connector_(new Connector(loop, serverAddr)),
name_(name),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
retry_(false),
connect_(true),
nextConnId_(1)
{
connector_->setNewConnectionCallback(
boost::bind(&TcpClient::newConnection, this, _1));
// FIXME setConnectFailedCallback
}
TcpServer
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg)
: loop_(CHECK_NOTNULL(loop)),
hostport_(listenAddr.toHostPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr)),
threadPool_(new EventLoopThreadPool(loop)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
started_(false),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
}
Likewise, the constructor creates the acceptor and sets its new-connection callback to TcpServer::newConnection, while the user installs the TcpConnection callbacks from outside. Calling start() launches the server: start() invokes Acceptor::listen, and once a connection is accepted, newConnection runs. Here is newConnection:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
// FIXME poll with zero timeout to double confirm the new connection
InetAddress localAddr(sockets::getLocalAddr(sockfd)); // obtain the local address from the fd
TcpConnectionPtr conn(
new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}
On the server side each connection is given a unique name and stored in the connections_ map. threadPool_->getNextLoop() picks the IO threads round-robin, and TcpConnection::connectEstablished is dispatched into whichever thread was picked. Keeping connections in connections_ has a purpose: because every TcpConnection has a unique name, the server can remove a specific connection from its map, and the destructor can walk connections_ to find and release every live connection.
Original post: http://www.cnblogs.com/CodeComposer/p/4714373.html