线程的管理
启动线程
为了让编译器识别 std::thread 类,这个简单的例子也要包含 <thread> 头文件。
如同大多数C++标准库一样
线程在std::thread对象创建(为线程指定任务)启动
无参任务
最简单的任务,通常是无参数无返回(void-returning)的函数,这种函数在其所属线程上运行,直到函数执行完毕,线程也就结束了。
例如:
#include<iostream>
#include<thread>
using namespace std;
void go()
{
cout << "Welcome to Thread!";
}
void main()
{
thread t1(go);
cin.get();
}
运行结果
有参任务
std::thread 可以用可调用(callable)类型构造,将带有函数调用符类型
的实例传入 std::thread 类中,替换默认的构造函数。
#include <iostream>
#include <thread>
#include <string>
using namespace std;
void run(int num)
{
cout << "线程" << num << endl;
}
void main()
{
thread p(run,1);
cin.get();
}
执行结果!
等待线程
启动了线程,你需要明确是要等待线程结束(加入式joined),还是让其自主运行(分
离式——detached)。如果 std::thread 对象销毁之前还没有做出决定,程序就会终止
( std::thread 的析构函数会调用 std::terminate() )。因此,即便是有异常存在,也需要确保线程能够正确的加入(joined)或分离(detached)
例如
#include <iostream>
#include <thread>
#include <string>
#include <chrono> //c++时间库
using namespace std;
void run(int num)
{
chrono::seconds(3); //c++标准库休眠3秒钟
std::cout << "线程" << num << endl;
}
void main()
{
thread t(run, 1);
t.join(); //阻塞主函数等待等待线程结束
cin.get();
}
joinable()查看当前线程是否被join true没有flase成功
#include <iostream>
#include <thread>
#include <string>
#include <chrono> //c++时间库
using namespace std;
void run(int num)
{
chrono::seconds(3); //c++标准库休眠
std::cout << "线程" << num << endl;
}
void main()
{
thread t(run, 1);
if(t.joinable())
t.join();
cin.get();
}
分离线程
使用detach()会让线程在后台运行,这就意味着主线程不能与之产生直接交互。也就是说,不会等待这个线程结束;
如果线程分离,那么就不可能有 std::thread 对象能引用它,分离线程
的确在后台运行,所以分离线程不能被加入。不过C++运行库保证,当线程退出时,相关资源的能够正确回收,后台线程的归属和控制C++运行库都会处理。
例如
#include <iostream>
#include <thread>
#include <string>
#include <chrono> //c++时间库
using namespace std;
void run(int num)
{
chrono::seconds(3); //c++标准库休眠
std::cout << "线程" << num << endl;
}
void main()
{
thread t(run, 1);
t.detach(); //脱离当前主线程自由执行
cin.get();
}
转移线程所有权
假设要写一个在后台启动线程的函数,想通过新线程返回的所有权去调用这个函数,而不是
等待线程结束再去调用;或完全与之相反的想法:创建一个线程,并在函数中转移所有权,
都必须要等待线程结束。总之,新线程的所有权都需要转移。
线程的所有权可以在 std::thread 实例中移动,下面将展示一个例子。
例如:
#include<iostream>
#include<thread>
using namespace std;
void run1()
{
cout << "run1" << endl;
}
void run2()
{
cout << "run2" << endl;
}
void main()
{
std::thread t1(run1); // 1
std::thread t2 = std::move(t1); // 2当显式使用 std::move() 创建t2后,t1的所有权就转移给了t2
t1 = std::thread(run2);
cin.get();
}
std::thread 支持移动,就意味着线程的所有权可以在函数外进行转移,就如下面程序一样。
#include<iostream>
#include<thread>
using namespace std;
std::thread run1()
{
void some_function();
return std::thread(some_function);
}
void main()
{
void some_function();
thread t1(std::thread(run1));
}
运行时决定线程数量
std::thread::hardware_concurrency()
这个函数将返回能同时并发在一个程序中的线程数量。
例如,多核系统中,返回值就可以能是CPU核芯的数量。
返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。但是,这也无
法掩盖这个函数对启动线程数量的帮助。
使用线程组来分割任。
//如下
//将100个任务分片,分成4片
#include <iostream>
#include <thread>
#include <vector>
#include <string>
#include <iterator>
#include <numeric>
#include <algorithm>
using namespace std;
template<typename Iterator, typename T>
struct accumulate_block
{
void operator()(Iterator first, Iterator last, T& result) //迭代器头,迭代器尾,线程的数量 (重载)
{
result = std::accumulate(first, last, result);
//累加 开始 结束 累加的初值
}
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
// 若输入数据为空,则返回初始值
if (!length)
return init;
// 计算所需要的最大线程数量,每个线程至少计算25个数据
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
// 获取硬件可并发线程数量
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
// 计算实际要创建的线程数量
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
// 根据线程数量,拆分数据
unsigned long const block_size = length / num_threads;
// 创建用于存放每个线程计算结果的容器和线程
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i<(num_threads - 1); ++i)
{
Iterator block_end = block_start;
// 移动迭代器
std::advance(block_end, block_size);
// 启动新线程,对一块数据进行处理
threads[i] = std::thread(
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
// 为下一个线程准备数据
block_start = block_end;
}
// 当启动了所有的子线程对数据进行计算,本线程就对数据的最后一块进行计算
accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
// 使用fore_each对所有的线程执行join操作,等待它们执行结束
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
// 最后对所有的计算结果求和
return std::accumulate(results.begin(), results.end(), init);
}
int main()
{
std::cout << "threads: " << std::thread::hardware_concurrency() << std::endl;
std::vector<int> vi;
for (int i = 0; i<100; ++i)
{
vi.push_back(1);
}
int sum = parallel_accumulate(vi.begin(), vi.end(), 5);
std::cout << "sum=" << sum << std::endl;
cin.get();
return 0;
}
识别线程
线程标识类型是 std::thread::id ,可以通过两种方式进行检索。
第一种,可以通过调用 std::thread 对象的成员函数 get_id() 来直接获取。
如果 std::thread 对象没有与任何执行线程相关联, get_id() 将返回 std::thread::type 默认构造值,这个值表示“没有线程”。
第
二种,当前线程中调用 std::this_thread::get_id() (这个函数定义在 <thread> 头文件中)也可
以获得线程标识
std::thread::id 实例常用作检测线程是否需要进行一些操作,比如:当用线程来分割一项工
主线程可能要做一些与其他线程不同的工作。这种情况下,启动其他线程
前,它可以将自己的线程ID通过 std::this_thread::get_id() 得到,并进行存储。
就是算法核心部分(所有线程都一样的),每个线程都要检查一下,其拥有的线程ID是否与初始线程的ID相同。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work();
}
do_common_work();
}
总结
讨论了C++标准库中基本的线程管理方式:启动线程,等待结束和不等待结束(因为需要它们运行在后台)。
并了解应该如何在线程启动前,向线程函数中传递参数,如何转移线程的
所有权,如何使用线程组来分割任务。
最后使用线程标识来确定关联数据,以及特殊线程的特殊解决方案
线程间共享数据
当线程在访问共享数据的时候,必须定一些规矩,用来限定线程可访问的数据。
还有,一个线程更新了共享数据,需要对其他线程进行通知。
从易用性的角度,同一进程中的多个线程进行数据共享,有利有弊。
错误的共享数据使用是产生并发bug的一个主要原因。
共享数据带来的问题
当涉及到共享数据时,问题很可能是因为共享数据修改所导致。
如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。
但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。这种情况下,就必须
小心谨慎,才能确保一切所有线程都工作正常
例如破坏一个链表
如图:
条件竞争
良心竞争:
并发中竞争条件的形成,取决于一个以上线程的相对执行顺序,每个线程都抢着完成自己的任务。大多数情况下,即使改变执行顺序,也是良性竞争,其结果可以接受。
恶心竞争:
例如,有两个线程同时向一个处理队列中添加任务,因为系统提供的变量保持不变,所以谁先谁后都不会有什么影响。当变量遭到破坏时,才会产生条件竞争。
并发中对数据的条件竞争通常表示为“恶性”(problematic)条件竞争,我们对不产生问题的良性条件竞争不感兴趣。
C++标准中也定义了数据竞争(data race)这个术语,一种特殊的条件竞争:并发的
去修改一个独立对象,数据竞争是(可怕的)定义行为(undefine behavior)的起
因。
避免恶性条件竞争
这里提供一些方法来解决恶性条件竞争,最简单的办法就是对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。
从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。
另一个选择是对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分
割的变化,也就是保证每个不变量保持稳定的状,这就是无锁编程
另一种处理条件竞争的方式是,使用事务(transacting)的方式去处理数据结构的更新,这里的"处理"就如同对数据库进行更新一样。
所需的一些数据和读取都存储在事务日志中,然后将之前的操作合为一步,再进行提交。
当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”(software transactional memory
(STM))。理论研究中,这是一个很热门的研究领域。这个概念将不会在本书中再进行介绍,
因为在C++中没有对STM进行直接支持。
保护共享数据结构的最基本的方式,是使用C++标准库提供的互斥量(mutex)。
使用互斥量保护共享数据
当程序中有共享数据,肯定不想让其陷入条件竞争,或是不变量被破坏。
那么,将所有访问共享数据结构的代码都标记为互斥岂不是更好?这样任何一个线程在执行这些代码时,其他任何线程试图访问共享数据结构,就必须等到那一段代码执行结束。
于是,一个线程就不可能会看到被破坏的不变量,除非它本身就是修改共享数据的线程。
当访问共享数据前,使用互斥量将相关数据锁住,再当访问结束后,再将数据解锁。线程库需要保证,当一个线程使用特定互斥量锁住共享数据时,其他的线程想要访问锁住的数据,
都必须等到之前那个线程对数据进行解锁后,才能进行访问。这就保证了所有线程能看到共享数据,而不破坏不变量。
互斥量是C++中一种最通用的数据保护机制,但它不是“银蛋”;精心组织代码来保护正确的数据,并在接口内部避免竞争条件是非常重要的。但互斥量自身也有问
题,也会造成死锁,或是对数据保护的太多(或太少)。
C++中使用互斥量
C++中通过实例化 srd::mutex 创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。
不推荐实践中直接去调用成员函数,因为调用成员函数就意味着,必须记住在每个函数出口都要去调用unlock(),也包括异常的情况。
C++标准库为互斥量提供了一个RAII语法的模板类 std::lack_guard ,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁的互斥量总是会被正确的解锁。
std::mutex 和 std::lock_guard 都在 <mutex> 头文件中声明。
实践调用成员函数
//进程的锁定
#include <iostream>
#include <thread>
#include <string>
#include<windows.h>
#include<mutex>
using namespace std;
//两个线程并行访问一个变量
int g_num = 20;//找到或者找不到的标识
mutex g_mutex;
void goA(int num)
{
g_mutex.lock();//你访问的变量,在你访问期间,别人访问不了
for (int i = 0; i < 15; i++)
{
g_num = 10;
std::cout << "线程" << num << " " << g_num << endl;
}
g_mutex.unlock();
}
void goB(int num)
{
for (int i = 0; i < 15; i++)
{
g_num = 11;
std::cout << "线程" << num << " " << g_num << endl;
}
}
void main()
{
thread t1(goA, 1);
thread t2(goB, 2);
t1.join();
t2.join();
std::cin.get();
}
运行结果
RAII语法的模板类lack_guard()
RAII语法实现自动解锁
//进程的锁定
#include <iostream>
#include <thread>
#include<mutex>
using namespace std;
//两个线程并行访问一个变量
int g_num = 20;//找到或者找不到的标识
mutex g_mutex;
void goA(int num)
{
lock_guard<std::mutex>guard(g_mutex);//自动解锁
for (int i = 0; i < 15; i++)
{
g_num = 10;
std::cout << "线程" << num << " " << g_num << endl;
}
}
void goB(int num)
{
for (int i = 0; i < 15; i++)
{
g_num = 11;
std::cout << "线程" << num << " " << g_num << endl;
}
}
void main()
{
thread t1(goA, 1);
thread t2(goB, 2);
t1.join();
t2.join();
std::cin.get();
}
精心组织代码来保护共享数据
用互斥量来保护数据,并不是仅仅在每一个成员函数中都加入一个 std::lock_guard 对象那么简单。
一个迷失的指针或引用,将会让这种保护形同虚设。
函数可能没在互斥量保护的区域内,存储着指针或者引用,这样就很危险。
更危险的是:将保护数据作为一个运行时参数.
如同下面:
#include <iostream>
#include <thread>
#include<mutex>
class some_data
{
public :
int a;
std::string b;
public:
void do_something()
{
std::cout << a;
}
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func) //通过传递的函数将,保护的数据传递出去,跳过保护
{
std::lock_guard<std::mutex> l(m);
data.a = 10;
func(data); // 1 传递“保护”数据给用户函数
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected = &protected_data;
}
data_wrapper x;
void foo()
{
x.process_data(malicious_function); // 2 传递一个恶意函数
unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}
void main()
{
foo();
std::cin.get();
}
例子中process_data看起来没有任何问题, std::lock_guard 对数据做了很好的保护,但调用
用户提供的函数func①,就意味着foo能够绕过保护机制将函数 malicious_function 传递进去
在没有锁定互斥量的情况下调用 do_something() 。
这段代码的问题在于,它根本没有做到保护:只是将所有可访问的数据结构代码标记为互斥。
发现接口内在的条件竞争
因为使用了互斥量或其他机制保护了共享数据,就不必再为条件竞争所担忧吗?并不是这样,你依旧需要确定特定的数据受到了保护。
回想之前双链表的例子,为了能让线程安全地删除一个节点,需要确保防止对这三个节点(待删除的节点及其前后相邻的节点)的并发访问。
如果只对指向每个节点的指针进行访问保护,那就和没有使用互斥量一样,条件竞争仍会发生——整个数据结构和整个删除操作需要保护,但指针不需要保护。
这种情况下最简单的解决方案就是使用互斥量来保护整个链表,尽管对链表的个别操作是安全的,但不意味着你就能走出困境;即使在一个很简单的接口中,依旧可能遇到条件竞争
例如,构建一个类似于 std::stack 结构的栈除了构造函数和swap()以外,需要对 std::stack 提供五个操作:push()一个新元素进栈,pop()一个元素出栈,top()查看栈顶元素,empty()判断栈是否是空栈,size()了解栈中有多少个元素。
即使修改了top(),使其返回一个拷贝而非引用,对内部数据使用一个互斥量进行保护,不过这个接口仍存在条件竞争。
这个问题不仅存在于基于互斥量实现的接口中,在无锁实现的接口中,条件竞争依旧会产生。
这是接口的问题,与其实现方式无关。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁(deadlock)。
与条件竞争完全相反——不同于两个线程会互相等待,从而什么都没做。
死锁
但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。
这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。
不过,事情没那么简单,比如:当有多个互斥量保护同一个类的独立实例时,一个操作对同一个类的两个不同实例进行数据的交换操作,为了保证数据交换操作的正确性,就要避免数据被并发修改,并确保每个实例上的互斥量都能锁住自己要保护的区域。
不过,选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!
std::lock ——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)
交换操作中使用 std::lock() 和 std::lock_guard
#include <iostream>
#include <thread>
#include<mutex>
#include <string>
class some_big_object
{
};
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) :some_detail(sd) {}
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
std::lock(lhs.m, rhs.m); // 1
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // 3
swap(lhs.some_detail, rhs.some_detail);
}
};
① 锁住两个互斥量,并且两个 std:lock_guard 实例已经创建好②③,还有一个
互斥量。提供 std::adopt_lock 参数除了表示 std::lock_guard 对象已经上锁外,还表示现成的锁,而非尝试创建新的锁。
这样,就能保证在大多数情况下,函数退出时互斥量能被正确的解锁(保护操作可能会抛出一个异常),也允许使用一个简单的“return”作为返回。还有,需要注意的是,当使用 std::lock 去锁lhs.m或rhs.m时,可能会抛出异常;这种情况下,异常会传播到 std::lock 之外。
当 std::lock 成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常的产生而自动释放,所以 std::lock 要么将两个锁都锁住,要不一个都不锁。
避免死锁的进阶
虽然锁是产生死锁的一般原因,但也不排除死锁出现在其他地方。
无锁的情况下,仅需要每个 std::thread 对象调用join(),两个线程就能产生死锁。这种情况下,没有线程可以继续运行,因为他们正在互相等待。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。
为了避免死锁,这里的指导意见为:当机会来临时,不要拱手让人(don’t wait for another thread if there’s achance it’s waiting for you)。
以下提供一些的指导建议,如何识别死锁,并消除其他线程的等待。
避免嵌套锁第一个建议往往是最简单的:
一个线程已获得一个锁时,再别去获取第二个(don’t acquire alock if you already hold one)。
如果能坚持这个建议,因为每个线程只持有一个锁,锁上就不会产生死锁。即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)。
当你需要获取多个锁,使用一个 std::lock 来做这件事(对获取锁的操作上锁),避免产生死锁。
使用固定顺序获取锁当硬性条件要求你获取两个以上(包括两个)的锁,并且不能使用 std::lock 单独操作来获取它们;那么最好在每个线程上,用固定的顺序获取它们获取它们(锁)。
获取两个互斥量时,避免死锁的方法:关键是如何在线程之间,以一致性的顺序获取锁。一些情况下,这种方式相对简单。
unique_lock——灵活的锁
unique_lock 介绍
std::unqiue_lock 通过对不变量的放松(by relaxing the invariants),会比 std:lock_guard 更加灵活;一个 std::unique_lock 实现不会总是拥有与互斥量相关的数据类型。
首先,就像你能将 std::adopt_lock 作为第二个参数传入到构造函数,对互斥所进行管理,你也可以把 std::defer_lock 作为第二个参数传递进去,为了表明互斥量在结构上应该保持解锁状态。
这样,就可以被后面调用lock()函数的 std::unique_lock 对象(不是互斥量)所获取,或传递 std::unique_lock 对象本身到 std::lock() 中。清单3.6可以很容易被改写为清单3.9中的代
码,使用 std::unique_lock 和 std::defer_lock ,而非 std::lock_guard 和 std::adopt_lock 。
代码长度相同,且几乎等价,唯一不同的就是: std::unique_lock 会占用比较多的空间,并且比 std::lock_guard 运行的稍慢一些。保证灵活性是要付出代价的,这个代价就允许 std::unique_lock 实例不携带互斥量:该信息已被存储,且已被更新。
unique_lock 构造函数
default 构造函数
新创建的 unique_lock 对象不管理任何 Mutex 对象。
locking 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.lock() 对 Mutex 对象进行上锁,如果此时另外某个 unique_lock 对象已经管理了该 Mutex 对象 m,则当前线程将会被阻塞。
try-locking 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.try_lock() 对 Mutex 对象进行上锁,但如果上锁不成功,并不会阻塞当前线程。
deferred 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m,但是在初始化的时候并不锁住 Mutex 对象。 m 应该是一个没有当前线程锁住的 Mutex 对象。
adopting 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m, m 应该是一个已经被当前线程锁住的 Mutex 对象。(并且当前新创建的 unique_lock 对象拥有对锁(Lock)的所有权)。
locking 一段时间(duration)
新创建的 unique_lock 对象管理 Mutex 对象 m,并试图通过调用 m.try_lock_for(rel_time) 来锁住 Mutex 对象一段时间(rel_time)。
locking 直到某个时间点(time point)
新创建的 unique_lock 对象管理 Mutex 对象m,并试图通过调用 m.try_lock_until(abs_time) 来在某个时间点(abs_time)之前锁住 Mutex 对像。
copy [deleted]
unique_lock 对象不能被拷贝构造。
移动(move)构造
新创建的 unique_lock 对象获得了由 x 所管理的 Mutex 对象的所有权(包括当前 Mutex 的状态)。调用 move 构造之后, x 对象如同通过默认构造函数所创建的,就不再管理任何 Mutex 对象了。
unique_lock 的构造函数参考
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock, std::unique_lock
using namespace std;
// std::adopt_lock, std::defer_lock
std::mutex foo, bar;
void task_a() {
std::lock(foo, bar); // simultaneous lock (prevents deadlock)
std::unique_lock<std::mutex> lck1(foo, std::adopt_lock);
std::unique_lock<std::mutex> lck2(bar, std::adopt_lock);
std::cout << "task a\n";
// (unlocked automatically on destruction of lck1 and lck2)
}
void task_b() {
// foo.lock(); bar.lock(); // replaced by:
std::unique_lock<std::mutex> lck1, lck2;
lck1 = std::unique_lock<std::mutex>(bar, std::defer_lock);
lck2 = std::unique_lock<std::mutex>(foo, std::defer_lock);
std::lock(lck1, lck2); // simultaneous lock (prevents deadlock)
std::cout << "task b\n";
// (unlocked automatically on destruction of lck1 and lck2)
}
int main()
{
std::thread th1(task_a);
std::thread th2(task_b);
th1.join();
th2.join();
cin.get();
return 0;
}
unique_lock 移动(move assign)赋值操作
移动情况是锁的所有权需要从一个域转到另一个
移动赋值(move assignment)之后,由 A所管理的 Mutex 对象及其状态将会被新的 std::unique_lock 对象取代。
如果被赋值的对象之前已经获得了它所管理的 Mutex 对象的锁,则在移动赋值(move assignment)之前会调用 unlock 函数释放它所占有的锁。
调用移动赋值(move assignment)之后, A对象如同通过默认构造函数所创建的,也就不再管理任何 Mutex 对象了
例如
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include<string>
std::mutex mtx; // mutex for critical section
void print_fifty(std::string c) {
std::unique_lock<std::mutex> lck; // default-constructed
lck = std::unique_lock<std::mutex>(mtx); // move-assigned
std::cout << c;
std::cout << ‘\n‘;
}
int main()
{
std::thread th1(print_fifty, "Move OK !");
th1.join();
std::cin.get();
return 0;
}
unique_lock 主要成员函数
1、 上锁/解锁操作:lock,try_lock,try_lock_for,try_lock_until 和 unlock
2、 修改操作:移动赋值(move assignment),交换(swap)(与另一个 std::unique_lock 对象交换它们所管理的 Mutex 对象的所有权),释放(release)(返回指向它所管理的 Mutex 对象的指针,并释放所有权)
3、 获取属性操作:owns_lock(返回当前 std::unique_lock 对象是否获得了锁)、operator bool()(与 owns_lock 功能相同,返回当前 std::unique_lock 对象是否获得了锁)、mutex(返回当前 std::unique_lock 对象所管理的 Mutex 对象的指针)
std::unique_lock::lock
上锁操作,调用它所管理的 Mutex 对象的 lock 函数。如果在调用 Mutex 对象的 lock 函数时该 Mutex 对象已被另一线程锁住,则当前线程会被阻塞,直到它获得了锁。
该函数返回时,当前的 unique_lock 对象便拥有了它所管理的 Mutex 对象的锁。如果上锁操作失败,则抛出 system_error 异常。
// unique_lock::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock, std::defer_lock
std::mutex mtx; // mutex for critical section
void print_thread_id(int id) {
std::unique_lock<std::mutex> lck(mtx, std::defer_lock);
// critical section (exclusive access to std::cout signaled by locking lck):
lck.lock();
std::cout << "thread #" << id << ‘\n‘;
lck.unlock();
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(print_thread_id, i + 1);
for (auto& th : threads) th.join();
std::cin.get();
return 0;
}
std::unique_lock::try_lock
上锁操作,调用它所管理的 Mutex 对象的 try_lock 函数,如果上锁成功,则返回 true,否则返回 false。
#include <iostream> // std::cout
#include <vector> // std::vector
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock, std::defer_lock
std::mutex mtx; // mutex for critical section
void print_star() {
std::unique_lock<std::mutex> lck(mtx, std::defer_lock);
// print ‘*‘ if successfully locked, ‘#‘ otherwise:
if (lck.try_lock())
std::cout << ‘*‘;
else
std::cout << ‘#‘;
}
int main()
{
std::vector<std::thread> threads;
for (int i = 0; i<500; ++i)
threads.emplace_back(print_star);
for (auto& x : threads) x.join();
std::cin.get();
return 0;
}
std::unique_lock::try_lock_for
上锁操作,调用它所管理的 Mutex 对象的 try_lock_for 函数,如果上锁成功,则返回 true,否则返回 false。
#include <iostream> // std::cout
#include <chrono> // std::chrono::milliseconds
#include <thread> // std::thread
#include <mutex> // std::timed_mutex, std::unique_lock, std::defer_lock
std::timed_mutex mtx;
void fireworks() {
std::unique_lock<std::timed_mutex> lck(mtx, std::defer_lock);
// waiting to get a lock: each thread prints "-" every 200ms:
while (!lck.try_lock_for(std::chrono::milliseconds(200))) {
std::cout << "-";
}
// got a lock! - wait for 1s, then this thread prints "*"
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "*\n";
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(fireworks);
for (auto& th : threads) th.join();
std::cin.get();
return 0;
}
std::unique_lock::owns_lock
返回当前 std::unique_lock 对象是否获得了锁。
#include <iostream> // std::cout
#include <vector> // std::vector
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock, std::try_to_lock
std::mutex mtx; // mutex for critical section
void print_star () {
std::unique_lock<std::mutex> lck(mtx,std::try_to_lock);
// print ‘*‘ if successfully locked, ‘x‘ otherwise:
if (lck.owns_lock())
std::cout << ‘*‘;
else
std::cout << ‘x‘;
}
int main ()
{
std::vector<std::thread> threads;
for (int i=0; i<500; ++i)
threads.emplace_back(print_star);
for (auto& x: threads) x.join();
return 0;
}
同步并发操作
当你不仅想要保护数据,还想对单独的线程进行同步。例如,在第一个线程完成前,可能需要等待另一个线程执行完成。
通常情况下,线程会等待一个特定事件的发生,或者等待某一条件达成(为true)。这可能需要定期检查“任务完成”标识,或将类似的东西放到共享数据中,但这与理想情况还是差很多。
像这种情况就需要在线程中进行同步,C++标准库提供了一些工具可用于同步操作,形式上表现为
条件变量(condition variables)和期望(futures)。
等待一个事件或其他条件三种方式
当一个线程等待另一个线程完成任务时,它会有很多选择
一、它可以持续的检查共享数据标志(用于做保护工作的互斥量),直
到另一线程完成工作时对这个标志进行重设。
不过,就是一种浪费:线程消耗宝贵的执行时间持续的检查对应标志,并且当互斥量被等待线程上锁后,其他线程就没有办法获取锁,这样线程就会持续等待。因为以上方式对等待线程限制资源,并且在完成时阻碍对标识的设置。
二、个选择是在等待线程在检查间隙,使用 std::this_thread::sleep_for() 进行周期性的间歇
例如
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再锁互斥量
}
}
这个实现就进步很多,因为当线程休眠时,线程没有浪费执行时间,但是很难确定正确的休
眠时间。太短的休眠和没有休眠一样,都会浪费执行时间;太长的休眠时间,可能会让任务
等待线程醒来。休眠时间过长是很少见的情况,因为这会直接影响到程序的行为,当在高节
奏游戏(fast-paced game)中,它意味着丢帧,或在一个实时应用中超越了一个时间片。
三、选择(也是优先的选择)是,使用C++标准库提供的工具去等待事件的发生。
通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”(condition variable)。
从概念上来说,一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。
当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息。
等待条件达成
C++标准库对条件变量有两套实
现: std::condition_variable 和 std::condition_variable_any 。
这两个实现都包含在 <condition_variable> 头文件的声明中。
两者都需要与一个互斥量一起才能工作(互斥量是为了同步);前者仅限于std::mutex 一起工作,而后者可以和任何满足最低标准的互斥量一起工作,从而加上了_any的后缀。
因为 std::condition_variable_any 更加通用,这就可能从体积、性能,以及系统资源的使用方面产生额外的开销,所以 std::condition_variable 一般作为首选的类型,当对灵活性有硬性要求时,我们才会去考虑 std::condition_variable_any 。
所以,如何使用 std::condition_variable 去处理之前提到的情况——当有数据需要处理时,
如何唤醒休眠中的线程对其进行处理?以下清单展示了一种使用条件变量做唤醒的方式。
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) // 如果标志位不为 true, 则等待...
cv.wait(lck); // 当前线程被阻塞, 当全局标志位变为 true 之后,
// 线程被唤醒, 继续往下执行打印线程编号id.
std::cout << "thread " << id << ‘\n‘;
}
void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 设置全局标志位为 true.
cv.notify_all(); // 唤醒所有线程.
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto & th:threads)
th.join();
return 0;
}
std::condition_variable 构造函数
default (1) |
condition_variable(); |
copy [deleted] (2) |
condition_variable (const condition_variable&) = delete; |
std::condition_variable 的拷贝构造函数被禁用,只提供了默认构造函数。
std::condition_variable::wait() 介绍
unconditional (1) |
void wait (unique_lock<mutex>& lck); |
predicate (2) |
template <class Predicate> void wait (unique_lock<mutex>& lck, Predicate pred); |
std::condition_variable 提供了两种 wait() 函数。
当前线程调用 wait() 后将被阻塞(此时当前线程应该获得了锁(mutex),不妨设获得锁 lck),直到另外某个线程调用 notify_* 唤醒了当前线程。
在线程被阻塞时,该函数会自动调用 lck.unlock() 释放锁
,使得其他被阻塞在锁竞争上的线程得以继续执行。
另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lck.lock(),使得 lck 的状态和 wait 函数被调用时相同。
在第二种情况下(即设置了 Predicate),只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞.
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::yield
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
int cargo = 0;
bool shipment_available()
{
return cargo != 0;
}
// 消费者线程.
void consume(int n)
{
for (int i = 0; i < n; ++i) {
std::unique_lock <std::mutex> lck(mtx);
cv.wait(lck, shipment_available);
std::cout << cargo << ‘\n‘;
cargo = 0;
}
}
int main()
{
std::thread consumer_thread(consume, 10); // 消费者线程.
// 主线程为生产者线程, 生产 10 个物品.
for (int i = 0; i < 10; ++i) {
while (shipment_available())
std::this_thread::yield(); //线程调用yield()方法后,表明自己做的事已经完成,让出自己的cpu时间给其他线程使用
std::unique_lock <std::mutex> lck(mtx);
cargo = i + 1;
cv.notify_one();
}
consumer_thread.join();
std::cin.get();
return 0;
}