第三章 线程间共享数据
第三章 线程间共享数据
共享数据带来的问题
在C++多线程编程中,确实存在共享数据的问题,特别是当多个线程试图同时修改同一份数据时。
只读操作通常不会导致问题,因为所有线程都可以同时读取相同的数据,而不会对数据产生修改。
当一个或多个线程需要修改共享数据时,就需要特别小心。
不变量(Invariants)是一个非常重要的概念,它可以帮助程序员更好地理解和描述特殊数据结构的状态。
- 在计算机科学中,不变量通常被用来描述一个程序或者系统的某些属性,这些属性在程序或者系统的生命周期中始终保持不变。
- 例如,对于一个List,一个常见的不变量是"链表中包含的元素数量"。当这个不变量被破坏时,通常意味着数据结构的状态已经发生了改变。
- 对于双链表来说,在删除一个节点时,需要更新其前后的节点的指针。在这个过程中,不变量会被暂时破坏,直到两个指针都完成更新。
- 线程间潜在问题就是修改共享数据,致使不变量遭到破坏, 这个时候就会产生条件竞争(race conditions) 。
在C++多线程编程中,共享数据的问题可以通过使用 互斥量**(mutex)** 或者 读写锁(read-write lock) 来解决。
这些锁可以确保在某一时刻只有一个线程可以修改共享数据,从而避免数据竞争的问题。
此外,也可以**使用原子操作(atomic operations)来保证操作的原子性,**防止在多线程环境下发生数据竞争。
使用互斥量保护共享数据
在C++多线程编程中,**互斥量(mutex)**是一种用于保护共享数据的关键工具,它能够确保在某一时刻只有一个线程可以访问共享数据,从而避免条件竞争和不变量被破坏的问题。
因为只有一个线程可以访问共享数据,其他线程必须等待该线程完成操作后才能继续执行。这样可以确保每个线程都能看到共享数据的一致性状态。
虽然互斥量是一种强大的工具,但它并非万能的。
使用互斥量需要谨慎,因为不正确的使用可能会导致死锁(两个或更多的线程无限期地等待对方释放互斥量)。
此外,过度保护(锁定太多的数据)或保护不足(锁定太少的数据)也可能导致问题。
C++中使用互斥量
C++中通过实例化 stdlock_guard模板类,它会在构造时上锁互斥量,并在析构时解锁互斥量,确保互斥量总是被正确解锁。
- stdlock_guard 的构造函数将会阻塞,直到互斥体变为未锁定状态。
- stdlock_guard 锁定,就不应该再手动解锁这个互斥体。
这种RAII(Resource Acquisition Is Initialization)语法可以简化多线程程序中的资源管理。
Protecting a list with a mutex
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value)
{
// 使用std::lock_guard互斥访问数据
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(), some_list.end(), value_to_find)
!= some_list.end();
}
- 大多数情况下,互斥量通常会与保护的数据放在同一个类中. 而不是定义成全局变量。
- 互斥量和要保护的数据,在类中都需要定义为private成员, 不能返回这些数据的指针或引用 — 这样会破坏对数据的保护,如下例:
Accidentally passing out a reference to protected dat
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data); // 1 传递“保护”数据给用户函数
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected = &protected_data;
}
data_wrapper x;
void foo()
{
x.process_data(malicious_function); // 2 传递一个恶意函数
unprotected->do_something(); // 3 锁被穿透,在无保护的情况下访问保护数据
}
发现接口内在的条件竞争
注意观察是否有多个线程或进程同时访问和修改共享数据的情况.
- 识别哪些数据是共享的.
- 仔细审查与接口相关的代码,特别是那些涉及条件判断和循环的部分.
A simple std::stack container
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
};
// 单线程安全代码
stack<int> s;
if (! s.empty())
{
int const value = s.top();
s.pop();
do_something(value);
}
在多线程调用过程中可能发生的问题:

- 线程2的处理的top元素不是预期的值
- s.pop的两次调用可能导致崩溃
- 推荐使用 std::stack 提供的 pop() 和 top() 的原子操作版本。
下面介绍一些其它的方法:
- 传入一个引用,作为想要的弹出值来进行使用。
std::vector<int> result;
some_stack.pop(result); - 无异常抛出的拷贝构造函数或则移动构造函数
- 返回指向弹出值的指针
这样可以方便自由拷贝,并且不会产生异常,缺点是返回一个指针需要对内存分配进行管理,对于简单数据类型,内存管理的开销远大于直接返回值。通常使用std::shared_ptr进行操作。 - 根据需要选择“1+2”或“1+3”.
A fleshed-out class definition for a thread-safe stack
// 实现了选项1和选项3:重载了pop(),使用一个局部引用去存储弹出值,并返回一个 std::shared_ptr<> 对象。
// 定义了空栈结构体来抛出异常
struct empty_stack : std::exception
{
const char* what() const throw() {
return "empty stack!";
};
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack()
: data(std::stack<T>()) {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
// 在构造函数体中的执行拷贝
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
// 在调用pop前,检查栈是否为空
if (data.empty()) throw empty_stack();
// 在修改堆栈前,分配出返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
//上面的检查拷贝操作全部都加上了锁,虽然性能有所损耗,但是实现了线程级安全。
死锁:问题描述及解决方案
在C++多线程编程中,死锁是一个常见的问题。死锁通常发生在两个或更多的线程互相等待对方持有的资源,导致所有涉及的线程都无法继续执行。
死锁产生的主要原因:
- 嵌套锁:如果一个线程已经持有一个锁,然后试图获取第二个锁,这可能会导致死锁。特别是当两个线程以不同的顺序获取同一组锁时,就可能发生死锁。
- 调用用户提供的代码:在持有锁的情况下调用用户提供的代码可能会导致死锁,因为我们无法确定用户提供的代码中是否会尝试获取其他锁。
- 锁的顺序:如果多个线程需要获取多个锁,且每个线程获取锁的顺序不一致,那么可能会发生死锁。
避免死锁的方法:
- 避免嵌套锁:如果可能,应尽量避免在已经持有一个锁的情况下获取另一个锁。
- 使用
std::lock:C++标准库提供了std::lock函数,可以一次性获取多个锁,从而避免死锁。 - 固定锁的顺序:如果需要获取多个锁,应该让每个线程都以固定的顺序来获取锁。
- 避免在持有锁时调用用户提供的代码:在设计接口时,应尽量避免在持有锁的情况下需要调用用户提供的代码。
- 使用层次锁(Use a lock hierarchy): 通过定义一个锁的层次结构来确保线程以一致的顺序获取锁。这种策略可以在运行时检查是否遵守约定,从而避免死锁。
- 当需要等待一个线程结束时,为了避免死锁,可以采取以下策略
- 确定线程的层级:为每个线程分配一个唯一的层级标识,确保在等待比层次低的线程结束时才进行。这样可以避免循环等待条件。
a thread waits only for threads lower down the hierarchy - 确保线程的
join调用在启动它们的相同函数中进行:比如在主线程中启动的子线程应该在主线程中调用join或detach,以确保资源正确释放。
- 确定线程的层级:为每个线程分配一个唯一的层级标识,确保在等待比层次低的线程结束时才进行。这样可以避免循环等待条件。
stdlock_guard in a swap operation
#include <mutex>
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X {
private:
//目标对象
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) :some_detail(sd) {};
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
// 调用 std::lock()锁住两个互斥变量
std::lock(lhs.m, rhs.m);
// std::adopt_lock 参数除了表示 std::lock_guard 对象可获取锁之外,还将锁交由 std::lock_guard 对象管理,而不需要std::lock_guard对象再去构建新的锁。
// 能保证在大多数情况下,函数退出时互斥量能被正确的解锁(保护操作可能会抛出一个异常).
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail);
}
};
Using a lock hierarchy to prevent deadlock
//使用层次锁来避免死锁, 可以结合下面的例子来看
hierarchical_mutex high_level_mutex(10000); //层次锁-10000
hierarchical_mutex low_level_mutex(5000); //层次锁-5000
int do_low_level_stuff();
int low_level_func()
{
//层级锁互斥
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}
void high_level_stuff(int some_param);
void high_level_func()
{
// 先激活高层次锁 high_level_mutex(10000)
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
// 调用 low_level_func 激活次层次锁 low_level_mutex(5000)
high_level_stuff(low_level_func());
}
// 线程a遵守层级规则,所以运行没有问题
void thread_a()
{
high_level_func();
}
hierarchical_mutex other_mutex(100);
void do_other_stuff();
void other_stuff()
{
high_level_func();
do_other_stuff();
}
//线程b无视层级规则
void thread_b()
{
// 先激活低层次锁 other_mutex(100)
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff(); // 低层次锁先锁定, 会造成high_level_func()的调用产生错误。导致程序意外终止
}
A simple hierarchical mutex
class hierarchical_mutex
{
//互斥信号量
std::mutex internal_mutex;
//接受的层级值
unsigned long const hierarchical_value;
//原来的层级值
unsigned long previous_hierarchy_value;
//静态指针,指向线程的层级值
static thread_local unsigned long this_thread_hierarchy_value;
void check_for_hierarchy_violation()
{
// 检查层级值保持递减的序列
if (this_thread_hierarchy_value <= hierarchical_value)
{
throw std::logic_error(“mutex hierarchy violated”);
}
}
void update_hierachy_value()
{
//更改层级值前,先将原来的层级值保存
previous_hierarchy_value = this_thread_hierarchy_value;
//更改当前的层级值
this_thread_hierarchy_value = hierarchical_value;
}
public:
explicit hierarchical_mutex(unsigned long value) :hierarchical_value(value), previous_hierarchy_value(0)
{}
~hierarchical_mutex();
//加锁函数
void lock()
{
//确认当前值小于新值
check_for_hierarchy_violation();
//当前线程加锁
internal_mutex.lock();
update_hierachy_value();
}
void unlock()
{
// 还原到原来的层级值 -- 从锁定顺序来看,还原是先还原到一个较小值
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if (!internal_mutex.try_lock())
{
return false;
}
update_hierachy_value();
return true;
}
};
// 直接将信号变量,初始化为最大值
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
Info
thread_local
thread_local 是 C++11 引入的一种存储类型指定符,它用于定义线程局部变量。这意味着每个线程都拥有一份独立的变量副本,互不干扰。
在多线程编程中,thread_local 非常有用,可以避免因多个线程之间共享变量而导致的线程安全问题。
如果你在函数中定义了一个 thread_local 变量,那么每个线程都会有自己的这个变量的副本,每个副本都有自己的生命周期,当线程结束时,它的副本就会被销毁。
需要注意的是,thread_local 变量的初始化只在第一次线程调用时进行,这一点与静态变量类似。
此外,thread_local 变量的作用域仍然受到其定义位置的限制,例如,在函数内定义的 thread_local 变量在函数外部是不可见的.
:::success
Live Lock
活锁(Live Lock)是一种并发系统中可能发生的有趣情况:
在活锁中,进程并没有被阻塞,但它们不断改变状态以响应彼此。然而,尽管它们不断活动,但没有一个进程真正朝着完成任务的目标前进。
活锁的特点:
- 持续状态变化:活锁中的进程不断改变状态,通常是由于与其他进程的交互。
- 没有进展:尽管状态不断变化,但没有一个进程实际上在完成任务方面取得进展。
- 相互干扰:活锁发生在进程相互干扰状态的情况下,阻止了任何有意义的进展。
现实世界的类比:
- 把活锁想象成一个拥挤的走廊,人们不断试图让开让别人通过。然而,他们不断的挪动阻止了任何人真正前进,结果是一个有趣但无效的舞蹈。
:::
std::unique_lock– 灵活的锁
stdlock_guard 更灵活的锁管理.
创建一个 std::unique_lock 对象:
- 你可以通过将互斥量作为参数传入
std::unique_lock的构造函数来创建这个对象。此时,std::unique_lock对象会自动锁定这个互斥量。- 不传第二个参数也是自动锁定这个互斥量
- 如果你想在创建
std::unique_lock对象时不锁定互斥量,可以使用std::defer_lock作为第二个参数传入std::unique_lock的构造函数。 - 如果你想确保在创建
std::unique_lock对象之前互斥量已经被锁定,并让std::unique_lock对象接管这个已经被锁定的互斥量,可以使用std::adopt_lock作为第二个参数。 - https://en.cppreference.com/w/cpp/thread/unique_lock
请注意,如果在创建 std::unique_lock 对象时互斥量并未被锁定,而你使用 std::adopt_lock 作为第二个参数,那么这会导致未定义行为。
Create std::unique_lock object
// auto lock
std::mutex m;
std::unique_lock<std::mutex> lock(m);
// std::defer_lock
std::mutex m;
std::unique_lock<std::mutex> lock(m, std::defer_lock);
// ...
lock.lock(); // 在需要的时候锁定互斥量
// std::adopt_lock
std::mutex m;
m.lock(); // 先锁定互斥量
std::unique_lock<std::mutex> lock(m, std::adopt_lock); // 然后让 std::unique_lock 对象接管已经被锁定的互斥量
std::unique_lock 的其他特点:
- 解锁和再次锁定:std::unique_lock 对象支持 unlock() 和 lock() 成员函数,所以你可以在需要的时候解锁和再次锁定互斥量.
- 所有权转移:stdunique_lock 对象转移到另一个 std::unique_lock 对象。
- 与条件变量配合使用:stdcondition_variable 对象配合使用,以便在等待条件变量时自动解锁和重新锁定互斥量。
Using stdunique_lock in a swap operation
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) :some_detail(sd) {}
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
// std::defer_lock leaves mutexes unlocked
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
// Mutexes are locked here
std::lock(lock_a, lock_b);
swap(lhs.some_detail, rhs.some_detail);
}
};
需要注意的是,stdunique_lock 对象比 stdlock_guard 能满足你的需求,那么建议继续使用 stdunique_lock。
锁的粒度
在多线程编程中,锁的粒度是一个重要的概念,它描述了通过一个锁保护的数据量大小。
细粒度锁(fine-grained lock)能够保护较小的数据量,而粗粒度锁(coarse-grained lock)能够保护较大的数据量。选择合适的锁粒度对于提高多线程程序的性能至关重要。
在多线程编程中,应该尽量减少锁的持有时间,只对需要保护的共享数据进行加锁,避免对锁外数据进行处理,尤其是避免执行费时的操作,如文件输入/输出操作。
保护共享数据的替代设施
stdcall_once 是C++11中引入的线程安全的函数和类型,用于确保某个函数只被调用一次。
- stdcall_once函数,以确保在多线程环境下仅仅执行一次
- stdonce_flag对象的引用。
- 该函数会检查stdonce_flag对象为已设置状态.
- stdonce_flag提供了一种简洁、高效的方式来确保在多线程环境下某个函数只被执行一次,从而避免了条件竞争的问题。这比显式使用互斥量消耗的资源更少,特别是当初始化完成后.
std::call_once
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection = connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_) :
connection_details(connection_details_)
{}
void send_data(data_packet const& data)
{
std::call_once(connection_init_flag, &X::open_connection, this);
connection.send_data(data);
}
data_packet receive_data()
{
std::call_once(connection_init_flag, &X::open_connection, this);
return connection.receive_data();
}
};
// 上面的例子确保open_connection在多线程环境下只执行一次
// 虚幻用例
/** Thread-safe call once helper, similar to std::call_once without the std::once_flag */
#define UE_CALL_ONCE(Func) static int32 ANONYMOUS_VARIABLE(ThreadSafeOnce) = ((Func)(), 1)
Warning
每个线程在调用同一个包含stdlock_guard的函数时都会在自己的栈上面创建独立的锁对象。
但是它们都尝试锁定同一个互斥体。因此,即使每个线程都有自己的锁对象,但在任何时候,只有一个线程能够成功锁定互斥体。