码迷,mamicode.com
首页 > 编程语言 > 详细

C++线程池实现

时间:2018-03-12 21:12:11      阅读:319      评论:0      收藏:0      [点我收藏+]

标签:通过   log   system   set   sse   red   ati   infinite   als   

面试时被问到如何实现线程池,于是在网上找了一份可用的代码,已在 VS2010 上测试通过。代码没有用到 C++11,信号量等同步对象使用的也是 Windows API。

线程池的目的是节省反复创建线程所耗费的资源:预先创建一批线程,让它们阻塞在信号量上等待;需要执行任务时,把任务加入队列并释放一次信号量,抢到信号量的线程从队列中取出该任务并执行。具体代码如下:

  1 #ifndef _ThreadPool_H_
  2 #define _ThreadPool_H_
  3 #pragma warning(disable: 4530)
  4 #pragma warning(disable: 4786)
// Kept first: when MSVC precompiled headers are enabled, the PCH include
// has to precede every other include.
#include "stdafx.h"
#include <windows.h>

#include <algorithm>
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <queue>
#include <vector>
using namespace std;
 12 
 13 
 14 class ThreadJob  //工作基类
 15 {
 16 public:
 17     //供线程池调用的虚函数
 18     virtual void DoJob(void *pPara) = 0;
 19 };
 20 class ThreadPool
 21 {
 22 public:
 23     //dwNum 线程池规模
 24     ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 
 25     {
 26         InitializeCriticalSection(&_csThreadVector);
 27         InitializeCriticalSection(&_csWorkQueue);
 28         _EventComplete = CreateEvent(0, false, false, NULL);
 29         _EventEnd = CreateEvent(0, true, false, NULL);
 30         _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
 31         _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
 32         assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
 33         assert(_EventComplete != INVALID_HANDLE_VALUE);
 34         assert(_EventEnd != INVALID_HANDLE_VALUE);
 35         assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
 36         AdjustSize(dwNum <= 0 ? 4 : dwNum);
 37     }
 38     ~ThreadPool()
 39     {
 40         DeleteCriticalSection(&_csWorkQueue);
 41         CloseHandle(_EventEnd);
 42         CloseHandle(_EventComplete);
 43         CloseHandle(_SemaphoreCall);
 44         CloseHandle(_SemaphoreDel);
 45         vector<ThreadItem*>::iterator iter;
 46         for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
 47         {
 48             if(*iter)
 49                 delete *iter;
 50         }
 51         DeleteCriticalSection(&_csThreadVector);
 52     }
 53 //    //调整线程池规模
 54     int  AdjustSize(int iNum)
 55     {
 56         if(iNum > 0)
 57         {
 58             ThreadItem *pNew;
 59             EnterCriticalSection(&_csThreadVector);
 60             for(int _i=0; _i<iNum; _i++)
 61             {
 62                 _ThreadVector.push_back(pNew = new ThreadItem(this)); 
 63                 assert(pNew);
 64                 pNew->_iTreadID=_i;
 65                 pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
 66                 // set priority
 67                 SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
 68                 assert(pNew->_Handle);
 69             }
 70             LeaveCriticalSection(&_csThreadVector);
 71         }
 72         else
 73         {
 74             iNum *= -1;
 75             ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
 76         }
 77         return (int)_lThreadNum;
 78     }
 79     //调用线程池
 80     void Call(void (*pFunc)(void  *), void *pPara = NULL)
 81     {
 82         assert(pFunc);
 83         EnterCriticalSection(&_csWorkQueue);
 84         if(_ThreadVector.empty())
 85         {
 86             printf("线程队列空\n\r");
 87         }
 88         else
 89         {
 90             printf("线程队列不空\n\r");
 91             _JobQueue.push(new JobItem(pFunc, pPara));
 92         }
 93         
 94         LeaveCriticalSection(&_csWorkQueue);
 95         ReleaseSemaphore(_SemaphoreCall, 1, NULL);
 96     }
 97     //调用线程池
 98     inline void Call(ThreadJob * p, void *pPara = NULL)
 99     {
100         Call(CallProc, new CallProcPara(p, pPara));
101     }
102     //结束线程池, 并同步等待
103     bool EndAndWait(DWORD dwWaitTime = INFINITE)
104     {
105         SetEvent(_EventEnd);
106         return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
107     }
108     //结束线程池
109     inline void End()
110     {
111         SetEvent(_EventEnd);
112     }
113     inline DWORD Size()
114     {
115         return (DWORD)_lThreadNum;
116     }
117     inline DWORD GetRunningSize()
118     {
119         return (DWORD)_lRunningNum;
120     }
121     bool IsRunning()
122     {
123         return _lRunningNum > 0;
124     }
125 protected:
126 //    //工作线程
127     static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
128     {
129         ThreadItem *pNew;
130         pNew=(ThreadItem*)lpParameter;
131         printf("threadID=%d \n\r",pNew->_iTreadID);
132         ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
133         assert(pThread);
134         ThreadPool *pThreadPoolObj = pThread->_pThis;
135         assert(pThreadPoolObj);
136         InterlockedIncrement(&pThreadPoolObj->_lThreadNum);//InterLockedIncrement 能够保证在一个线程访问变量时其它线程不能访问
137         HANDLE hWaitHandle[3];
138         hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
139         hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
140         hWaitHandle[2] = pThreadPoolObj->_EventEnd;
141         JobItem *pJob;
142         bool fHasJob;
143         for(;;)
144         {
145             
146             DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
147             printf("hh =%d\n\r",pNew->_iTreadID);
148             //响应删除线程信号
149             if(wr == WAIT_OBJECT_0 + 1)  
150                 break;
151             //从队列里取得用户作业
152             EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
153             if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
154             {
155                 pJob = pThreadPoolObj->_JobQueue.front();
156                 pThreadPoolObj->_JobQueue.pop();
157                 assert(pJob);
158             }
159             LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
160             //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
161             if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)  
162             {
163                 printf("endthread =%d\n\r",pNew->_iTreadID);
164                 break;
165             }
166                 
167             if(fHasJob && pJob)
168             {
169                 InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
170                 pThread->_dwLastBeginTime = GetTickCount();
171                 pThread->_dwCount++;
172                 pThread->_fIsRunning = true;
173                 pJob->_pFunc(pJob->_pPara); //运行用户作业
174                 delete pJob; 
175                 pThread->_fIsRunning = false;
176                 InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
177             }
178         }
179         //删除自身结构
180         EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
181         pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
182         LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
183         delete pThread;
184         InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
185         if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
186             SetEvent(pThreadPoolObj->_EventComplete);
187         return 0;
188     }
189     //调用用户对象虚函数
190     static void CallProc(void *pPara) 
191     {
192         CallProcPara *cp = static_cast<CallProcPara *>(pPara);
193         assert(cp);
194         if(cp)
195         {
196             cp->_pObj->DoJob(cp->_pPara);
197             delete cp;
198         }
199     }
200     //用户对象结构
201     struct CallProcPara  
202     {
203         ThreadJob* _pObj;//用户对象 
204         void *_pPara;//用户参数
205         CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
206     };
207     //用户函数结构
208     struct JobItem 
209     {
210         void (*_pFunc)(void  *);//函数
211         void *_pPara; //参数
212         JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
213     };
214 //    //线程池中的线程结构
215     struct ThreadItem
216     {
217         int _iTreadID;
218         HANDLE _Handle; //线程句柄
219         ThreadPool *_pThis;  //线程池的指针
220         DWORD _dwLastBeginTime; //最后一次运行开始时间
221         DWORD _dwCount; //运行次数
222         bool _fIsRunning;
223         ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
224         ~ThreadItem()
225         {
226             if(_Handle)
227             {
228                 CloseHandle(_Handle);
229                 _Handle = NULL;
230             }
231         }
232     };
233     std::queue<JobItem *> _JobQueue;  //工作队列
234     std::vector<ThreadItem *>  _ThreadVector; //线程数据
235     CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
236     HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
237     long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
238 };
239 #endif //_ThreadPool_H_
240 
// Demo job: prints a 10x10 table of (row - column) values framed by two
// lines of asterisks.
// p: user argument, unused in this demo. A real job would cast it back to
//    its own type, e.g. YourClass* yourObject = (YourClass*)p;
void threadfunc(void *p)
{
    const int kSide = 10;

    printf("*********\n\r");
    int row = 0;
    while(row < kSide)
    {
        for(int col = 0; col != kSide; ++col)
            printf("%02d  ", row - col);
        printf("\n\r");
        ++row;
    }
    printf("*********\n\r");
}
256 
257 void fun2(void *p)
258 {
259     
260     printf("*********\n\r");
261     for (int i=0;i<20;i++)
262     {
263         for(int j=0;j<20;j++)
264         {
265             printf("%02d ",i);
266         }
267         Sleep(30);
268         printf("\n\r");
269         
270     }
271     //YourClass* yourObject = (YourClass*)    p;
272     //...
273     printf("*********\n\r");
274 }
275 void main()
276 {
277     //ThreadPool tp;
278     //for(int i=0; i<100; i++)
279     //    tp.Call(threadfunc);
280     ThreadPool tp(20);//20为初始线程池规模
281     Sleep(1000);
282     tp.Call(threadfunc, NULL);
283     tp.Call(fun2, NULL);
284     //tp.Call(fun2, NULL);
285     //tp.Call(fun2, NULL);
286     //tp.Call(fun2, NULL);
287 
288     for(int i=0;i<20;i++)
289     {
290         tp.End();
291     }
292 
293     Sleep(2000);
294     printf("run\n\r");
295     tp.Call(fun2, NULL);
296     Sleep(100000);
297     system("pause");
298     return ;
299 }

 

C++线程池实现

标签:通过   log   system   set   sse   red   ati   infinite   als   

原文地址:https://www.cnblogs.com/saye/p/8550576.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!