标签:nic chap 指令 volatil future push cal status handle
int doAsyncWork(); std::thread t(doAsyncWork);
基于任务的做法
auto fut = std::async(doAsyncWork);
区别是:基于线程的做法没办法访问函数的返回值,或者当出现异常时,程序会直接崩溃;而基于任务的做法能够访问返回值,并且能够返回异常的结果,保证程序不会崩溃
C++并发概念中线程的三个含义
Hardware threads
真正执行计算的线程,每个CPU核上面会提供几个这样的硬件线程
Software threads
系统线程,是操作系统管理的所有进程内部的线程,操作系统把它们调度到硬件线程上来执行任务
std::threads
一个C++进程内的对象,是底层软件线程的句柄
基于std::thread做法的劣势
软件线程是一种有限的资源,当申请超过系统能提供的线程数量时,程序会抛出std::system_error异常,因此程序需要捕获这种异常进行处理,但在用户层难以解决这个问题
当就绪的软件线程数量大于硬件线程数量时,就会出现过载现象,此时,系统的线程调度器按时间片将软件线程调度到硬件线程上执行,而线程切换会给系统增加整体线程管理开销,尤其是当一个硬件线程上的软件线程调度到另一个CPU核的硬件线程上时,代价更昂贵,在这种情况下:
新的硬件线程所在的CPU缓存对于调度过来的软件线程是冷的,因为没有存储与它有关的数据和指令信息
调度过来的软件线程会给原来的软件线程所在的CPU缓存造成污染,因为原来的软件线程可能会再度被调度到这个CPU上面
从应用开发的角度来避免过载很难,因为这种软件线程和硬件线程的最优比例是瞬息万变的,不仅和硬件设施有关,还和软件行为有关,但是把这份工作交给底层标准库来做会容易很多。
std::async的底层机制
如果当前申请的线程已经超过系统能够提供的线程数量时,调用std::thread和std::async有什么区别呢?
调用std::async并不保证会创建一个新的软件线程,而是它允许调度器把新线程要执行的函数放在当前线程上运行,当前线程是请求新线程并等待执行结果的线程,那么当系统过载或者线程资源不够时,合理的调度器会利用自由方式来解决这些问题。
但是对于GUI线程的响应可能会出现问题,因为调度器并不知道应用哪个线程会有高响应度的要求,这时需要对std::async使用std::launch::async的启动策略,它能确保函数会在另一个不同的线程上执行。
最新的线程调度器会使用系统返回的线程池来避免过载,通过工作窃取来改善所有硬件核的负载均衡。C++标准并没有要求这些特性,但是生产商会在标准库中使用这些技术。如果在并发编程中使用基于任务的方法,我们能享受这种技术的好处。
std::thread的使用场景
需要访问底层线程实现的API时,std::thread能通过native_handle()返回这个句柄
需要优化应用的线程使用时,比如硬件特性和应用的配置文件已知且固定
需要实现一些C++并发API没有提供的线程技术
auto fut1 = std::async(f); ==>等价于 auto fut2 = std::async(std::launch::async | std::launch::deferred, f);
因此,默认的启动机制允许函数既可以同步运行,也可以异步运行,这样做是为了允许标准库对线程的管理做优化(处理过载,资源不够,负载不均衡的情况)
std::async的默认启动机制会有一些有趣的含义
无法预测异步函数是否和当前线程并发执行
无法预测异步函数是否执行在新的线程上还是执行在当前线程上
可能也无法预测异步函数是否运行了
以上这些含义使得默认启动机制不能很好地和线程局部变量混用,因为无法预测异步函数所在线程什么时候会执行,也不知道会修改哪些线程局部变量;除此之外,那些使用超时的等待机制循环也会受到影响,因为在一个被延迟的任务上调用wait_for或者wait_unti会产生std::future_status::deferred的值,也就意味着下面的例子可能会一直运行下去
using namespace std::literals; void f() { std::this_thread::sleep_for(1s); } auto fut = std::async(f); while(fut.wait_for(100ms) != std::future_status::ready) //函数f有可能一直没有被执行,那么就会一直卡在循环的判断上,这种情况在开发和单元测试中一般不会出现,但是在高压负载下就会出现 { ... }
一种修正方法是在启动之后,马上检查任务状态是否是std::future_status::deferred,然后做相关处理
auto fut = std::async(f); if( fut.wait_for(0s) == std::future_status::deferred) { ... // 通过get或者wait来同步调用函数f } else { while(fut.wait_for(100ms) != std::future_status::ready) { ...//并发执行其他任务 } ... }
使用默认启动机制的std::async时,需要满足以下条件
任务不需要与调用线程并发运行
与线程局部变量的读写无关
要么保证std::async的future会调用get或者wait函数,要么也能接受异步任务不会被执行的结果
用到wait_for或者wait_until的代码考虑到延迟任务类型的可能性
constexpr auto tenMillion = 10000000; bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) { std::vector<int> goodVals; std::thread t( [&filter, maxVal, &goodVals] { for(auto i = 0; i <= maxVal ; ++i) { if ( filter(i)) goodVals.push_back(i); } }); auto nh = t.native_handle(); ... if(conditionAreSatisfied()) { t.join(); performComputation(goodVals); return true; } return false; // thread对象没有被join!!! }
为什么std::thread的析构函数会在线程是joinable状态时应该导致程序异常
对于joinable的线程,析构时析构函数在等待底层的线程完成,那么会导致行为异常,很难追踪,因为明明conditionAreSatisfied()返回false,就说明filter函数不应该在执行中了,而析构函数等待这意味着上层的filter函数应该在继续执行。
对于joinable的线程,析构时析构函数通过detach断开了std::thread对象和底层执行线程的连接后,底层的线程仍然在运行,此时thread所在的函数占用的内存已经回收,如果后面仍有函数调用的话,那么新函数将会使用这片内存,而此时如果底层线程修改了原来函数的内存空间时,新函数占用的内存就会被修改!!!
推荐的做法是使用一种RAII对象来将thread对象包含在内,使用RAII对象来保证thread资源的join
class ThreadRAII { public: enum class DtorAction { join, detach }; ThreadRAII(std::thread&& t, DtorAction a): action(a), t(std::move(t)) {} ~ThreadRAII() { if(t.joinable()) { if (action == DtorAction::join){ t.join(); } else { t.detach(); } } } ThreadRAII(ThreadRAII&&) = default; ThreadRAII& operator=(ThreadRAII&&) = default; std::thread& get() { return t;} private: DtorAction action; std::thread t; };
int calcValue(); std::packaged_task<int()> pt(calcValue); auto fut = pt.get_future(); // fut会正常析构,因为不满组上面的条件1
{ std::packaged_task<int()> pt(calcValue); auto fut = pt.get_future(); std::thread t(std::move(pt)); ... }
std::condition_variable cv;
std::mutex m;
// task 1 { ... // detect event cv.notify_one(); } //task 2 { std::unique_lock<std::mutex> lk(m); cv.wait(lk); ... // react to event }
问题1:如果task1在task2调用wait之前调用了notify_one,那么task2就会一直挂起
问题2:wait函数没有考虑到错误的唤醒,此时task1尚未调用notify_one
共享布尔变量
std::atomic<bool> flag(false); //task 1 { ... // detect event flag = true; } //task 2 { ... while(!flag); // wait for event ... }
问题是while循环空转会浪费CPU资源
条件变量加布尔变量组合
std::condition_variable cv; std::mutex m; bool flag(false); //task 1 { ... // detect event { std::lock_guard<std::mutex> g(m); flag = true; } cv.notify_one(); } //task 2 { ... { std::unique_lock<std::mutex> lk(m); cv.wait(lk, [] {return flag; }); // use lambda to avoid spurious wakeups ... // react to event } ... }
future+promise方式
std::promise<void> p; // task 1 { ... // detect event p.set_value(); // tell reacting task } // task 2 { ... p.get_future().wait(); // wait on future corresponding to p ... // react to event }
缺点是std::promise只能被设置一次,不能被重复使用
这种方式还可以用来启动一个suspend的线程
std::promise<void> p; void react(); void detect() { std::thread t([] { p.get_future().wait(); react(); }); ... // do something else before starting thread p.set_value(); ... t.join(); }
或者控制多个线程的启动
std::promise<void> p; void detect() { auto sf = p.get_future().share(); for(int i = 0; i < threadsToRun; ++i) { vt.emplace_back([sf] { sf.wait(); react(); }); } ... p.set_value(); ... for(auto& t : vt){ t.join(); } }
[Effective Modern C++(11&14)]Chapter 7: The Concurrency API
标签:nic chap 指令 volatil future push cal status handle
原文地址:https://www.cnblogs.com/burningTheStar/p/8944239.html