面试有被问到怎么实现线程池,网上找的可以用的代码,在VS2010上测试通过,没有用到C++11,信号量也是用WINDOWS的。
线程池为了节省开辟线程耗费的资源,提前创建一批线程处于信号量等待状态,需要用的时候将任务加入队列中,发送信号量,抢占到的线程执行该任务。具体代码如下:
1 #ifndef _ThreadPool_H_ 2 #define _ThreadPool_H_ 3 #pragma warning(disable: 4530) 4 #pragma warning(disable: 4786) 5 #include <cassert> 6 #include <vector> 7 #include <queue> 8 #include <windows.h> 9 #include "stdafx.h" 10 #include <iostream> 11 using namespace std; 12 13 14 class ThreadJob //工作基类 15 { 16 public: 17 //供线程池调用的虚函数 18 virtual void DoJob(void *pPara) = 0; 19 }; 20 class ThreadPool 21 { 22 public: 23 //dwNum 线程池规模 24 ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 25 { 26 InitializeCriticalSection(&_csThreadVector); 27 InitializeCriticalSection(&_csWorkQueue); 28 _EventComplete = CreateEvent(0, false, false, NULL); 29 _EventEnd = CreateEvent(0, true, false, NULL); 30 _SemaphoreCall = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL); 31 _SemaphoreDel = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL); 32 assert(_SemaphoreCall != INVALID_HANDLE_VALUE); 33 assert(_EventComplete != INVALID_HANDLE_VALUE); 34 assert(_EventEnd != INVALID_HANDLE_VALUE); 35 assert(_SemaphoreDel != INVALID_HANDLE_VALUE); 36 AdjustSize(dwNum <= 0 ? 4 : dwNum); 37 } 38 ~ThreadPool() 39 { 40 DeleteCriticalSection(&_csWorkQueue); 41 CloseHandle(_EventEnd); 42 CloseHandle(_EventComplete); 43 CloseHandle(_SemaphoreCall); 44 CloseHandle(_SemaphoreDel); 45 vector<ThreadItem*>::iterator iter; 46 for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++) 47 { 48 if(*iter) 49 delete *iter; 50 } 51 DeleteCriticalSection(&_csThreadVector); 52 } 53 // //调整线程池规模 54 int AdjustSize(int iNum) 55 { 56 if(iNum > 0) 57 { 58 ThreadItem *pNew; 59 EnterCriticalSection(&_csThreadVector); 60 for(int _i=0; _i<iNum; _i++) 61 { 62 _ThreadVector.push_back(pNew = new ThreadItem(this)); 63 assert(pNew); 64 pNew->_iTreadID=_i; 65 pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL); 66 // set priority 67 SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL); 68 assert(pNew->_Handle); 69 } 70 LeaveCriticalSection(&_csThreadVector); 71 } 72 else 73 { 74 iNum *= -1; 75 ReleaseSemaphore(_SemaphoreDel, iNum > _lThreadNum ? _lThreadNum : iNum, NULL); 76 } 77 return (int)_lThreadNum; 78 } 79 //调用线程池 80 void Call(void (*pFunc)(void *), void *pPara = NULL) 81 { 82 assert(pFunc); 83 EnterCriticalSection(&_csWorkQueue); 84 if(_ThreadVector.empty()) 85 { 86 printf("线程队列空\n\r"); 87 } 88 else 89 { 90 printf("线程队列不空\n\r"); 91 _JobQueue.push(new JobItem(pFunc, pPara)); 92 } 93 94 LeaveCriticalSection(&_csWorkQueue); 95 ReleaseSemaphore(_SemaphoreCall, 1, NULL); 96 } 97 //调用线程池 98 inline void Call(ThreadJob * p, void *pPara = NULL) 99 { 100 Call(CallProc, new CallProcPara(p, pPara)); 101 } 102 //结束线程池, 并同步等待 103 bool EndAndWait(DWORD dwWaitTime = INFINITE) 104 { 105 SetEvent(_EventEnd); 106 return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0; 107 } 108 //结束线程池 109 inline void End() 110 { 111 SetEvent(_EventEnd); 112 } 113 inline DWORD Size() 114 { 115 return (DWORD)_lThreadNum; 116 } 117 inline DWORD GetRunningSize() 118 { 119 return (DWORD)_lRunningNum; 120 } 121 bool IsRunning() 122 { 123 return _lRunningNum > 0; 124 } 125 protected: 126 // //工作线程 127 static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL) 128 { 129 ThreadItem *pNew; 130 pNew=(ThreadItem*)lpParameter; 131 printf("threadID=%d \n\r",pNew->_iTreadID); 132 ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter); 133 assert(pThread); 134 ThreadPool *pThreadPoolObj = pThread->_pThis; 135 assert(pThreadPoolObj); 136 InterlockedIncrement(&pThreadPoolObj->_lThreadNum);//InterLockedIncrement 能够保证在一个线程访问变量时其它线程不能访问 137 HANDLE hWaitHandle[3]; 138 hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall; 139 hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel; 140 hWaitHandle[2] = pThreadPoolObj->_EventEnd; 141 JobItem *pJob; 142 bool fHasJob; 143 for(;;) 144 { 145 146 DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE); 147 printf("hh =%d\n\r",pNew->_iTreadID); 148 //响应删除线程信号 149 if(wr == WAIT_OBJECT_0 + 1) 150 break; 151 //从队列里取得用户作业 152 EnterCriticalSection(&pThreadPoolObj->_csWorkQueue); 153 if(fHasJob = !pThreadPoolObj->_JobQueue.empty()) 154 { 155 pJob = pThreadPoolObj->_JobQueue.front(); 156 pThreadPoolObj->_JobQueue.pop(); 157 assert(pJob); 158 } 159 LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue); 160 //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作) 161 if(wr == WAIT_OBJECT_0 + 2 && !fHasJob) 162 { 163 printf("endthread =%d\n\r",pNew->_iTreadID); 164 break; 165 } 166 167 if(fHasJob && pJob) 168 { 169 InterlockedIncrement(&pThreadPoolObj->_lRunningNum); 170 pThread->_dwLastBeginTime = GetTickCount(); 171 pThread->_dwCount++; 172 pThread->_fIsRunning = true; 173 pJob->_pFunc(pJob->_pPara); //运行用户作业 174 delete pJob; 175 pThread->_fIsRunning = false; 176 InterlockedDecrement(&pThreadPoolObj->_lRunningNum); 177 } 178 } 179 //删除自身结构 180 EnterCriticalSection(&pThreadPoolObj->_csThreadVector); 181 pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread)); 182 LeaveCriticalSection(&pThreadPoolObj->_csThreadVector); 183 delete pThread; 184 InterlockedDecrement(&pThreadPoolObj->_lThreadNum); 185 if(!pThreadPoolObj->_lThreadNum) //所有线程结束 186 SetEvent(pThreadPoolObj->_EventComplete); 187 return 0; 188 } 189 //调用用户对象虚函数 190 static void CallProc(void *pPara) 191 { 192 CallProcPara *cp = static_cast<CallProcPara *>(pPara); 193 assert(cp); 194 if(cp) 195 { 196 cp->_pObj->DoJob(cp->_pPara); 197 delete cp; 198 } 199 } 200 //用户对象结构 201 struct CallProcPara 202 { 203 ThreadJob* _pObj;//用户对象 204 void *_pPara;//用户参数 205 CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { }; 206 }; 207 //用户函数结构 208 struct JobItem 209 { 210 void (*_pFunc)(void *);//函数 211 void *_pPara; //参数 212 JobItem(void (*pFunc)(void *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { }; 213 }; 214 // //线程池中的线程结构 215 struct ThreadItem 216 { 217 int _iTreadID; 218 HANDLE _Handle; //线程句柄 219 ThreadPool *_pThis; //线程池的指针 220 DWORD _dwLastBeginTime; //最后一次运行开始时间 221 DWORD _dwCount; //运行次数 222 bool _fIsRunning; 223 ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { }; 224 ~ThreadItem() 225 { 226 if(_Handle) 227 { 228 CloseHandle(_Handle); 229 _Handle = NULL; 230 } 231 } 232 }; 233 std::queue<JobItem *> _JobQueue; //工作队列 234 std::vector<ThreadItem *> _ThreadVector; //线程数据 235 CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界 236 HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号 237 long _lThreadNum, _lRunningNum; //线程数, 运行的线程数 238 }; 239 #endif //_ThreadPool_H_ 240 241 void threadfunc(void *p) 242 { 243 printf("*********\n\r"); 244 for (int i=0;i<10;i++) 245 { 246 for(int j=0;j<10;j++) 247 { 248 printf("%02d ",i-j); 249 } 250 printf("\n\r"); 251 } 252 printf("*********\n\r"); 253 //YourClass* yourObject = (YourClass*) p; 254 //... 255 } 256 257 void fun2(void *p) 258 { 259 260 printf("*********\n\r"); 261 for (int i=0;i<20;i++) 262 { 263 for(int j=0;j<20;j++) 264 { 265 printf("%02d ",i); 266 } 267 Sleep(30); 268 printf("\n\r"); 269 270 } 271 //YourClass* yourObject = (YourClass*) p; 272 //... 273 printf("*********\n\r"); 274 } 275 void main() 276 { 277 //ThreadPool tp; 278 //for(int i=0; i<100; i++) 279 // tp.Call(threadfunc); 280 ThreadPool tp(20);//20为初始线程池规模 281 Sleep(1000); 282 tp.Call(threadfunc, NULL); 283 tp.Call(fun2, NULL); 284 //tp.Call(fun2, NULL); 285 //tp.Call(fun2, NULL); 286 //tp.Call(fun2, NULL); 287 288 for(int i=0;i<20;i++) 289 { 290 tp.End(); 291 } 292 293 Sleep(2000); 294 printf("run\n\r"); 295 tp.Call(fun2, NULL); 296 Sleep(100000); 297 system("pause"); 298 return ; 299 }