标签:
1、为什么需要线程池?
部分应用程序需要执行很多细小的任务,对于每个任务都创建一个线程来完成,任务完成后销毁线程,而这就会产生一个问题:当执行的任务所需要的时间T1小于等于创建线程时间T2和销毁线程时间T3总和时即T1 <= T2 + T3,应用处理任务的响应能力会大大减弱,从而影响了应用程序性能,为了解决这类问题,线程池技术提供了很好的解决方案。线程池顾名思义就是把线程资源池化,在应用启动时一次性创建合适数量的线程,当需要执行任务时就从线程池中分配一个已经创建好的线程来执行,执行完在把线程归还,只在应用停时再一次性销毁所有的线程。
2、线程池的基本组成部分
一个简单的线程池至少包括下列的组成部分:
1)线程池管理器(ThreadPool):用于创建一个线程池对象并管理线程池,如分配任务给某个空闲线程,查看当前线程状态等等的操作。
2)工作线程(WorkThread):线程池中线程,可能是挂起,可能是被分配了任务,若然是挂起,则用一个信号量去阻塞直到有任务分配。
3)任务接口(Task):每个任务必须实行的接口,以供工作线程调度任务执行。
4) 任务调度(本例程无实现)
3、Unix下的线程池实现
将给大家展示的线程池实现的类如下,含有比较多的面向对象设计思想。
主要是每个工作线程类对象管理一个线程,一个线程池管理多个工作线程类。
1)CMutex:互斥量类,里面只有一个pthread_mutex_t的私有成员 对POSIX互斥量的C++封装 这个互斥量在ThreadPool是一个私有成员。
用于CAutoMutex的构造。
2)CAutoMutex:继承CMutex
利用C++构造函数和析构函数的特性,实现CAutoMutex变量生命周期内对全局锁进行加锁解锁操作,以保持同步。(互斥量属于线程池的)
具体如下:
用于局部函数的开头生成一个所有线程共享的互斥类对象,并lock(分配任务和查看当前忙碌线程数)
以免期间切换线程进入这个函数造成一个任务被分派给多个任务或者在查看当前工作线程数的时候返回的数据不正确 然后在局部函数退出时利用类对象的自动调用
析构函数 在函数内解锁unlock这个线程池的互斥量。
3)Task:任务的抽象基类,任何具体的任务都要继承该类,并实现自己的void * run()函数,
即线程运行的函数。在这里继承的类定义实现在main函数里面。抽象类不能被实例化,但可以声明为指针指向继承的子类。
4)CThread:线程基类,对POSIX线程的C++封装。包括线程的各种状态 >=0为正常状态 <0为错误
running idle quited 以及出错代码.
5)CWorkThread:工作线程类,实现了执行任务的基本接口
6)CThreadPool:线程池管理类,实现线程的创建管理和任务调度。
关于类的设计方面:
CMutex负责创建一个互斥量,用于对线程间共享数据进行加锁。
在CMutex构造函数中新建这个变量,在析构函数中销毁这个变量。
以前单纯在一个源文件写程序的时候,只需把互斥量定义成全局变量即可。
这里互斥量只作为线程池的私有成员使用,为主线程独享的。用于在读取运行线程数量count和分配新任务前postback进行加锁。
所以要用到mutex_mutexattr_t 设置这个互斥量属性并初始化互斥量达成一个的效果。
在函数的尾部记得调用destroy这个变量。
CAuToMutex利用了基类先调用构造函数和最后调用析构函数的特点。
CAuToMutex封装了CMutex 在构造函数中对互斥量加锁,析构函数解锁。
CAuToMutex用在局部函数中,在函数结束的时候自动析构即可达成解锁效果。
Task是任务类,是一个纯虚函数,只有一个void* run()
之所以是void* 是因为线程的启动函数形式是 void* fucntionname(void*)
Thread类: 定义了一个枚举类描述线程的状态。
以及封装了线程的基本操作函数。
如create join exit detached yield
线程的创建也是用 pthread_attr_t 这个线程状态变量来一次设置的。
WorkThread类:继承Thread类 实际工作实例化的类。其设计关键在于其信号量这个私有成员。
包含对线程的初始化,判断线程状态以确定下一步的执行,设置线程的任务。
在初始化的时候信号量为0 线程处于挂起状态,阻塞于信号量,直到线程池分派新的任务到来。
ps:信号量是线程间共享的。但这里把信号量设置成类的私有成员间接使信号量变成每个类对象的独享成员。
因为每一个CWorkThread对象管理一个线程,即使信号量本身是线程间共享,但被类的不可访问其他对象的私有成员所限制。
这里要说明的一个关系:逻辑上是每个WorkThread类对象管理一个pthread线程,但实际上这些WorkThread类对象是生存在
主线程main函数中的,一旦创建了线程,它们之间就是在不同的线程中,线程池仅是通过类对象的变量值去得知这个类对象创建的
线程到底是处于什么工作状态。即主线程是一个线程池对象+n个工作线程类 然后其他就是新生成的线程.
流程是先调用initialize函数,在init函数里面设置线程的运行状态(是否运行一次退出,是否分离(detached))
设置信号量(很重要 在线程没任务的时候发生阻塞等待新任务的时候再运行)
然后调用create函数 参数为doRun函数和对应的一个void *ptr参数指针,使得线程挂起。
这里设计成doRun实际上并不是一个真正的线程启动执行函数,
*ptr其实是设计成*this这个指针,在doRun里面设置*this->setTask后调用 Task->run函数
这个才是真正执行的动作。但执行Task->run函数实际上还是没能运行,因为信号量依然为0,所以阻塞在
while循环的sem_wait
要调用类的run函数的sem_post给信号量+1 解除阻塞 通知Task->run可以运行。
此时信号量又变回了0 继续阻塞等待下一个任务到来。
所以initialize函数对于每个线程只会调用一次,调用的时候,pthread_create只是唯一一次生成一个回调函数为doRun的线程,
并阻塞于doRun函数中等待setTask的函数调用以解除阻塞执行任务,所以巧妙的地方在于此,启动执行函数中再执行函数,
来解决线程初始化的之后不能再执行启动执行函数的问题。
关于doRun为什么是保护静态函数:http://blog.csdn.net/luo6620378xu/article/details/8521940
http://www.cnblogs.com/pure/archive/2010/10/13/1850531.html 使用友元也可以。(友元不会被加上类名 而且其作用域与this指针一样)
1.doRun函数是线程函数。线程函数的原型是:void* (*)(void*) 而类的成员函数会在其编译的时候类型会被加上类名 这里是CWorkThread:: 不符合线程启动函数的函数格式 静态函数则不会被加上类名
2.一般来说,C++的类成员函数不能作为线程函数。这是因为在类中定义的成员函数,编译器会给其加
上this指针,而静态函数里面不能接收this指针。可以通过传递this指针给线程函数的形参来解决这个问题。
同时,把类对象作为指针传递给这个类的静态函数好处是,可以在这个静态函数里面直接访问所在对象的成员变量(类的特点)
这个类还有一个要注意的地方是,要注意为status赋值,线程的状态是十分重要的。
status:构造函数后(uninitialize)->initial(IDLE)->run(RUNNING)->Task->run(IDLE)。
还有finish函数根据线程状态来执行对应处理也是很巧妙,若然线程是分离(detached)的,则不符合finish的处理要求
设置AuToFinish使线程是分离的,这样使得线程在执行一次之后就会退出doRun函数结束,然后就自动释放线程所占的资源内存。
CThreadPool类 一次create多个线程 每个线程都在Run的循环内 分配任务到空闲的线程,搜索方式为从已经创建的第一个线程开始寻找为IDLE状态的线程。
找不到则继续新建一个线程。
注意获取现有线程数这个操作要先获取互斥量。
Demo实例 CTest继承了CTask 为一个执行103秒的任务。 3 + 100
这个实例是没有终止的,会一直增加任务,直到任务数目为26的时候维持平衡。
可以在主循环中增加一个级数设置循环退出条件。
waitAliveFinish的主动调用通知线程结束后,在main函数的最后其析构函数也会调用,所以要判断m_pThreads是否已经为NULL
否则会出现二次delete.
finish函数会再次将信号量+1 但doRun函数的sem_wait被唤醒后 之后没有执行任务,因为m_nStatus为IDLE(每次执行完任务后会设置为IDLE)
之后就可以退出doRun函数了。
好好体会面向对象的思想 对象没销毁,可以管理其内部成员先销毁。
1.AutoMutex.h
1 #ifndef AUTO_MUTEX_H 2 #define AUTO_MUTEX_H 3 4 #include "Mutex.h" 5 6 class CAutoMutex 7 { 8 9 private: 10 CMutex * m_pMutex; 11 12 public: 13 CAutoMutex(CMutex * pMutex) 14 { 15 m_pMutex = pMutex; 16 m_pMutex->lock(); 17 } 18 19 ~CAutoMutex() 20 { 21 m_pMutex->unlock(); 22 } 23 }; 24 25 #endif
2.CMutex.h
1 #ifndef MUTEX_H 2 #define MUTEX_H 3 4 #include <pthread.h> 5 #include <iostream> 6 7 class CMutex 8 { 9 public: 10 CMutex(int nShared = PTHREAD_PROCESS_PRIVATE,int ntype = 11 PTHREAD_MUTEX_NORMAL); 12 ~CMutex(); 13 14 int lock(); 15 int unlock(); 16 17 private: 18 pthread_mutex_t myMutex; 19 20 CMutex(const CMutex & cMutex) 21 { 22 } 23 24 CMutex & operator = (const CMutex & cMutex) 25 { 26 return *this; 27 } 28 }; 29 30 inline int CMutex::lock() 31 { 32 return pthread_mutex_lock(&myMutex); 33 } 34 35 inline int CMutex::unlock() 36 { 37 return pthread_mutex_unlock(&myMutex); 38 } 39 40 #endif
3.CMutex.cpp
1 #include "Mutex.h" 2 3 CMutex::CMutex(int nShared, int nType) 4 { 5 pthread_mutexattr_t attr; 6 pthread_mutexattr_init(&attr); 7 pthread_mutexattr_setpshared(&attr,nShared); 8 pthread_mutexattr_settype(&attr,nType); 9 pthread_mutex_init(&myMutex,&attr); 10 pthread_mutexattr_destroy(&attr); 11 } 12 13 CMutex::~CMutex() 14 { 15 pthread_mutex_destroy(&myMutex); 16 }
3.Thread.h
1 #ifndef THREAD_H 2 #define THREAD_H 3 4 #include <pthread.h> 5 6 typedef void* (*pFuncThreadStart) (void *); 7 8 class CThread 9 { 10 public: 11 enum EThreadState 12 { 13 ERR_ALREADERY_INITIALIZED = -6, 14 ERR_AT_CREATE_THREAD = -5, 15 ERR_AT_CREATE_SEM = -4, 16 ERR_NO_TASK = -3, 17 ERR_NOT_IDLE = -2, 18 UNINITIALIZED = -1, 19 IDLE = 0, 20 RUNNING = 1, 21 QUITED = 9 22 }; 23 24 CThread(); 25 virtual ~ CThread(); 26 27 int create(pFuncThreadStart pFuncStartRoutine, void *pArg, 28 bool bDetached = false, bool bSetScope = false); 29 int detach(); 30 int join(void ** pRetValue = NULL); 31 void exit(void * pRetValue = NULL); 32 void yield(); 33 void reset(); 34 bool isAlive(); 35 pthread_t getThreadId(); 36 private: 37 pthread_t myThreadId; 38 }; 39 40 inline pthread_t CThread::getThreadId() 41 { 42 return myThreadId; 43 } 44 45 inline int CThread::detach() 46 { 47 return pthread_detach(myThreadId); 48 } 49 50 inline int CThread::join(void **pRetValue) 51 { 52 return pthread_join(myThreadId,pRetValue); 53 } 54 55 inline void CThread::exit(void *pRetValue) 56 { 57 if (isAlive()) 58 pthread_exit(pRetValue); 59 } 60 61 inline bool CThread::isAlive() 62 { 63 if (pthread_equal(myThreadId,pthread_self()) != 0) 64 return true; 65 else 66 return false; 67 } 68 69 inline void CThread::reset() 70 { 71 join(); 72 myThreadId = -1; 73 } 74 75 #endif
4.Thread.cpp
1 #include "Thread.h" 2 3 CThread::CThread():myThreadId(-1) 4 { 5 6 } 7 8 CThread::~CThread() 9 { 10 11 } 12 13 int CThread::create(pFuncThreadStart pFuncStartRoutine, void *pArg, 14 bool bDetached, bool bSetScope) 15 { 16 pthread_attr_t sThread_attr; 17 int nStatus; 18 nStatus = pthread_attr_init(&sThread_attr); 19 if (nStatus != 0) 20 return -1; 21 22 if (bDetached) 23 { 24 nStatus = pthread_attr_setdetachstate(&sThread_attr, 25 PTHREAD_CREATE_DETACHED); 26 if (nStatus != 0) 27 { 28 pthread_attr_destroy(&sThread_attr); 29 return -1; 30 } 31 } 32 33 nStatus = pthread_create(&myThreadId, &sThread_attr,pFuncStartRoutine, 34 pArg); 35 pthread_attr_destroy(&sThread_attr); 36 return nStatus; 37 }
5.Task.h
1 #ifndef TASK_H 2 #define TASK_H 3 4 class CTask 5 { 6 public: 7 virtual void *run() = 0; 8 }; 9 10 #endif
6.ThreadPool.h
1 #ifndef THREADPOOL_H 2 #define THREADPOOL_H 3 4 #include <string.h> 5 #include "WorkThread.h" 6 #include "Mutex.h" 7 #include "AuToMutex.h" 8 9 class CThreadPool 10 { 11 public: 12 CThreadPool(int nPoolSize = 128, int nInitializeCount = 0); 13 ~CThreadPool(); 14 15 bool postTask(CTask* pTask); 16 int getPoolSize(); 17 int getInitializeCount(); 18 int getAliveCount(); 19 void waitAliveFinish(); 20 21 private: 22 int m_nPoolSize; 23 int m_nInitializeCount; 24 int m_nAliveCount; 25 CWorkThread ** m_pThreads; 26 CMutex m_cMutex; 27 }; 28 29 inline int CThreadPool::getPoolSize() 30 { 31 return m_nPoolSize; 32 } 33 34 inline int CThreadPool::getInitializeCount() 35 { 36 return m_nInitializeCount; 37 } 38 39 inline int CThreadPool::getAliveCount() 40 { 41 CAutoMutex cAutoMutex(&m_cMutex); 42 return m_nAliveCount; 43 } 44 45 #endif
7.ThreadPool.cpp
1 #include <iostream> 2 #include "ThreadPool.h" 3 using namespace std; 4 CThreadPool::CThreadPool(int nPoolSize, int nInitializeCount) 5 :m_nPoolSize(nPoolSize),m_nInitializeCount(nInitializeCount), 6 m_nAliveCount(0),m_pThreads(NULL) 7 { 8 m_pThreads = new CWorkThread* [nPoolSize]; 9 10 if (NULL == m_pThreads) 11 return; 12 13 memset(m_pThreads,0,sizeof(CWorkThread *) *nPoolSize); 14 15 for(int i=0; i<m_nInitializeCount; ++i) 16 { 17 m_pThreads[i] = new CWorkThread(i+1); 18 if (NULL == m_pThreads) 19 break; 20 if (m_pThreads[i]->initialize() != CThread::IDLE) 21 break; 22 23 ++m_nAliveCount; 24 } 25 } 26 27 CThreadPool::~CThreadPool() 28 { 29 if (NULL == m_pThreads) 30 return; 31 for (int i=0; i< m_nAliveCount; ++i) 32 { 33 if (NULL == m_pThreads[i]) 34 continue; 35 m_pThreads[i]->finish(); 36 delete m_pThreads[i]; 37 m_pThreads[i] = NULL; 38 } 39 40 delete []m_pThreads; 41 m_pThreads = NULL; 42 } 43 44 45 bool CThreadPool::postTask(CTask* pTask) 46 { 47 bool bPostSuccess = false; 48 CAutoMutex cAutoMutex(&m_cMutex); 49 50 for (int i=0; i<m_nAliveCount;++i) 51 { 52 if (m_pThreads[i]->getStatus() == CThread::IDLE) 53 { 54 m_pThreads[i]->setTask(pTask); 55 m_pThreads[i]->run(); 56 bPostSuccess = true; 57 break; 58 } 59 } 60 61 62 if (!bPostSuccess && m_nAliveCount < m_nPoolSize) 63 { 64 m_pThreads[m_nAliveCount] = new CWorkThread(m_nAliveCount +1); 65 if (m_pThreads[m_nAliveCount] != NULL ) 66 { 67 if (m_pThreads[m_nAliveCount]->initialize() == CThread::IDLE) 68 { 69 m_pThreads[m_nAliveCount]->setTask(pTask); 70 m_pThreads[m_nAliveCount]->run(); 71 ++m_nAliveCount; 72 bPostSuccess = true; 73 } 74 } 75 } 76 return bPostSuccess; 77 } 78 79 80 void CThreadPool::waitAliveFinish() 81 { 82 if (NULL == m_pThreads) 83 return; 84 85 cout<<"need to be delete number is "<<m_nAliveCount<<endl; 86 87 for (int i=0; i<m_nAliveCount;++i) 88 { 89 if (NULL == m_pThreads[i]) 90 continue; 91 m_pThreads[i]->finish(); 92 delete m_pThreads[i]; 93 m_pThreads[i] == NULL; 94 cout<<"delete No. "<<i<<endl; 95 } 96 97 delete [] m_pThreads; 98 m_pThreads = NULL; 99 m_nAliveCount = 0; 100 }
8.WorkThread.h
1 #ifndef WORKTHREAD_H 2 #define WORKTHREAD_H 3 4 #include <semaphore.h> 5 #include "Thread.h" 6 #include "Task.h" 7 8 class CWorkThread:public CThread 9 { 10 public: 11 CWorkThread(int nNo = 0); 12 ~CWorkThread(); 13 14 int getNo(); 15 int getStatus(); 16 17 int initialize(); 18 void setTask(CTask * pTask); 19 void setAutoFinish(); 20 int run(); 21 22 void finish(); 23 24 protected: 25 static void *doRun(void *pContext); 26 int m_nNo; 27 int m_nStatus; 28 sem_t * m_pSem; 29 bool m_bNeedQuit; 30 bool m_bAutoFinish; 31 CTask * m_pTask; 32 }; 33 34 inline void CWorkThread::setTask(CTask *pTask) 35 { 36 m_pTask = pTask; 37 } 38 39 inline int CWorkThread::getStatus() 40 { 41 return m_nStatus; 42 } 43 44 inline void CWorkThread::setAutoFinish() 45 { 46 m_bAutoFinish = true; 47 } 48 49 inline int CWorkThread::getNo() 50 { 51 return m_nNo; 52 } 53 54 #endif
9.WorkThread.cpp
1 #include "WorkThread.h" 2 3 CWorkThread::CWorkThread(int nNo):m_pTask(NULL),m_nStatus(UNINITIALIZED), 4 m_bNeedQuit(false),m_bAutoFinish(false),m_nNo(nNo),m_pSem(NULL) 5 { 6 7 } 8 9 CWorkThread::~CWorkThread() 10 { 11 finish(); 12 } 13 14 int CWorkThread::initialize() 15 { 16 m_pSem = new sem_t; 17 if (m_nStatus != UNINITIALIZED && m_nStatus != QUITED) 18 { 19 return ERR_ALREADERY_INITIALIZED; 20 } 21 22 if (sem_init(m_pSem,0,0) < 0) 23 return ERR_AT_CREATE_SEM; 24 25 if (create(&doRun, (void*) this) < 0) 26 { 27 return ERR_AT_CREATE_THREAD; 28 } 29 30 if (m_bNeedQuit) 31 { 32 m_bNeedQuit = false; 33 } 34 35 if (m_bAutoFinish) 36 { 37 m_bAutoFinish = false; 38 } 39 40 m_nStatus = IDLE; 41 return m_nStatus; 42 } 43 44 int CWorkThread::run() 45 { 46 if (m_nStatus != IDLE) 47 { 48 return ERR_NOT_IDLE; 49 } 50 51 if (NULL == m_pTask) 52 { 53 return ERR_NO_TASK; 54 } 55 56 m_nStatus = RUNNING; 57 sem_post(m_pSem); 58 return m_nStatus; 59 } 60 61 void CWorkThread::finish() 62 { 63 if (m_nStatus!= UNINITIALIZED && m_nStatus != QUITED) 64 { 65 m_bNeedQuit = true; 66 sem_post(m_pSem); 67 reset(); 68 sem_destroy(m_pSem); 69 delete m_pSem; 70 m_pSem = NULL; 71 } 72 } 73 74 void * CWorkThread::doRun(void *pArg) 75 { 76 CWorkThread * pWorkThread = (CWorkThread *)pArg; 77 CTask* pTask = pWorkThread->m_pTask; 78 79 while (!pWorkThread->m_bNeedQuit) 80 { 81 sem_wait(pWorkThread->m_pSem); 82 if (RUNNING == pWorkThread->m_nStatus) 83 { 84 if (NULL == pTask) 85 { 86 pWorkThread->m_nStatus = ERR_NO_TASK; 87 } 88 else 89 { 90 pTask->run(); 91 pWorkThread->m_nStatus = IDLE; 92 } 93 } 94 95 if (pWorkThread->m_bAutoFinish) 96 { 97 pWorkThread->detach(); 98 break; 99 } 100 } 101 102 pWorkThread->m_nStatus = QUITED; 103 return (void *)0; 104 }
10.main.cpp
1 #include <iostream> 2 #include "ThreadPool.h" 3 4 using namespace std; 5 6 class CTest : public CTask 7 { 8 public: 9 void * run() 10 { 11 int nCount = 0; 12 while (true) 13 { 14 sleep(1); 15 cout << "[" << ++nCount << "] sleep ..." << endl; 16 if (nCount >= 3) 17 { 18 break; 19 } 20 } 21 sleep(30); 22 } 23 }; 24 25 int main() 26 { 27 CThreadPool cThreadPool(128,1); 28 CTest cTest; 29 cThreadPool.postTask(&cTest); 30 int i=0; 31 while (++i<5) 32 { 33 sleep(4); 34 cout << "Current AliveCount = " << cThreadPool.getAliveCount() << endl; 35 cThreadPool.postTask(&cTest); 36 cout << "Add one task." << endl; 37 } 38 cThreadPool.waitAliveFinish(); 39 cout<<cThreadPool.getAliveCount(); 40 41 }
标签:
原文地址:http://www.cnblogs.com/lewiskyo/p/4214497.html