码迷,mamicode.com
首页 > 编程语言 > 详细

一个Windows C++的线程池的实现

时间:2016-04-14 13:44:14      阅读:221      评论:0      收藏:0      [点我收藏+]

标签:

此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:

 

ThreadPoolExecutor.h

 1 #ifndef __THREAD_POOL_EXECUTOR__  
 2 #define __THREAD_POOL_EXECUTOR__  
 3   
 4 #include "Thread.h"  
 5 #include <set>  
 6 #include <list>  
 7 #include <windows.h>  
 8   
 9 class CThreadPoolExecutor  
10 {  
11 public:  
12     CThreadPoolExecutor(void);  
13     ~CThreadPoolExecutor(void);  
14   
15     /** 
16       初始化线程池,创建minThreads个线程 
17     **/  
18     bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse);  
19   
20     /** 
21       执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true 
22       若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true 
23       若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false 
24     **/  
25     bool Execute(Runnable * pRunnable);  
26   
27     /** 
28       终止线程池,先制止塞入任务, 
29       然后等待直到任务列表为空, 
30       然后设置最小线程数量为0, 
31       等待直到线程数量为空, 
32       清空垃圾堆中的任务 
33     **/  
34     void Terminate();  
35   
36     /** 
37       返回线程池中当前的线程数量 
38     **/  
39     unsigned int GetThreadPoolSize();  
40   
41 private:  
42     /** 
43       获取任务列表中的任务,若任务列表为空,返回NULL 
44     **/  
45     Runnable * GetTask();  
46   
47     static unsigned int WINAPI StaticThreadFunc(void * arg);  
48   
49 private:  
50     class CWorker : public CThread  
51     {  
52     public:  
53         CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);  
54         ~CWorker();  
55         void Run();  
56   
57     private:  
58         CThreadPoolExecutor * m_pThreadPool;  
59         Runnable * m_pFirstTask;  
60         volatile bool m_bRun;  
61     };  
62   
63     typedef std::set<CWorker *> ThreadPool;  
64     typedef std::list<Runnable *> Tasks;  
65     typedef Tasks::iterator TasksItr;  
66     typedef ThreadPool::iterator ThreadPoolItr;  
67   
68     ThreadPool m_ThreadPool;  
69     ThreadPool m_TrashThread;  
70     Tasks m_Tasks;  
71   
72     CRITICAL_SECTION m_csTasksLock;  
73     CRITICAL_SECTION m_csThreadPoolLock;  
74   
75     volatile bool m_bRun;  
76     volatile bool m_bEnableInsertTask;  
77     volatile unsigned int m_minThreads;  
78     volatile unsigned int m_maxThreads;  
79     volatile unsigned int m_maxPendingTasks;  
80 };  
81   
82 #endif 

ThreadPoolExecutor .cpp

  1 #include "stdafx.h"
  2 
  3 #include "ThreadPoolExecutor.h"  
  4   
  5 CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :   
  6 m_pThreadPool(pThreadPool),  
  7 m_pFirstTask(pFirstTask),  
  8 m_bRun(true)  
  9 {  
 10       
 11 }  
 12   
 13 CThreadPoolExecutor::CWorker::~CWorker()  
 14 {  
 15 }  
 16   
 17 /** 
 18   执行任务的工作线程。 
 19   当前没有任务时, 
 20   如果当前线程数量大于最小线程数量,减少线程, 
 21   否则,执行清理程序,将线程类给释放掉 
 22 **/  
 23 void CThreadPoolExecutor::CWorker::Run()  
 24 {  
 25     Runnable * pTask = NULL;  
 26     while(m_bRun)  
 27     {  
 28         if(NULL == m_pFirstTask)  
 29         {  
 30             pTask = m_pThreadPool->GetTask();  
 31         }  
 32         else  
 33         {  
 34             pTask = m_pFirstTask;  
 35             m_pFirstTask = NULL;  
 36         }  
 37   
 38         if(NULL == pTask)  
 39         {  
 40             EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  
 41             if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)  
 42             {  
 43                 ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);  
 44                 if(itr != m_pThreadPool->m_ThreadPool.end())  
 45                 {  
 46                     m_pThreadPool->m_ThreadPool.erase(itr);  
 47                     m_pThreadPool->m_TrashThread.insert(this);  
 48                 }  
 49                 m_bRun = false;  
 50             }  
 51             else  
 52             {  
 53                 ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();  
 54                 while(itr != m_pThreadPool->m_TrashThread.end())  
 55                 {  
 56                     (*itr)->Join();  
 57                     delete (*itr);  
 58                     m_pThreadPool->m_TrashThread.erase(itr);  
 59                     itr = m_pThreadPool->m_TrashThread.begin();  
 60                 }  
 61             }  
 62             LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  
 63             continue;  
 64         }  
 65         else  
 66         {  
 67             pTask->Run();  
 68             pTask = NULL;  
 69         }  
 70     }  
 71 }  
 72   
 73 /////////////////////////////////////////////////////////////////////////////////////////////  
 74   
 75 CThreadPoolExecutor::CThreadPoolExecutor(void) :   
 76 m_bRun(false),  
 77 m_bEnableInsertTask(false)  
 78 {  
 79     InitializeCriticalSection(&m_csTasksLock);  
 80     InitializeCriticalSection(&m_csThreadPoolLock);  
 81 }  
 82   
 83 CThreadPoolExecutor::~CThreadPoolExecutor(void)  
 84 {  
 85     Terminate();  
 86     DeleteCriticalSection(&m_csTasksLock);  
 87     DeleteCriticalSection(&m_csThreadPoolLock);  
 88 }  
 89   
 90 bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)  
 91 {  
 92     if(minThreads == 0)  
 93     {  
 94         return false;  
 95     }  
 96     if(maxThreads < minThreads)  
 97     {  
 98         return false;  
 99     }  
100     m_minThreads = minThreads;  
101     m_maxThreads = maxThreads;  
102     m_maxPendingTasks = maxPendingTasks;  
103     unsigned int i = m_ThreadPool.size();  
104     for(; i<minThreads; i++)  
105     {  
106         //创建线程  
107         CWorker * pWorker = new CWorker(this);  
108         if(NULL == pWorker)  
109         {  
110             return false;  
111         }  
112         EnterCriticalSection(&m_csThreadPoolLock);  
113         m_ThreadPool.insert(pWorker);  
114         LeaveCriticalSection(&m_csThreadPoolLock);  
115         pWorker->Start();  
116     }  
117     m_bRun = true;  
118     m_bEnableInsertTask = true;  
119     return true;  
120 }  
121   
122 bool CThreadPoolExecutor::Execute(Runnable * pRunnable)  
123 {  
124     if(!m_bEnableInsertTask)  
125     {  
126         return false;  
127     }  
128     if(NULL == pRunnable)  
129     {  
130         return false;  
131     }  
132     if(m_Tasks.size() >= m_maxPendingTasks)  
133     {  
134         if(m_ThreadPool.size() < m_maxThreads)  
135         {  
136             CWorker * pWorker = new CWorker(this, pRunnable);  
137             if(NULL == pWorker)  
138             {  
139                 return false;  
140             }  
141             EnterCriticalSection(&m_csThreadPoolLock);  
142             m_ThreadPool.insert(pWorker);  
143             LeaveCriticalSection(&m_csThreadPoolLock);  
144             pWorker->Start();  
145         }  
146         else  
147         {  
148             return false;  
149         }  
150     }  
151     else  
152     {  
153         EnterCriticalSection(&m_csTasksLock);  
154         m_Tasks.push_back(pRunnable);  
155         LeaveCriticalSection(&m_csTasksLock);  
156     }  
157     return true;  
158 }  
159   
160 Runnable * CThreadPoolExecutor::GetTask()  
161 {  
162     Runnable * Task = NULL;  
163     EnterCriticalSection(&m_csTasksLock);  
164     if(!m_Tasks.empty())  
165     {  
166         Task = m_Tasks.front();  
167         m_Tasks.pop_front();  
168     }  
169     LeaveCriticalSection(&m_csTasksLock);  
170     return Task;  
171 }  
172   
173 unsigned int CThreadPoolExecutor::GetThreadPoolSize()  
174 {  
175     return m_ThreadPool.size();  
176 }  
177   
178 void CThreadPoolExecutor::Terminate()  
179 {  
180     m_bEnableInsertTask = false;  
181     while(m_Tasks.size() > 0)  
182     {  
183         Sleep(1);  
184     }  
185     m_bRun = false;  
186     m_minThreads = 0;  
187     m_maxThreads = 0;  
188     m_maxPendingTasks = 0;  
189     while(m_ThreadPool.size() > 0)  
190     {  
191         Sleep(1);  
192     }  
193     EnterCriticalSection(&m_csThreadPoolLock);  
194     ThreadPoolItr itr = m_TrashThread.begin();  
195     while(itr != m_TrashThread.end())  
196     {  
197         (*itr)->Join();  
198         delete (*itr);  
199         m_TrashThread.erase(itr);  
200         itr = m_TrashThread.begin();  
201     }  
202     LeaveCriticalSection(&m_csThreadPoolLock);  
203 }  

调用:

 1 #include "stdafx.h"
 2 #include "Thread.h"
 3 #include "ThreadPoolExecutor.h"
 4 
 5 class R : public Runnable
 6 {
 7 public:
 8     ~R()
 9     {
10         printf("~R/n");
11     }
12     void Run()
13     {
14         printf("Hello World\n");
15     }
16 };
17 
18 
19 int _tmain(int argc, _TCHAR* argv[])
20 {
21     /*R r;
22     CThread * t = NULL;
23     t = new CThread(&r);
24     t->Start();
25     t->Join();
26     getchar();*/
27     CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
28     pExecutor->Init(1, 10, 50);
29     R r;
30     for(int i=0;i<100;i++)
31     {
32         while(!pExecutor->Execute(&r))
33         {
34         }
35     }
36     pExecutor->Terminate();
37     delete pExecutor;
38     getchar();
39     return 0;
40     return 0;
41 }

 

from:http://blog.csdn.net/huyiyang2010/article/details/5809919

一个Windows C++的线程池的实现

标签:

原文地址:http://www.cnblogs.com/chuyibky/p/5390514.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!