标签:muduo 线程库 threadpoll 多线程
线程池本质上是一个生产者消费者的模型。在线程池有一个存放现场的ptr_vector,相当于消费者;有一个存放任务的deque,相当于仓库。线程(消费者)去仓库取任务,然后执行;当有新程序员是生产者,当有新任务时,就把任务放到deque(仓库)。
任务队列(仓库)是有边界的,所以在实现时需要有两个信号量,相当与BoundedBlockingQueue。
每个线程在第一次运行时都会调用一次回调函数threadInitCallback_,为线程执行做准备。
在线程池开始运行之前,要先设置任务队列的大小(即调用setMaxQueueSize),因为运行线程池时,线程会从任务队列取任务。
逻辑清楚了,后面分析源代码就容易了。
ThreadPoll.h
class ThreadPool : boost::noncopyable
{
public:
typedef boost::function<void ()> Task;//函数对象,相当与任务
explicit ThreadPool(const string& nameArg = string("ThreadPool"));
~ThreadPool();
// Must be called before start().因为在start()调用时,会从队列取任务
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }//设置任务队列的大小,必须在start之前设置。
void setThreadInitCallback(const Task& cb)//设置回调函数,每次在执行任务前先调用回调函数
{ threadInitCallback_ = cb; }
void start(int numThreads);//设置线程池的大小
void stop();//停止线程池
const string& name() const
{ return name_; }
size_t queueSize() const;
// Could block if maxQueueSize > 0
void run(const Task& f);//把任务添加到任务队列,可能不是立即执行。
#ifdef __GXX_EXPERIMENTAL_CXX0X__
void run(Task&& f);
#endif
private:
bool isFull() const;
void runInThread();//真正执行任务的函数
Task take();//从任务队列去任务
mutable MutexLock mutex_;
Condition notEmpty_;//任务队列非空信号量
Condition notFull_;//任务队列非满信号了
string name_;
Task threadInitCallback_;//回调函数,在线程池第一次执行任务是调用。
boost::ptr_vector<muduo::Thread> threads_;//存放线程
std::deque<Task> queue_;//存放任务
size_t maxQueueSize_;
bool running_;//线程池是否运行
};
ThreadPoll.cc
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
name_(nameArg),
maxQueueSize_(0),
running_(false)
{
}
ThreadPool::~ThreadPool()
{
if (running_)//如果线程池开始运行
{
stop();
}
}
void ThreadPool::start(int numThreads)//线程池中的线程数,并开始运行线程池
{
assert(threads_.empty());
running_ = true;
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
threads_.push_back(new muduo::Thread(
boost::bind(&ThreadPool::runInThread, this), name_+id));//绑定runInThread为线程运行函数
threads_[i].start();//线程开始执行
}
if (numThreads == 0 && threadInitCallback_)//如果线程池为空,且有回调函数,则调用回调函数。这时相当与只有一个主线程
{
threadInitCallback_();
}
}
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false;
notEmpty_.notifyAll();//通知所有等待在任务队列上的线程
}
for_each(threads_.begin(),
threads_.end(),
boost::bind(&muduo::Thread::join, _1));//每个线程都调用join
}
size_t ThreadPool::queueSize() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
void ThreadPool::run(const Task& task)//向任务对了添加任务
{
if (threads_.empty())//如果线程池是空的,那么直接由当前线程执行任务
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull())//当任务对了已满
{
notFull_.wait();//等待非满通知
}
assert(!isFull());
queue_.push_back(task);//添加到任务队列
notEmpty_.notify();//告知任务对了已经非空,可以执行了
}
}
#ifdef __GXX_EXPERIMENTAL_CXX0X__
void ThreadPool::run(Task&& task)
{
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull())
{
notFull_.wait();
}
assert(!isFull());
queue_.push_back(std::move(task));
notEmpty_.notify();
}
}
#endif
ThreadPool::Task ThreadPool::take()//从队列取出任务
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)//如果队列已空,且线程已经开始运行
{
notEmpty_.wait();//等待队列非空信号
}
Task task;
if (!queue_.empty())
{
task = queue_.front();//取出队列头的任务
queue_.pop_front();
if (maxQueueSize_ > 0)
{
notFull_.notify();//通知,告知任务队列已经非满了,可以放任务进来了
}
}
return task;
}
bool ThreadPool::isFull() const//判断队列是否已满
{
mutex_.assertLocked();//是否被当前线程锁住
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)//如果有回调函数,先调用回调函数。为任务执行做准备
{
threadInitCallback_();
}
while (running_)//线程池已经开始运行
{
Task task(take());//取出任务。有可能阻塞在这里,因为任务队列为空。
if (task)
{
task();//执行任务
}
}
}
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}
写个测试函数:
//threadpoolTest.cpp
#include <muduo/base/ThreadPool.h>
#include <muduo/base/CurrentThread.h>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <iostream>
using namespace muduo;
class TaskClass
{
public:
TaskClass(int id):id_(id){}
void Print()
{
std::cout<<"Task ID is: "<<id_<<", Thread ID is: "<<CurrentThread::tid()<<", Thread name is: "<<CurrentThread::name()<<std::endl;
}
private:
int id_;
};
void ThreadInitFunc()
{
std::cout<<"Init Thread. "<<", Thread ID is: "<<CurrentThread::tid()<<", Thread name is: "<<CurrentThread::name()<<std::endl;
}
int main()
{
ThreadPool pool;
pool.setMaxQueueSize(20);
pool.setMaxQueueSize(20);
pool.setThreadInitCallback(boost::bind(ThreadInitFunc));
pool.start(4);
for(int i=0; i<20; ++i)
{
boost::shared_ptr<TaskClass> taskClass(new TaskClass(i));
pool.run(boost::bind(&TaskClass::Print, taskClass));
}
sleep(3);
pool.stop();
return 0;
}
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:muduo 线程库 threadpoll 多线程
原文地址:http://blog.csdn.net/kangroger/article/details/47170163