标签:star types abort 作用 sizeof join 子线程 注意 tab
class threadpool:noncopyable { };
利用mymuduo::thread 完成对于线程池的封装
线程池内部成员:
线程集合m_threads: 用于保存线程池内的所有线程
线程池任务队列m_queue 表示待执行的任务队列
条件变量:m_notFull,m_notEmpty, 条件变量,用于对任务队列同步操作,感觉用一个也可以,就是判断一下m_queue.size()即可
互斥锁:m_mutex 用于和条件变量一起使用
其他成员:线程池的名字,运行状态,最大任务队列长度
线程池提供的操作:
设置任务队列的最大长度
设置线程初始化回调函数
启动线程池,
关闭线程池
运行一个任务,
获取线程池的基本信息,名字,运行状态...
private: mutable mutexlock m_mutex; //配合条件变量使用的互斥锁 condition m_notEmpty; //条件变量,任务列队不空 condition m_notFull; //条件变量,任务列队不满 string m_name; //线程池名字 Task m_threadInitCallback; //任务函数,std::function<void()> //线程池内线程,用智能指针,在vector中存储 std::vector<std::unique_ptr<mymuduo::thread>> m_threads; std::deque<Task> m_queue; //线程池内的任务队列 size_t m_maxQueueSize; //任务队列最大长度 bool m_running; //线程池正在运行?
public: typedef std::function<void()> Task; //C++中,有时可以将构造函数用作自动类型转换函数。但这种自动特性并非总是合乎要求的,有时会导致意外的类型转换 //被explicit关键字修饰的类构造函数,不能进行自动地隐式类型转换,只能显式地进行类型转换。 explicit threadpool(const string& nameArg=string("ThreadPool")); ~threadpool(); //设置最大任务队列长度 void setMaxQueueSize(int maxSize){m_maxQueueSize=maxSize;} //设置回调函数 void setThreadInitCallback(const Task& cb){m_threadInitCallback=cb;} //线程池启动,创建numThreads个线程跑起来 void start(int numThreads); //回收子线程,并关闭线程池 void stop(); //获得一些线程池相关信息,名字,任务队列大小 const string& name() const{return m_name;} size_t queueSize() const { mutexlockguard mlg(m_mutex); return m_queue.size(); } // Could block if maxQueueSize > 0 // Call after stop() will return immediately. // There is no move-only version of std::function in C++ as of C++14. // So we don‘t need to overload a const& and an && versions // as we do in (Bounded)BlockingQueue. // https://stackoverflow.com/a/25408989 //运行一个任务,实质是向任务队列中添加一个任务 void run(Task f); private: //判断队列是否满了 bool isFull() const; //线程池线程执行的函数 void runInThread(); //从任务队列中取出一个任务,实质是取出一个函数在runInThread函数里执行 Task take();
#ifndef THREADPOOL_H #define THREADPOOL_H #include"base/condition.h" #include"base/mutex.h" #include"base/thread.h" #include"base/types.h" #include<deque> #include<vector> namespace mymuduo{ class threadpool:noncopyable { public: typedef std::function<void()> Task; //C++中,有时可以将构造函数用作自动类型转换函数。但这种自动特性并非总是合乎要求的,有时会导致意外的类型转换 //被explicit关键字修饰的类构造函数,不能进行自动地隐式类型转换,只能显式地进行类型转换。 explicit threadpool(const string& nameArg=string("ThreadPool")); ~threadpool(); //设置最大任务队列长度 void setMaxQueueSize(int maxSize){m_maxQueueSize=maxSize;} //设置回调函数 void setThreadInitCallback(const Task& cb){m_threadInitCallback=cb;} //线程池启动,创建numThreads个线程跑起来 void start(int numThreads); //回收子线程,并关闭线程池 void stop(); //获得一些线程池相关信息,名字,任务队列大小 const string& name() const{return m_name;} size_t queueSize() const { mutexlockguard mlg(m_mutex); return m_queue.size(); } // Could block if maxQueueSize > 0 // Call after stop() will return immediately. // There is no move-only version of std::function in C++ as of C++14. // So we don‘t need to overload a const& and an && versions // as we do in (Bounded)BlockingQueue. // https://stackoverflow.com/a/25408989 //运行一个任务,实质是向任务队列中添加一个任务 void run(Task f); private: //判断队列是否满了 bool isFull() const; //线程池线程执行的函数 void runInThread(); //从任务队列中取出一个任务,实质是取出一个函数在runInThread函数里执行 Task take(); mutable mutexlock m_mutex; //配合条件变量使用的互斥锁 condition m_notEmpty; //条件变量,任务列队不空 condition m_notFull; //条件变量,任务列队不满 string m_name; //线程池名字 Task m_threadInitCallback; //任务函数,std::function<void()> //线程池内线程,用智能指针,在vector中存储 std::vector<std::unique_ptr<mymuduo::thread>> m_threads; std::deque<Task> m_queue; //线程池内的任务队列 size_t m_maxQueueSize; //任务队列最大长度 bool m_running; //线程池正在运行? };//namespace mymuduo } #endif // THREADPOOL_H
#include "threadpool.h" #include"base/logging.h" #include"base/exception.h" #include<assert.h> #include<stdio.h> namespace mymuduo{ //构造函数仅用于初始化各种成员变量,其他什么也不做 threadpool::threadpool(const string& nameArg) :m_mutex(),m_notEmpty(m_mutex),m_notFull(m_mutex),m_name(nameArg), m_maxQueueSize(0),m_running(false) { } threadpool::~threadpool() { if(m_running) //只有线程池处于运行状态才可以关闭 this->stop(); } //启动线程池,创建numThreads个线程 void threadpool::start(int numThreads) { assert(m_threads.empty()); m_running=true; m_threads.reserve(numThreads); //创建numThreads个线程 for(int i=0;i<numThreads;i++) { char id[32]; snprintf(id,sizeof(id),"%d",i+1); //push_back和emplace_back添加临时变量的区别,假设a是个临时变量 //push_back(a),会在vector/dequeue内部把a拷贝过来,然后临时变量a被析构 //emplace_back(a),直接添加临时变量,没有拷贝操作,不会析构a m_threads.emplace_back(new thread(std::bind(&threadpool::runInThread,this),m_name+id)); m_threads[i]->start(); //运行线程,实际上是运行runInThread函数 } //numThreads为0,即不创建任务线程时,默认调用m_threadInitCallback()函数 if(numThreads==0 && m_threadInitCallback) m_threadInitCallback(); } void threadpool::stop() { //这里踩一个坑一定要注意,mutexlockguard一定要析构把锁释放掉,也就是写在局部作用域中{} //下分来分析一下之前踩得坑,没写到局部作用于会发生什么.(死锁) //当前线程执行join()时,子线程拿不到锁一直不会结束,因此,join()得不到返回,mlg也不会析构, //m_mutex也不会得到释放,形成死锁 //当前线程拿着锁等回收子线程,之后才能释放锁,而子线程需要锁才能结束,因此形成死锁 { mutexlockguard mlg(m_mutex); m_running=false; //指定线程池状态为false,之后唤醒时所有线程都会直接退出 m_notFull.notifyAll(); m_notEmpty.notifyAll(); } //回收每个线程 for(auto& i:m_threads) i->join(); } //线程池任务队列不为空就往里面添加一个任务 void threadpool::run(Task task) { //线程池为空,直接运行即可,也不用同步啥的 if(m_threads.empty()) { task(); return; } //线程池不为空,所有线程争抢这一任务 mutexlockguard mlg(m_mutex); while(m_running && isFull()) //线程池任务队列满了必须要等待 m_notFull.wait(); if(!m_running) //线程池状态变成false,直接退出 { return; } assert(!isFull()); //把当前任务添加到任务队列中去 m_queue.push_back(std::move(task)); m_notEmpty.notify(); //唤醒一个线程 } //从线程池任务队列中拿出一个任务 threadpool::Task threadpool::take() { mutexlockguard mlg(m_mutex); //当线程池任务队列为空时一直等待 //LOG_WARN<<currentthread::tid()<<"wait for task..."; while(m_running && m_queue.empty()) m_notEmpty.wait(); //LOG_WARN<<currentthread::tid()<<"got task..."; //无任务可取了, if(m_queue.empty())return NULL; Task task; task=m_queue.front(); m_queue.pop_front(); if(m_maxQueueSize>0) //唤醒一个线程 m_notFull.notify(); return task; } bool threadpool::isFull() const { m_mutex.assertLocked(); //保证m_mutex被当前线程所持有 return m_maxQueueSize>0 && m_queue.size()>=m_maxQueueSize; } void threadpool::runInThread() { try { if(m_threadInitCallback) m_threadInitCallback(); while(m_running) { Task task=this->take();//在任务队列中拿出一个任务并执行它 //LOG_WARN<<"get one task"; if(task!=NULL) task(); //LOG_WARN<<"get one task done"; } } catch (const mymuduo::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", m_name.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); fprintf(stderr, "stack trace: %s\n", ex.stackTree()); abort(); }catch (const std::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", m_name.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); abort(); }catch(...) { fprintf(stderr, "unknown exception caught in ThreadPool %s\n", m_name.c_str()); throw; // rethrow } } }//namespace mymuduo
#include"base/threadpool.h" #include<iostream> void workerThread() { //sleep for 1 sec mymuduo::currentthread::sleepUsec(1000*1000); std::cout<<mymuduo::currentthread::name()<<" done\n"; } int main() { mymuduo::threadpool tp("testThreadPool"); tp.setMaxQueueSize(10); tp.start(5); for(int i=0;i<10;i++) tp.run(workerThread); mymuduo::currentthread::sleepUsec(5000*1000); //tp.stop(); }
打印结果:
testThreadPool1 done
testThreadPool3testThreadPool2 done
testThreadPool5 done
done
testThreadPool4 done
testThreadPool5 done
testThreadPool1 done
testThreadPool2 done
testThreadPool3 done
testThreadPool4 done
标签:star types abort 作用 sizeof join 子线程 注意 tab
原文地址:https://www.cnblogs.com/woodineast/p/13566014.html