标签:
处理大量并发任务时,一个请求对应一个线程来处理任务,线程的创建和销毁将消耗过多的系统资源,并增加上下文切换代价。线程池技术通过在系统中预先创建一定数量的线程(通常和cpu
核数相同),当任务到达时,从线程池中分配一个线程进行处理,线程在处理完任务之后不用销毁,等待重用。
线程池包括半同步半异步和领导者追随者两种实现方式。线程池包括三部分,第一层是同步服务层,它处理来自上层的任务请求。第二层是同步队列层,同步服务层中的任务将添加到队列中。第三层是异步服务层,多个线程同时处理队列中的任务。
先贴上代码,然后逐一说明
同步队列的实现代码:
// ThreadPool.cpp : 定义控制台应用程序的入口点。
//
#ifndef _synaqueue_h_
#define _synaqueue_h_
#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
using namespace std;
template<typename T>
class SynaQueue
{
public:
SynaQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false){}
void Put(const T& x)
{
Add(x);
}
void Put(T&& x)
{
Add(forward<T>(x)); //完美转发,不改变参数的类型
}
void Take(list<T>& list)
{
unique_lock<mutex> locker(m_mutex);
// 判断式, 当不满足任一个条件时,条件变量会释放mutex, 并将线程置于waiting状态, 等待其他线程调用notify_one/all 将其唤醒。
// 当满足其中一个条件时继续执行, 将队列中的任务取出,唤醒等待添加任务的线程
// 当处于waiting状态的线程被唤醒时,先获取mutex,检查条件是否满足,满足-继续执行,否则释放mutex继续等待
m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
if (m_needStop)
return;
list = move(m_queue);
m_notFull.notify_one();
}
void Take(T& t)
{
unique_lock<mutex> locker(m_mutex); // 锁
m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
if (m_needStop)
return;
t = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}
void Stop()
{
{
lock_guard<mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all(); // 将所有等待的线程全部唤醒,被唤醒的进程检查m_needStop,为真,所有的线程退出执行
m_notEmpty.notify_all();
}
bool Empty()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.empty();
}
bool Full()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
size_t Size()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size();
}
int count()
{
return m_queue.size();
}
private:
bool NotFull() const
{
bool full = m_queue.size() >= m_maxSize;
if (full)
cout << "缓冲区满了,需要等待。。。。" << endl;
return !full;
}
bool NotEmpty() const
{
bool empty = m_queue.empty();
if (empty)
cout << "缓冲区空了,需要等待,。。。异步层线程: " << this_thread::get_id() << endl;
return !empty;
}
template<typename F>
void Add(F&& x)
{
unique_lock<mutex> locker(m_mutex); // 通过m_mutex获得写锁
m_notFull.wait(locker, [this]{return m_needStop || NotFull(); }); // 不需要停止且满了,就释放m_mutex并waiting;有一个为真就继续执行
if (m_needStop)
return;
m_queue.push_back(forward<F>(x));
m_notEmpty.notify_one();
}
private:
list<T> m_queue; //缓冲区
mutex m_mutex; // 互斥量
condition_variable m_notEmpty; // 条件变量
condition_variable m_notFull;
int m_maxSize; //同步队列最大的size
bool m_needStop; // 停止标识
};
#endif
线程池的实现代码:
#ifndef _threadpool_h_
#define _threadpool_h_
#include "synaqueue.h"
#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>
const int MaxTaskCount = 100;
class ThreadPool
{
public:
using Task = function < void() > ;
ThreadPool(int numThread = thread::hardware_concurrency()) :m_queue(MaxTaskCount){ Start(numThread); }
~ThreadPool(){ Stop(); }
void Stop()
{
call_once(m_flag, [this]{StopThreadGroup(); });
}
void AddTask(Task&& task)
{
m_queue.Put(forward<Task>(task));
}
void AddTask(const Task& task)
{
m_queue.Put(task);
}
private:
void Start(int numThreads)
{
m_running = true;
//创建线程组
for (int i = 0; i < numThreads; i++)
{
m_threadgroup.push_back(make_shared<thread>(&ThreadPool::RunInThread, this));
}
}
// 每个线程都执行这个函数
void RunInThread()
{
while (m_running)
{
//取任务分别执行
list<Task> list;
m_queue.Take(list);
for (auto& task : list)
{
if (!m_running)
return;
task();
}
}
}
void StopThreadGroup()
{
m_queue.Stop(); // 同步队列中的线程停止
m_running = false; // 让内部线程跳出循环并推出
for (auto thread : m_threadgroup)
{
if (thread)
thread->join();
}
m_threadgroup.clear();
}
private:
list<shared_ptr<thread>> m_threadgroup; // 处理任务的线程组, 链表中存储着指向线程的共享指针
SynaQueue<Task> m_queue; //同步队列
atomic_bool m_running; // 是否停止的标识
once_flag m_flag;
};
#endif
单元测试代码:
// ThreadPool.cpp : 定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include "threadpool.h"
void TestThreadPool()
{
ThreadPool pool(2);
thread thd1([&pool]{
for (int i = 0; i < 10; i++)
{
auto thrID = this_thread::get_id();
pool.AddTask([thrID, i]{cout << "同步层线程1的线程ID:" << thrID << " 这是任务 " << i << endl; this_thread::sleep_for(chrono::seconds(2)); });
}
});
thread thd2([&pool]{
for (int i = 0; i < 10; i++)
{
auto thrID = this_thread::get_id();
pool.AddTask([thrID, i]{cout << "同步层线程2的线程ID:" << thrID << " 这是任务 " << i << endl; this_thread::sleep_for(chrono::seconds(2)); });
}
});
this_thread::sleep_for(chrono::seconds(45));
pool.Stop();
thd1.join();
thd2.join();
}
int main()
{
TestThreadPool();
}
先看同步队列的数据成员, 成员函数都是在此基础上进行的操作:
private:
list<T> m_queue; //缓冲区
mutex m_mutex; // 互斥量
condition_variable m_notEmpty; // 条件变量
condition_variable m_notFull;
int m_maxSize; //同步队列最大的size
bool m_needStop; // 停止标识
list<T> m_queue
缓冲队列中存放任务,要对它进行访问需要使用一个互斥量m_mutex
和两个条件变量m_notEmpty
、m_notFull
。这和操作系统中的生产者消费者相同。m_needStop
用来标识线程池是否需要停止。
接下来看向同步队列中添加任务:
template<typename F>
void Add(F&& x)
{
unique_lock<mutex> locker(m_mutex); // 通过m_mutex获得写锁
// 不需要停止且满了,就释放m_mutex并waiting;有一个为真就继续执行
m_notFull.wait(locker, [this]{return m_needStop || NotFull(); });
if (m_needStop)
return;
m_queue.push_back(forward<F>(x));
m_notEmpty.notify_one();
}
unique_lock
实现的是写锁,shared_lock
实现的是读锁。条件变量的wait
函数,检查第二个参数是否为真,如果为真,那么将会继续执行,如果为假,那么就释放mutex
并将当前线程置于waiting
状态,等待其他线程通过m_notFull.notify_one()
唤醒。接下来检查结束标识是否设置。将任务添加到队列中,并通知某一个因为队列任务为空而等待的线程继续执行。
接下来看怎么取出来任务, 和添加任务一样,不再详细说明:
void Take(T& t)
{
unique_lock<mutex> locker(m_mutex); // 锁
m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
if (m_needStop)
return;
t = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}
停止线程池:
void Stop()
{
{
lock_guard<mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all(); // 将所有等待的线程全部唤醒,被唤醒的进程检查m_needStop,为真,所有的线程退出执行
m_notEmpty.notify_all();
}
对数据成员要进行互斥操作,因此要先获得锁,lock_guard
会在析构函数中释放锁,因此使用单独的空间。然后将等待添加任务和取出任务的线程全部唤醒,让他们自行终止。
数据成员:
private:
list<shared_ptr<thread>> m_threadgroup; // 处理任务的线程组, 链表中存储着指向线程的共享指针
SynaQueue<Task> m_queue; //同步队列
atomic_bool m_running; // 是否停止的标识
once_flag m_flag;
once_flag
和call_once
用来保证在多线程下函数只调用一次。
标签:
原文地址:http://blog.csdn.net/yapian8/article/details/46514095