标签:
BaseTask.h 任务基类
1 #ifndef MYTASK_H 2 #define MYTASK_H 3 #include "BaseTask.h" 4 5 class MyTask : public BaseTask 6 { 7 public: 8 virtual void run(void); 9 }; 10 11 #endif
MyTask,h 其中一个实现的任务类
1 #ifndef MYTASK_H 2 #define MYTASK_H 3 #include "BaseTask.h" 4 5 class MyTask : public BaseTask 6 { 7 public: 8 virtual void run(void); 9 }; 10 11 #endif
MyTask.cpp
1 #include "MyTask.h" 2 3 void MyTask::run(void) 4 { 5 cout<<"Hello MyTask"<<endl; 6 }
Mutex.h 封装了的互斥类
1 #ifndef MUTEX_H 2 #define MUTEX_H 3 4 #include <iostream> 5 #include <pthread.h> 6 using namespace std; 7 8 class Mutex 9 { 10 public: 11 Mutex(); 12 void Lock(); 13 void Unlock(); 14 15 private: 16 pthread_mutex_t mutex; 17 }; 18 19 #endif
Mutex.cpp
1 #include "Mutex.h" 2 3 Mutex::Mutex() 4 { 5 pthread_mutex_init(&mutex,NULL); 6 } 7 8 void Mutex::Lock() 9 { 10 pthread_mutex_lock(&mutex); 11 } 12 13 void Mutex::Unlock() 14 { 15 pthread_mutex_unlock(&mutex); 16 }
MyThread.h 线程类
1 #ifndef MYTHREAD_H 2 #define MYTHREAD_H 3 4 #include <iostream> 5 #include <pthread.h> 6 #include <semaphore.h> 7 #include "BaseTask.h" 8 9 10 // 前置定义 11 class MyThreadPool; 12 13 class MyThread 14 { 15 public: 16 MyThread(MyThreadPool* mtp); 17 void Set_Task(BaseTask* task); 18 void Start_Task(); 19 /* 线程启动函数 必须写成静态成员函数 传入的参数为类对象自己*/ 20 static void* Start_Func(void* arg); 21 /* 完成一个任务后 询问线程池管理器是否有未完成的任务*/ 22 bool Fetch_Task(); 23 /* 无更多任务 让自己进入线程池栈*/ 24 void Recycle(); 25 private: 26 /* 用于挂起线程 */ 27 sem_t sem; 28 pthread_t tid; 29 BaseTask* task; 30 MyThreadPool* mtp; 31 }; 32 33 #endif
MyThread.cpp
1 #include "MyThreadPool.h" 2 #include "MyThread.h" 3 4 5 MyThread::MyThread(MyThreadPool* mtp) 6 { 7 sem_init(&sem,0,0); 8 this->mtp = mtp; 9 pthread_create(&tid,NULL,Start_Func,(void*)this); 10 } 11 12 void MyThread::Set_Task(BaseTask* task) 13 { 14 this->task = task; 15 } 16 17 void MyThread::Start_Task() 18 { 19 sem_post(&sem); 20 } 21 22 void* MyThread::Start_Func(void* arg) 23 { 24 MyThread* mt = (MyThread*) arg; 25 while(1) 26 { 27 sem_wait(&mt->sem); 28 mt->task->run(); 29 if(mt->Fetch_Task()) 30 mt->Start_Task(); 31 else 32 mt->Recycle(); 33 } 34 } 35 36 bool MyThread::Fetch_Task() 37 { 38 return mtp->FetchTask(this); 39 } 40 41 void MyThread::Recycle() 42 { 43 mtp->Recycle(this); 44 }
MyThreadPool.h 线程池类
1 #ifndef MYTHREADPOOL_H 2 #define MYTHREADPOOL_H 3 4 #include <iostream> 5 #include <stack> 6 #include <queue> 7 #include "Mutex.h" 8 #include "BaseTask.h" 9 10 class MyThread; 11 12 class MyThreadPool 13 { 14 public: 15 /* no为要开辟的线程数目*/ 16 MyThreadPool(int no); 17 /* 添加任务*/ 18 void AddTask(BaseTask* task); 19 /* 供线程类调用 让线程类对象询问线程池任务队列中是否仍有任务未完成*/ 20 bool FetchTask(MyThread* mt); 21 /* 回收线程 */ 22 void Recycle(MyThread* mt); 23 private: 24 int no; 25 Mutex smutex; 26 Mutex qmutex; 27 28 stack<MyThread*> sthread; 29 queue<BaseTask*> qtask; 30 }; 31 32 #endif
MyThreadPool.cpp
1 #include "MyThreadPool.h" 2 #include "MyThread.h" 3 4 MyThreadPool::MyThreadPool(int no) 5 { 6 this->no = no; 7 8 for(int i=0;i<no;++i) 9 { 10 sthread.push(new MyThread(this)); 11 } 12 } 13 14 void MyThreadPool::AddTask(BaseTask* task) 15 { 16 smutex.Lock(); 17 if (!sthread.empty()) 18 { 19 MyThread* mt = sthread.top(); 20 sthread.pop(); 21 smutex.Unlock(); 22 mt->Set_Task(task); 23 mt->Start_Task(); 24 } 25 else 26 { 27 smutex.Unlock(); 28 qmutex.Lock(); 29 qtask.push(task); 30 qmutex.Unlock(); 31 } 32 } 33 34 bool MyThreadPool::FetchTask(MyThread* mt) 35 { 36 qmutex.Lock(); 37 if (!qtask.empty()) 38 { 39 mt->Set_Task(qtask.front()); 40 qmutex.Unlock(); 41 return true; 42 } 43 else 44 { 45 qmutex.Unlock(); 46 return false; 47 } 48 } 49 50 void MyThreadPool::Recycle(MyThread* mt) 51 { 52 smutex.Lock(); 53 sthread.push(mt); 54 smutex.Unlock(); 55 }
main.cpp
1 #include "MyThread.h" 2 #include "MyThreadPool.h" 3 #include "Mutex.h" 4 #include "MyTask.h" 5 6 int main() 7 { 8 MyThreadPool mtp(3); 9 while(1) 10 { 11 BaseTask* task = new MyTask; 12 mtp.AddTask(task); 13 sleep(5); 14 delete task; 15 task = NULL; 16 } 17 }
makefile
1 pro: main.cpp libtp.a MyTask.cpp 2 g++ -lpthread main.cpp -L. -ltp MyTask.cpp -o pro 3 4 libtp.a: MyThreadPool.o MyThread.o Mutex.o 5 ar cr libtp.a Mutex.o MyThreadPool.o MyThread.o 6 7 MyThread.o:MyThread.cpp 8 g++ -c -lpthread MyThread.cpp -o MyThread.o 9 10 MyThreadPool.o:MyThreadPool.cpp 11 g++ -c -lpthread MyThreadPool.cpp -o MyThreadPool.o 12 13 Mutex.o:Mutex.cpp 14 g++ -c -lpthread Mutex.cpp -o Mutex.o 15 16 clean: 17 rm libtp.a MyThreadPool.o Mutex.o MyThread.o pro
执行 ./pro 即可
编译: 我首先将线程类(MyThread.o) 互斥类(Mutex.o) 和 线程池类(MyThreadPool.o) 打包成一个静态库
静态库留出两个接口 一个是线程池的初始化 另外一个是用户自己继承BaseTask的run函数后 调用线程池的类对象 AddTask接口去增加任务
所以使用者只需自己指定要开辟的线程数(线程池的构造函数) 和 自定义 任务类对象即可使用这个线程池.
ps:所有文件都放在同一个目录下
标签:
原文地址:http://www.cnblogs.com/lewiskyo/p/4214549.html