线程池实现
Table of Contents
1 基本设计思路
我们首先设计TPThread类,用于管理单个线程的属性和方法;有了TPThread表示的线程之后,我们定义ThreadPool类,用以管理一组TPThread对象,此处所说的管理包括:针对所有TPThread线程的创建、销毁以及调度。
我们怎么将需要在线程中调用的业务逻辑代码接入线程池呢?选择之一是利用TPThread提供多态函数,将业务逻辑代码嵌入TPThread的子对象。但是这样做,在每次将并发任务代理给线程池之后,TPThread将会绑定到固定的业务逻辑上。更灵活的设计是分离出一个专门用于代理并发任务的TPTask类。
那么,现在的线程池结构是:一个全局的ThreadPool对象,在它初始化的时候会创建一组初始数量的TPThread对象,用户的并发业务逻辑在TPTask的子对象中实现,这样的子对象同样也都交给全局的ThreadPool对象管理,ThreadPool中当前空闲的TPThread线程将被分配去处理TPTask任务。
2 使用线程池的优势
设计线程池的初衷是规避频繁创建和销毁线程的开销。
但从上面的设计来看,除了性能方面的优势之外,利用线程池,用户通常只需一个类似 addTask 的方法即可快捷地添加并发任务;并发线程与主线程的交互也将更加简单,这将在接下来的源码中体现出来。
综上所述,线程池对比操作系统提供的原始线程控制原语,它不仅降低了线程频繁创建、销毁的性能开销,也为用户提供了更简易明了的操作接口。
3 TPTask
我们将为TPTask类提供两个函数:一个纯虚函数,用于提供给继承该对象的用户子对象重写该方法以嵌入并发的业务逻辑;一个有默认实现的在主线程中被调用的虚函数。
class TPTask { public: enum TPTaskState { TPTask_Completed = 0, // 一个任务已经完成 TPTask_ContinueMainThread = 1, // 继续在主线程执行 TPTask_ContinueChildThread = 2, // 继续在子线程执行 }; virtual bool process() = 0; virtual TPTask::TPTaskState presentMainThread() { return TPTask::TPTask_Completed; } };
presentMainThread 函数是并发线程与主线程交互的接口,它将在 TPThread 对象的 process 函数执行之后的某一时刻在主线程中被调用。
4 TPThread
class ThreadPool; class TPThread { public: friend class ThreadPool; enum EThreadState { ThreadState_Stop = -1, ThreadState_Sleep = 0, ThreadState_Busy = 1, ThreadState_End = 2 }; TPThread(ThreadPool* threadPool, int threadWaitSecond = 0) :mThreadWaitSecond(threadWaitSecond) ,mpCurrTask(NULL) ,mpThreadPool(threadPool) { mState = ThreadState_Sleep; initCond(); initMutex(); } virtual ~TPThread() { deleteCond(); deleteMutex(); } THREAD_ID createThread(void); bool join(void); void onTaskCompleted(void); // 线程通知 等待条件信号 bool onWaitCondSignal(void); virtual TPTask* tryGetTask(void); #if PLATFORM == PLATFORM_WIN32 static unsigned __stdcall threadFunc(void *arg); #else static void* threadFunc(void* arg); #endif int sendCondSignal(void) { return THREAD_SINGNAL_SET(mCond); } virtual void onStart() {} virtual void onEnd() {} virtual void onProcessTaskStart(TPTask* pTask) {} virtual void processTask(TPTask* pTask) { pTask->process(); } virtual void onProcessTaskEnd(TPTask* pTask) {} THREAD_ID id(void) const { return mTid; } void id(THREAD_ID tidp) { mTid = tidp; } virtual void initCond(void) { THREAD_SINGNAL_INIT(mCond); } virtual void initMutex(void) { THREAD_MUTEX_INIT(mMutex); } virtual void deleteCond(void) { THREAD_SINGNAL_DELETE(mCond); } virtual void deleteMutex(void) { THREAD_MUTEX_DELETE(mMutex); } virtual void lock(void) { THREAD_MUTEX_LOCK(mMutex); } virtual void unlock(void) { THREAD_MUTEX_UNLOCK(mMutex); } TPTask* task(void) const { return mpCurrTask; } void task(TPTask* tpt) { mpCurrTask = tpt; } int state(void) const { return mState; } ThreadPool* threadPool() { return mpThreadPool; } virtual std::string printWorkState() { char buf[128]; lock(); sprintf(buf, "%p,%u", mpCurrTask, mDoneTasks); unlock(); return buf; } void resetDoneTasks() { mDoneTasks = 0; } void incDoneTasks() { ++mDoneTasks; } protected: THREAD_ID mTid; // 本线程的ID THREAD_SINGNAL mCond; THREAD_MUTEX mMutex; int mThreadWaitSecond; // 线程空闲状态超过这个秒数则线程退出, 小于0为永久线程(秒单位) TPTask *mpCurrTask; // 该线程的当前执行的任务 ThreadPool *mpThreadPool; // 线程池指针 EThreadState mState; // 线程状态 uint32 mDoneTasks; // 线程启动一次在未改变到闲置状态下连续执行的任务计数 };
该对象封装了Win32和Unix平台下的线程实现。mpCurrTask指向该线程当前的任务,这不需要用户操心,它实际是由ThreadPool管理的。
5 ThreadPool
class ThreadPool { public: ThreadPool(); virtual ~ThreadPool(); void finalise(); void destroy(); /** 创建线程池 @param inewThreadCount: 当系统繁忙时线程池会新增加这么多线程(临时) @param inormalMaxThreadCount: 线程池会一直保持这么多个数的线程 @param imaxThreadCount: 线程池最多只能有这么多个线程 */ bool createThreadPool(uint32 inewThreadCount, uint32 inormalMaxThreadCount, uint32 imaxThreadCount); virtual TPThread* createThread(int threadWaitSecond = ThreadPool::timeout); void bufferTask(TPTask* tptask); TPTask* popbufferTask(void); bool addFreeThread(TPThread* tptd); bool addBusyThread(TPThread* tptd); void addFiniTask(TPTask* tptask); bool removeHangThread(TPThread* tptd); virtual void onMainThreadTick(); bool hasThread(TPThread* pTPThread); std::string printThreadWorks(); bool addTask(TPTask* tptask); bool addBackgroundTask(TPTask* tptask) { return addTask(tptask); } bool pushTask(TPTask* tptask) { return addTask(tptask); } uint32 currentThreadCount(void) const { return mCurrentThreadCount; } uint32 currentFreeThreadCount(void) const { return mCurrentFreeThreadCount; } bool isThreadCountMax(void) const { return mCurrentThreadCount >= mMaxThreadCount; } bool isBusy(void) const { return mBufferedTaskList.size() > THREAD_BUSY_SIZE; } bool isInitialize(void) const { return mIsInitialize; } bool isDestroyed() const { return mIsDestroyed; } uint32 bufferTaskSize() const { return mBufferedTaskList.size(); } std::queue<TPTask*>& bufferedTaskList() { return mBufferedTaskList; } void lockBufferedTaskList() { THREAD_MUTEX_LOCK(mBufferedTaskListMutex); } void unlockBufferedTaskList() { THREAD_MUTEX_UNLOCK(mBufferedTaskListMutex); } uint32 finiTaskSize() const { return mFiniTaskListCount; } virtual std::string name() const{ return "ThreadPool"; } public: static int timeout; protected: bool mIsInitialize; bool mIsDestroyed; std::queue<TPTask *> mBufferedTaskList; // 系统处于繁忙时还未处理的任务列表 std::list<TPTask *> mFinishedTaskList; // 已经完成的任务列表 size_t mFiniTaskListCount; THREAD_MUTEX mBufferedTaskListMutex; // 处理mBufferedTaskList互斥锁 THREAD_MUTEX mThreadStateListMutex; // 处理mBufferedTaskList and mFreeThreadList互斥锁 THREAD_MUTEX mFinishedTaskListMutex; // 处理mFinishedTaskList互斥锁 std::list<TPThread *> mBusyThreadList; // 繁忙的线程列表 std::list<TPThread *> mFreeThreadList; // 闲置的线程列表 std::list<TPThread *> mAllThreadList; // 所有的线程列表 uint32 mMaxThreadCount; // 最大线程总数 uint32 mExtraNewAddThreadCount; // 如果mNormalThreadCount不足够使用则会新创建这么多线程 uint32 mCurrentThreadCount; // 当前线程数 uint32 mCurrentFreeThreadCount; // 当前闲置的线程数 uint32 mNormalThreadCount; // 标准状态下的线程总数 即:默认情况下一启动服务器就开启这么多线程,如果线程不足够,则会新创建一些线程, 最大能够到mMaxThreadCount };
5.1 线程管理
从声明中可以看到有三个线程对象列表
- mAllThreadList 该容器是 mFreeThreadList 和 mBusyThreadList 容器的并集。
- mFreeThreadList 该容器记录了当前闲置的线程对象,当用户调用 addTask 方法添加并发任务对象时,线程池将尝试从该容器中取出线程对象来执行并发任务。
- mBusyThreadList 当前正在执行并发任务的线程对象将会保存到此容器。
在调用 createThreadPool 初始化线程池时,所有的线程对象都会被添加到 mFreeThreadList 容器, mBusyThreadList 初始化为空。
在调用线程池的 addTask 方法添加并发任务时,若 mFreeThreadList 非空则会从 mFreeThreadList 列表中取出一个线程对象来执行并发任务,该对象将被转移到 mBusyThreadList,表示其正在执行任务。
5.2 并发任务管理
线程池成员中跟任务相关的容器有:
- mBufferedTaskList 如果当前没有闲置的线程,那么,用户新增的任务将缓存至此容器。
- mFinishedTaskList 在并发任务对象的 process 方法执行完之后,它将被添加到该容器。 mFinishedTaskList 容器中的并发任务对象将在主线程中被进一步处理,其 presentMainThread 函数将被调用,然后根据返回值来决定相应任务对象的去向。
注意到, mBufferedTaskList 和 mFinishedTaskList 容器保存的都是当前未在执行的并发任务。实际上,正在执行的并发任务对象是直接代理给 TPThread 线程对象的,并未用任何形式的容器去缓存。
6 实现细节
6.1 线程回调函数
- 注册线程回调函数
THREAD_ID TPThread::createThread(void) { #if PLATFORM == PLATFORM_WIN32 mTid = (THREAD_ID)_beginthreadex(NULL, 0, &TPThread::threadFunc, (void*)this, NULL, 0); #else pthread_create(&mTid, NULL, TPThread::threadFunc, (void*)this); #endif return mTid; }
- 线程回调函数的实现
#if PLATFORM == PLATFORM_WIN32 unsigned __stdcall TPThread::threadFunc(void *arg) #else void* TPThread::threadFunc(void* arg) #endif { TPThread *tptd = static_cast<TPThread *>(arg); ThreadPool *pThreadPool = tptd->threadPool(); bool isRun = true; tptd->resetDoneTasks(); #if PLATFORM == PLATFORM_WIN32 #else pthread_detach(pthread_self()); #endif tptd->onStart(); while(isRun) { if(tptd->task() != NULL) { isRun = true; } else { tptd->resetDoneTasks(); isRun = tptd->onWaitCondSignal(); } if(!isRun || pThreadPool->isDestroyed()) { if(!pThreadPool->hasThread(tptd)) tptd = NULL; goto __THREAD_END__; } TPTask * task = tptd->task(); if(task == NULL) continue; tptd->mState = ThreadState_Busy; while(task && !tptd->threadPool()->isDestroyed()) { tptd->incDoneTasks(); tptd->onProcessTaskStart(task); tptd->processTask(task); // 处理该任务 tptd->onProcessTaskEnd(task); TPTask * task1 = tptd->tryGetTask(); // 尝试继续从任务队列里取出一个繁忙的未处理的任务 if(!task1) { tptd->onTaskCompleted(); break; } else { pThreadPool->addFiniTask(task); task = task1; tptd->task(task1); } } } __THREAD_END__: if(tptd) { TPTask * task = tptd->task(); if(task) { delete task; } tptd->onEnd(); tptd->mState = ThreadState_End; tptd->resetDoneTasks(); } #if PLATFORM == PLATFORM_WIN32 return 0; #else pthread_exit(NULL); return NULL; #endif }
我们可以看到,该函数的核心即是:取出并发任务并执行它。
在刚进入循环时,会执行如下代码:
if(tptd->task() != NULL) { isRun = true; } else { tptd->resetDoneTasks(); isRun = tptd->onWaitCondSignal(); }
onWaitCondSignal 的实现如下:
bool TPThread::onWaitCondSignal(void) { #if PLATFORM == PLATFORM_WIN32 if(mThreadWaitSecond <= 0) { mState = ThreadState_Sleep; WaitForSingleObject(mCond, INFINITE); ResetEvent(mCond); } else { mState = ThreadState_Sleep; DWORD ret = WaitForSingleObject(mCond, mThreadWaitSecond * 1000); ResetEvent(mCond); // 如果是因为超时了, 说明这个线程很久没有被用到, 我们应该注销这个线程。 // 通知ThreadPool注销自己 if (ret == WAIT_TIMEOUT) { mpThreadPool->removeHangThread(this); return false; } else if(ret != WAIT_OBJECT_0) { } } #else if(mThreadWaitSecond <= 0) { lock(); mState = ThreadState_Sleep; pthread_cond_wait(&mCond, &mMutex); unlock(); } else { struct timeval now; struct timespec timeout; gettimeofday(&now, NULL); timeout.tv_sec = now.tv_sec + mThreadWaitSecond; timeout.tv_nsec = now.tv_usec * 1000; lock(); mState = ThreadState_Sleep; int ret = pthread_cond_timedwait(&mCond, &mMutex, &timeout); unlock(); // 如果是因为超时了, 说明这个线程很久没有被用到, 我们应该注销这个线程。 if (ret == ETIMEDOUT) { // 通知ThreadPool注销自己 mpThreadPool->removeHangThread(this); return false; } else if(ret != 0) { } } #endif return true; }
可以看到,在 tptd->task() 为空的时候,线程将休眠。如果该线程是在线程池初始化时所创建,那么,将进入永久休眠,直至被唤醒;如果该线程是在用户添加任务过程中因线程池中暂无闲置线程而临时创建的,那么,将进入超时休眠。临时线程在超时唤醒后将被回收,初始线程则会直到线程池销毁后才被回收。这体现在如下的代码中:
if(!isRun || pThreadPool->isDestroyed()) { if(!pThreadPool->hasThread(tptd)) tptd = NULL; goto __THREAD_END__; }
在上述预处理之后,线程回调函数将正式进入任务处理流程:
TPTask * task = tptd->task(); if(task == NULL) continue; tptd->mState = ThreadState_Busy; while(task && !tptd->threadPool()->isDestroyed()) { tptd->incDoneTasks(); tptd->onProcessTaskStart(task); tptd->processTask(task); // 处理该任务 tptd->onProcessTaskEnd(task); TPTask * task1 = tptd->tryGetTask(); // 尝试继续从任务队列里取出一个繁忙的未处理的任务 if(!task1) { tptd->onTaskCompleted(); break; } else { pThreadPool->addFiniTask(task); task = task1; tptd->task(task1); } }
上述循环的退出时机是:当前任务已被执行,并且线程池的 mBufferedTaskList 容器为空,这表示该线程暂时完成了自己的使命,可以先休息了。
6.2 线程池管理
6.2.1 线程池初始化
bool ThreadPool::createThreadPool(uint32 inewThreadCount, uint32 inormalMaxThreadCount, uint32 imaxThreadCount) { assert(!mIsInitialize); mExtraNewAddThreadCount = inewThreadCount; mNormalThreadCount = inormalMaxThreadCount; mMaxThreadCount = imaxThreadCount; for(uint32 i = 0; i < mNormalThreadCount; ++i) { TPThread* tptd = createThread(0); if(!tptd) { // ERROR_MSG("ThreadPool::createThreadPool: create is error! \n"); return false; } mCurrentFreeThreadCount++; mCurrentThreadCount++; mFreeThreadList.push_back(tptd); mAllThreadList.push_back(tptd); } mIsInitialize = true; sleepms(100); return true; } TPThread* ThreadPool::createThread(int threadWaitSecond) { TPThread* tptd = new TPThread(this, threadWaitSecond); tptd->createThread(); return tptd; }
线程池的初始化即是创建一批初始数量的线程。在线程初始化过程中创建的线程将一直延续到线程池销毁为止。
6.2.2 并发任务添加
bool ThreadPool::addTask(TPTask* tptask) { THREAD_MUTEX_LOCK(mThreadStateListMutex); if(mCurrentFreeThreadCount > 0) { std::list<TPThread *>::iterator itr = mFreeThreadList.begin(); TPThread* tptd = (TPThread *)(*itr); mFreeThreadList.erase(itr); mBusyThreadList.push_back(tptd); --mCurrentFreeThreadCount; tptd->task(tptask); // 给线程设置新任务 #if PLATFORM == PLATFORM_WIN32 if(tptd->sendCondSignal()== 0){ #else if(tptd->sendCondSignal()!= 0){ #endif THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return false; } THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return true; } bufferTask(tptask); if(isThreadCountMax()) { THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return false; } for(uint32 i = 0; i < mExtraNewAddThreadCount; ++i) { TPThread* tptd = createThread(300); // 设定5分钟未使用则退出的线程 mAllThreadList.push_back(tptd); // 所有的线程列表 mFreeThreadList.push_back(tptd); // 闲置的线程列表 ++mCurrentThreadCount; ++mCurrentFreeThreadCount; } THREAD_MUTEX_UNLOCK(mThreadStateListMutex); return true; }
如果当前有闲置线程,则直接将并发任务代理给它;如果当前没有闲置线程,那就先将并发任务缓存起来,将来的空闲线程将通过 tryGetTask() 取出缓存的并发任务。
我们注意到,在缓存任务之后,会尝试创建一批新的线程对象,这些线程对象是临时的,当线程池“清闲”下来的时候,它们将被回收。这个额外的优化如果利用得好,将会提高线程池的并发度。
6.2.3 与主线程的交互
这是一种常有的需求,并发任务产生的输出通常需要提交给主线程。我们提供 onMainThreadTick 方法来达成此目的。
void ThreadPool::onMainThreadTick() { std::vector<TPTask *> finitasks; THREAD_MUTEX_LOCK(mFinishedTaskListMutex); if(mFinishedTaskList.size() == 0) { THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); return; } std::copy(mFinishedTaskList.begin(), mFinishedTaskList.end(), std::back_inserter(finitasks)); mFinishedTaskList.clear(); THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); std::vector<TPTask *>::iterator finiiter = finitasks.begin(); for(; finiiter != finitasks.end(); ) { TPTask::TPTaskState state = (*finiiter)->presentMainThread(); switch(state) { case TPTask::TPTask_Completed: delete (*finiiter); finiiter = finitasks.erase(finiiter); --mFiniTaskListCount; break; case TPTask::TPTask_ContinueChildThread: this->addTask((*finiiter)); finiiter = finitasks.erase(finiiter); --mFiniTaskListCount; break; case TPTask::TPTask_ContinueMainThread: THREAD_MUTEX_LOCK(mFinishedTaskListMutex); mFinishedTaskList.push_back((*finiiter)); THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); ++finiiter; break; default: Assert(false); break; }; } }
注意到,此函数只针对了 mFinishedTaskList 容器中的任务进行了处理,这表示,一个并发任务得在执行了至少一次 process 之后才能与主线程交互。
6.2.4 线程池销毁
void ThreadPool::destroy() { mIsDestroyed = true; int itry = 0; while(true) { sleepms(300); itry++; std::string taskaddrs = ""; THREAD_MUTEX_LOCK(mThreadStateListMutex); int count = mAllThreadList.size(); std::list<TPThread *>::iterator itr = mAllThreadList.begin(); for(; itr != mAllThreadList.end(); ++itr) { if((*itr)) { if((*itr)->state() != TPThread::ThreadState_End) { (*itr)->sendCondSignal(); } else { count--; } } } THREAD_MUTEX_UNLOCK(mThreadStateListMutex); if(count <= 0) { break; } } THREAD_MUTEX_LOCK(mThreadStateListMutex); sleepms(100); std::list<TPThread*>::iterator itr = mAllThreadList.begin(); for(; itr != mAllThreadList.end(); ++itr) { if((*itr)) { delete (*itr); (*itr) = NULL; } } mAllThreadList.clear(); THREAD_MUTEX_UNLOCK(mThreadStateListMutex); THREAD_MUTEX_LOCK(mFinishedTaskListMutex); if(mFinishedTaskList.size() > 0) { std::list<TPTask*>::iterator finiiter = mFinishedTaskList.begin(); for(; finiiter != mFinishedTaskList.end(); ++finiiter) { delete (*finiiter); } mFinishedTaskList.clear(); mFiniTaskListCount = 0; } THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex); THREAD_MUTEX_LOCK(mBufferedTaskListMutex); if(mBufferedTaskList.size() > 0) { while(mBufferedTaskList.size() > 0) { TPTask* tptask = mBufferedTaskList.front(); mBufferedTaskList.pop(); delete tptask; } } THREAD_MUTEX_UNLOCK(mBufferedTaskListMutex); THREAD_MUTEX_DELETE(mThreadStateListMutex); THREAD_MUTEX_DELETE(mBufferedTaskListMutex); THREAD_MUTEX_DELETE(mFinishedTaskListMutex); }
线程池销毁时将会等待所有线程正常退出,所以用户重写的 process 方法需要在线程池销毁时全部退出。
7 总结
实现高效且友好的线程池确实需要注意很多细节,但总的来说,线程池技术并不包含多么高深的算法。任何线程池框架一般都只有如下的几个简单执行步骤:
- 预创建一批线程
- 添加并发任务
- 销毁所有线程