标签:stream 条件变量 主线程 指针 处理 cout future 调用 工具
#include <iostream> #include <thread> #include <mutex> class wait_test { bool flag; std::mutex m; public: wait_test(bool _flag):flag(_flag){} void setFlag(bool _flag) { std::unique_lock<std::mutex> lk(m); flag = _flag; } bool getFlag() { std::unique_lock<std::mutex> lk(m); return flag; } void wait_for_flag() { std::unique_lock<std::mutex> lk(m); while (!flag) { lk.unlock(); // 1 解锁互斥量 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms lk.lock(); // 3 再锁互斥量 } } }; void funA(wait_test &wt, int &i) { while (!wt.getFlag()) { ++i; } } void funB(wait_test &wt, int &i) { std::cout << "begin\t" << i << std::endl; wt.wait_for_flag();//等待主线程set std::cout << "end\t" << i << std::endl; } int main() { wait_test wt{ false }; int i{ 0 }; std::thread t1{ funA,std::ref(wt),std::ref(i) }, t2{ funB,std::ref(wt),std::ref(i) }; t1.detach(); t2.detach(); wt.setFlag(true); system("pause"); return 0; }第三个选择(也是优先的选择)是,使用C++标准库提供的工具去等待事件的发生。通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”(condition variable)。从概念上来说,一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息。
#include <iostream> #include <thread> #include <mutex> #include <queue> struct data_chunk { int m; }; struct A { std::mutex mut; std::queue<data_chunk> data_queue; // 1 std::condition_variable data_cond; bool more_data_to_prepare() { return data_queue.size() < 10; } bool is_last_chunk() { return data_queue.size() == 3; } }; int i = 0; data_chunk prepare_data() { data_chunk r; r.m = ++i; return r; } void data_preparation_thread(A &a) { std::cout << "preparation begin"<< std::endl; while (a.more_data_to_prepare()) { const data_chunk data = prepare_data(); std::lock_guard<std::mutex> lk(a.mut); a.data_queue.push(data); // 2 std::cout << "preparation notify" << std::endl; a.data_cond.notify_one(); // 3 } std::cout << "preparation end" << std::endl; } void process(const data_chunk &d) { std::cout << d.m << std::endl; } void data_processing_thread(A &a) { while (true) { std::unique_lock<std::mutex> lk(a.mut); // 4 a.data_cond.wait( lk, [&a] {return !a.data_queue.empty();}); // 5 std::cout << "process wait end" << std::endl; data_chunk data = a.data_queue.front(); a.data_queue.pop(); lk.unlock(); // 6 process(data); if (a.is_last_chunk()) break; } } int main() { A a; std::thread t1{ data_preparation_thread,std::ref(a) }, t2{ data_processing_thread,std::ref(a) }; t1.join(); t2.join(); system("pause"); return 0; }当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,这里不建议使用一个有副作用的函数做条件检查。当你这样做了,就必须做好多次产生副作用的心理准备。
#include <queue> #include <memory> #include <mutex> #include <condition_variable>//头文件 template<typename T> class threadsafe_queue { private: mutable std::mutex mut; // 1 互斥量必须是可变的 std::queue<T> data_queue; std::condition_variable data_cond; public: threadsafe_queue() {} threadsafe_queue(threadsafe_queue const& other) { std::lock_guard<std::mutex> lk(other.mut); data_queue = other.data_queue; } void push(T new_value) { std::lock_guard<std::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this] {return !data_queue.empty();}); value = data_queue.front(); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this] {return !data_queue.empty();}); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool try_pop(T& value) { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) return false; value = data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) return std::shared_ptr<T>(); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } }; threadsafe_queue<data_chunk> data_queue; // 1 void data_preparation_thread() { while (more_data_to_prepare()) { data_chunk const data = prepare_data(); data_queue.push(data); // 2 } } void data_processing_thread() { while (true) { data_chunk data; data_queue.wait_and_pop(data); // 3 process(data); if (is_last_chunk(data)) break; } }
#include <future> #include <iostream> int find_the_answer_to_ltuae() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 10; } void do_other_stuff() { std::this_thread::sleep_for(std::chrono::milliseconds(120)); } int main() { std::future<int> the_answer = std::async(find_the_answer_to_ltuae); do_other_stuff(); std::cout << "The answer is " << the_answer.get() << std::endl; system("pause"); return 0; }std::async 允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在 std::ref 中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。
#include <string> #include <future> #include <iostream> struct X { int m; void foo(int i, std::string const& s) { std::cout << s << "\t" << i << std::endl; } std::string bar(std::string const &s) { return "bar("+s+")"; } }; struct Y { double operator()(double d) { return d + 1.1; } }; X baz(X& _x) { ++_x.m; return _x; } class move_only { public: move_only() = default; move_only(move_only&&) = default; move_only(move_only const&) = delete; move_only& operator=(move_only&&) = default; move_only& operator=(move_only const&) = delete; void operator()() { std::cout << "move_only()" << std::endl; } }; void fun() { X x; auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针,指针 auto f2 = std::async(&X::bar, x, "goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本,具体对象 std::cout << f2.get() << std::endl; Y y; auto f3 = std::async(Y(), 3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到 std::cout << f3.get() << std::endl; auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718) std::cout << f4.get() << std::endl; x.m = 1; auto f5 = std::async(baz, std::ref(x)); // 调用baz(x) std::cout << f5.get().m << std::endl; auto f6 = std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到 } int main() { fun(); system("pause"); return 0; }在默认情况下,这取决于 std::async 是否启动一个线程,或是否在期望等待时同步任务。在大多数情况下(估计这就是你想要的结果),但是你也可以在函数调用之前,向 std::async 传递一个额外参数。这个参数的类型是 std::launch ,还可以是std::launch::defered ,用来表明函数调用被延迟到wait()或get()函数调用时才执行, std::launch::async 表明函数必须在其所在的独立线程上执行, std::launch::deferred | std::launch::async 表明实现可以选择这两种方式的一种。最后一个选项是默认的。当函数调用被延迟,它可能不会在运行了。
X baz(X& _x,int i) { _x.m=i; std::cout << _x.m<<"调用" << std::endl; return _x; }调用:
auto f7 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行 std::cout <<"f7\t"<< f7.get() << std::endl; auto f8 = std::async(std::launch::deferred, baz, std::ref(x),2); // 在wait()或get()调用时执行 auto f9 = std::async( std::launch::deferred | std::launch::async, baz, std::ref(x),3); // 实现选择执行方式 std::cout << "f9\t"<<f9.get().m << std::endl; auto f10 = std::async(baz, std::ref(x),4); f8.wait(); // 调用延迟函数,后台运行,此时如果有结果就前行,否则阻塞 std::cout << "f8\t" << f8.get().m << std::endl; std::cout <<"f10\t"<< f10.get().m << std::endl;
Cpp Concurrency In Action(读书笔记3)——同步并发操作
标签:stream 条件变量 主线程 指针 处理 cout future 调用 工具
原文地址:http://blog.csdn.net/bestzem/article/details/52980004