C++并发:在线程间共享数据
1 线程间共享数据的问题
1.1 条件竞争
条件竞争:在并发编程中:操作由两个或多个线程负责,它们争先让线程执行各自的操作,而结果取决于它们执行的相对次序,这样的情况就是条件竞争。
诱发恶性条件竞争的典型场景是,要完成一项操作,却需要改动两份或多份不同的数据,而它们只能用单独的指令改动,当其中的一份数据完成改动时,别的线程有可能不期而访。并且由于这样的场景出现的时间窗口小,因此一般很难复现场景定位。
1.2 防止恶性条件竞争
有如下方法:
1 采取保护措施包装数据结构,确保中间状态只对执行改动的线程可见。
2 修改设计,由一连串不可拆分的改动完成数据变更,每个改动都维持不变量不被破坏。这通常称为无锁编程,难以正确编写。如果从事这一层面的开发,就要探究内存模型的细节,以及区分每个线程能够看到什么数据集。
3 修改数据结构来当作事务处理。
2 用互斥保护共享数据
访问一个数据结构前,先锁住与数据相关的互斥,访问结束后再解锁互斥。C++线程库保证了,一旦有线程锁住了某个互斥,若其他线程试图再给他加锁,需要等待。
互斥也可能带来某些问题,比如死锁,对数据的过保护和欠保护。
2.1 std::mutex
C++中使用std::mutex的实例来构造互斥。
可以通过成员函数lock()对其加锁,unlock()进行解锁。但是并不推荐直接调用成员函数,原因是这样需要记住在函数以外的每条代码路径都要调用unlock(),包括异常退出的路径。
取而代之,C++便准库提供了模板std::lock_guard<>,针对互斥类融合实现了RAII:在构造时加锁,在析构时解锁,从而保证互斥总被正确解锁。
#include <list>
#include <mutex>
#include <algorithm>std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value) {std::lock_guard<std::mutex> guard(some_mutex);some_list.push_back(new_value);
}bool list_contains(int value_to_find) {std::lock_guard<std::mutex> guard(some_mutex);return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}
C++17支持了模板参数推导,使得上述实现可以写成如下样式。并且引入了std::scoped_lock,他是增强版的lock_guard
std::lock_guard guard(some_mutex);std::scoped_guard guard(some_mutex);
2.2 指针和引用打破互斥保护
如果成员函数返回指针或引用,指向受保护的数据,那么即便成员函数全部按良好、有序的方式锁定互斥,仍然会无济于事。
只要存在任何能访问该指针和引用的代码,它就可以访问受保护的共享数据,而无需锁定互斥。因此,利用互斥保护共享数据,需要谨慎设计程序接口,从而保证互斥已先行锁定,再对受保护的共享数据进行访问。
2.3 组织和编排代码以保护共享数据
我们除了要防止成员函数向调用者传出指针或者引用,还要注意成员函数内部调用的别的函数,也不要向这些函数传递指针或者引用。
#include <mutex>
#include <string>class some_data {int a;std::string b;public:void do_something();
};class data_wrapper {
private:some_data data;std::mutex m;
public:template<typename Function>void process_data(Function func) {std::lock_guard<std::mutex> l(m);func(data);}
};some_data* unprotected;void malicious_function(some_data& protected_data) {unprotected=&protected_data;
}
data_wrapper x;void foo() {x.process_data(malicious_function);unprotected->do_something();
}
比如上述代码,malicious_function方法将被互斥锁保护的data_wrapper中的some_data的引用赋值给外面的unprotected,导致互斥保护被打破,在外面可直接通过unprotected进行操作。
2.4 发现接口固有的条件竞争
#include <deque>
template<typename T, typename Container=std::deque<T>>
class stack {
public:explicit stack(const Container&);explicit stack(Container&& = Container());template <class Alloc> explicit stack(const Alloc&);template <class Alloc> stack(const Container&, const Alloc&);template <class Alloc> stack(Container&, const Alloc&);template <class Alloc> stack(stack&&, const Alloc&);bool empty() const;size_t size() const;T& top();T const& top() const;void push(T const&);void push(T&&);void pop();void swap(stack&&);template <class... Args> void emplace(Args&&... args);
};
上述实现会导致条件竞争,也就是empty和size的结果不可信,因为在函数返回后,其他线程不再受限,可能马上会有新元素入栈或者出栈。
线程1 | 线程2 |
if(!s.empty()) | |
if(!s.empty()) | |
int const value=s.top(); | |
int const value=s.top(); | |
s.pop(); | |
do_something(value); | s.pop(); |
do_something(value); |
这样,当一个栈只有一个元素的时候,第二个pop的线程会导致未定义行为。
并且,当我们复制vector时,如果vector中的元素数量巨大,可能导致因为资源不足造成的内存分配失败。pop函数的定义是,返回栈顶元素的值,并且将其从栈顶移除。因此,只有在栈被改动之后,弹出的元素才返回给调用者,然而在向调用者复制数据的过程中,有可能抛出异常。万一弹出的元素已经从栈上移除,但是复制不成功,就会造成数据丢失。
2.4.1 消除竞争
2.4.1.1 传入引用
std::vector<int> result;
some_stack.pop(result);
优点:pop的元素在外部容器白村了生命周期
缺点:如果要调用pop,还要先闯将一个别的容器。
2.4.1.2 提供不抛出异常的拷贝构造函数,或不抛出异常的移动构造函数
这样虽然安全,但是效果并不理想。栈容器的用途会受限。
2.4.1.3 返回指针,指向弹出元素
优点:指针可以自由的复制,不会抛出异常。
缺点:指向的对象仍然在内存中,需要额外的内存管理,可以使用shared_ptr。
2.4.1.4 结合1,2或者1,3
2.4.1.5 线程安全的栈容器
#include <exception>
#include <memory>
#include <mutex>
#include <stack>struct empty_stack: std::exception {const char* what() const throw();
};template<typename T>
class threadsafe_stack {
private:std::stack<T> data;mutable std::mutex m;
public:threadsafe_stack() {}threadsafe_stack(const threadsafe_stack& other) {std::lock_guard<std::mutex> lock(other.m);data=other.data;}threadsafe_stack operator=(const threadsafe_stack&) = delete;void push(T new_value) {std::lock_guard<std::mutex> lock(m);data.push(std::move(new_value));}std::shared_ptr<T> pop() {std::lock_guard<std::mutex> lock(m);if (data.empty()) throw empty_stack();std::shared_ptr<T> const res(std::make_shared<T>(data.top()));data.pop();return res;}void pop(T& value) {std::lock_guard<std::mutex> lock(m);if (data.empty()) throw empty_stack();value = data.pop();data.pop();}bool empty() const {std::lock_guard<std::mutex> lock(m);return data.empty();}
};
2.5 死锁:问题和解决方法
防范死锁的建议通常是,始终按照相同的顺序对两个互斥加锁。
C++标准提供了std::lock函数,使得可以同时锁住多个互斥。
#include <mutex>
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X {
private:some_big_object some_detail;std::mutex m;
public:X(some_big_object const& sd) : some_detail(sd){}friend void swap(X& lhs, X& rhs);{if (&lhs == & rhs)return;std::lock(lhs.m, rhs.m);std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);swap(lhs.some_detail, rhs.some_detail);}
};
std::adopt_lock对象指明了互斥已被锁住,即互斥上有锁存在。std::lock_guard实例据此接收锁的归属权,不会在构造函数内试图另行加锁。
无论是正常返回还是异常退出,std::lock_guard都保证了互斥全都正确解锁。
另外,lock()对lhs.m或rhs.m进行加锁,这一函数调用可能导致抛出异常。
C++17还提供了全新的特性std::scoped_lock<>。它和std::lock_guard<>完全等价。只不过前者是可变参数模板,接收各种互斥型别作为模板参数列表,还能以多个互斥对象作为构造函数的参数列表。
void swap(X& lhs, X& rhs)
{if (&lhs==&rhs)return;std::scoped_lock guard(lhs.m, rhs.m);swap(lhs.some_detail, rhs.some_detail);
}
使用新特性实现如上,并且上述代码还是用了类模板参数推导(C++17)。使用std::scoped_lock将lock和lock_guard合并为一句,降低出错概率。
2.6 防范死锁的补充准则
即使没有牵涉锁,也会发生死锁现象。假定有两个线程,各自关联了std::thread实例,若同时在对方的std::thread实例上调用join,那么就能制造出死锁现象。
防范死锁的最终准则:只要另一个线程有可能正在等待当前线程,那么当前线程不要反过来等待他。
2.6.1 避免嵌套锁
假如已经持有锁,就不要试图获取第二个锁。这样保证每个线程最多只持有一个锁,仅锁的使用本身不可能导致锁。
但是还存在其他可能引起死锁的场景(比如多个线程彼此等待),操作多个互斥锁很可能是最常见的死锁诱因。如果真的需要获取多个锁,应使用lock函数,单独的调用动作一次获取全部锁来避免死锁。
2.6.2 一旦持锁,就须避免调用由用户提供的程序接口
若程序接口由用户自行实现,则我们无从得知它到底会做什么,可能会试图获取锁。这样便可能违反避免嵌套锁的准则,可能发生死锁。
不过有时候这个情况难以避免,因此在需要调用用户提供的程序接口时,要遵守2.6.3准则。
2.6.3 依从固定顺序获取锁
如果多个锁是绝对必要的,却无法通过std::lock()在一步操作内全部获取,我们只能退而求其次,在每个线程内部依从固定顺序获取这些锁。
也可以同时给这些互斥加锁。
或者对于双向链表来说,规定遍历的方向,让线程总是必须先锁住A,再锁住B,也可以防范死锁。
2.6.4 按层级加锁
锁的层级划分就是按照特定的方式规定加锁次序,在运行期据此查验加锁操作是否遵从预设规则。若某个线程已对低层级互斥加锁,则不准它再对高层级互斥加锁。不过这种模式C++标准库尚未提供支持,自行实现如下: