#ifndef THREADPOOL_H #define THREADPOOL_H #include<iostream> #include<list> #include<exception> #include<pthread.h> #include"locker.h"//线程的互斥和同步机制 using namespace std; template<typename T>class threadpool { public: //默认创建8个线程,请求队列最大为10000 threadpool(int thread_number = 8, int max_requests = 10000); ~threadpool(); //往请求队列中添加任务 bool append(T *request); private: //线程的工作函数,它不断从工作队列中取出任务并执行 static void *worker(void *arg); void run(); private: int m_thread_number;//线程池中的线程数 int m_max_requests;//请求队列所能允许的最大请求数 pthread_t m_threads;//描述线程池的数组 大小和线程数相同 list<T *> m_workqueue;//请求工作队列(先来先服务) locker m_queuelocker;//保护请求队列的互斥锁 sem m_queuestat;//信号量 判断是否有任务需要处理 bool m_stop;//线程池是否被释放了 }; template<typename T> threadpool<T>::threadpool(int thread_number, int max_requests): m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false), m_threads(NULL) { if((thread_number <= 0) || (max_requests<=0)) { throw std::exception(); } //创建数组用来存放线程号 m_threads = new pthread_t[m_thread_number]; if(!m_threads) { throw std::exception(); } //创建thread_number个线程,并将它们设置为脱离线程 for(int i=0; i<thread_number; ++i) { cout<<"create "<<i+1<<" th thread"<<endl; if(pthread_create(m_thread[i], NULL, worker, this) != 0)//将this指针传入 可以方便的调用this->worker(); 提高了效率 { delete []m_threads; throw std::exception(); } //设置线程状态为非阻塞,结束后立即返回 类似于waitpid if( pthread_detack(m_threads[i]) ) { delete [] m_threads; throw std::exception(); } } } template<typename T> threadpool<T>::~threadpool() { delete [] m_threads; m_stop = true; } template<typename T> bool threadpool<T>::append(T *request) { //由于工作队列被多个线程所共享,可能有多个线程同时处理工作队列, //所以在对工作队列进行相关操作时,一定要加锁 m_queuelocker.lock();//加锁 if(m_workqueue.size() > m_max_requests) { m_queuelocker.unlock(); return false; } m_workqueue.push_back(request); m_queuelocker.unlock();//操作完成后解锁 m_queuestat.post();//对信号量进行post操作 return true; } template<typename T> void * threadpool<T>::worker(void *arg) { threadpool *pool = (threadpool *)arg;//获取this指针 pool->run();//调用run处理 return pool; } template<typename T> void threadpool<T>::run() { while(!m_stop)// { m_queuestat.wait();//等待获取信号量 m_queuelocker.lock();//获得信号量之后进行加锁 if(m_workqueue.empty())//如果请求工作队列为空 解锁 进行下一次循环 继续轮询 { m_queuelocker.unlock(); continue; } T *request = m_workqueue.front();//获取将要处理的请求 m_workqueue.push_front();//该请求出队 m_queuelocker.unlock();//队列操作完毕 解锁 if(!request)//请求为空 进行下一次循环 继续进行轮询 { continue; } request->process();//真正的请求处理函数!!! } } #endif
原文地址:http://blog.csdn.net/huai1693838234/article/details/44753311