第六章 基于锁的并发数据结构
第六章 基于锁的并发数据结构
线程安全的数据结构
线程安全(thread-safe)的数据结构意味着多个线程可以同时访问数据结构,执行相同或不同的操作,每个线程都会看到数据结构的一致视图(self-consistent view)。
它可以保证在并发访问时不会有数据丢失或损坏,所有的不变性都将被维护,也不会有问题的竞态条件。
序列化 Serialization
"序列化"在并发编程中,是指通过某种机制(如互斥锁)确保多个线程不能同时访问某个数据结构或资源。
这意味着,尽管有多个线程可能试图同时访问,但在任何给定的时间点,只有一个线程被允许访问。
这就确保了在任何时候,只有一个线程可以看到数据结构的一致状态,从而防止了数据的混乱和不一致。
但是,这种方法也可能导致性能下降,因为线程必须等待锁。
因此,设计并发数据结构时,需要尽量减少需要序列化的操作,以提高并发性和性能。
支持并发的数据结构设计指南
设计并发数据结构要考虑两点,一个确保线程安全(thread-safe), 二是提高并发度。
- 确保****线程安全的基本要求如下
- 数据结构的不变量(invariant)被一个线程破坏时,确保不被其他线程看到此状态。
- 提供操作完整的函数来避免数据结构接口中固有的条件竞争(race condition)。
- 注意数据结构出现异常时的行为,以确保不变量不被破坏。
- 限制锁的范围,避免可能的嵌套锁,最小化死锁的概率。
- 作为数据结构的设计者,要提高数据结构的并发度,可以从以下角度考虑:
- 部分操作是否能在锁的范围外执行?
- 数据机构的不同部分是否被不同的 mutex 保护
- 是否所有操作需要同级别的保护
- 在不影响操作语义的前提下,能否对数据结构做简单的修改提高并发度
- 总计:最小化线程对数据结构的轮流访问,最大化真实的并发量
基于锁的并发数据结构
thread-safe stack
// A class definition for a thread-safe stack
#include <exception>
struct empty_stack : std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
std::shared_ptr<T> const res(
std::make_shared<T>(std::move(data.top())));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = std::move(data.top());
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
thread-safe queue(shared_ptr)
// A thread-safe queue holding std::shared_ptr<> instances
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T> > data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = std::move(*data_queue.front());
data_queue.pop();
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(*data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
std::shared_ptr<T> res = data_queue.front();
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res = data_queue.front();
data_queue.pop();
return res;
}
void push(T new_value)
{
std::shared_ptr<T> data(
std::make_shared<T>(std::move(new_value)));
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
Info
使用 std::shared_ptr<>的优势
- 从队列中检索实例:返回std::shared_ptr<>实例的pop函数可以在返回给调用者之前直接从队列中检索它。
- 在锁外分配新实例:内存分配是相当昂贵的操作,放到锁外对于队列的性能可能非常有益,因为它减少了持有互斥体的时间,允许其他线程在此期间对队列执行操作。
thread-safe queue(fine-grained locks and condition variables)
上面实现的 thread-safe stack 和 queue 都是用一把锁定保护整个数据结构,这限制了并发性,多线程在成员函数中阻塞时,同一时间只有一个线程能工作。
这种限制主要是因为内部实现使用的是 std::queue,为了支持更高的并发,需要更换内部的实现方式,使用细粒度的(fine-grained)锁。
最简单的实现方式是包含头尾指针的单链表,不考虑并发的单链表实现如下:
// A simple single-threaded queue implementation
template<typename T>
class Queue
{
public:
Queue() = default;
Queue(const Queue&) = delete;
Queue& operator=(const Queue&) = delete;
// 使用引用避免额外调用拷贝函数
void push(T& x)
{
// 通过移动语义转移X的数据
auto new_node = std::make_unique<Node>(std::move(x));
Node* new_tail_node = new_node.get();
if (tail)
{
// operate =(...) for move semantics
tail->next = std::move(new_node);
}
else
{
head = std::move(new_node);
}
tail = new_tail_node;
}
std::shared_ptr<T> try_pop()
{
if (!head)
{
return nullptr;
}
auto res = std::make_shared<T>(std::move(head->v));
std::unique_ptr<Node> head_node = std::move(head);
head = std::move(head_node->next);
return res;
}
private:
struct Node
{
explicit Node(T x) : v(std::move(x)) {}
T v;
std::unique_ptr<Node> next;
};
std::unique_ptr<Node> head;
Node* tail = nullptr;
};
对于上面的模块队列代码,就算使用两个 mutex 分别保护头尾指针,在多线程下也存在明显问题。
push 可以同时修改头尾指针,会对两个 mutex 上锁,另外仅有一个元素时头尾指针相等,push 写和 try_pop 读的 next 节点是同一对象,产生了竞争,锁的也是同一个 mutex。
启动数据分离来实现并发(Enabling Concurrency by separating data)
在头节点前初始化一个 空(dummy)节点当占位符,避免并发队列中的空指针问题。这样 push 只访问尾节点,不会再与 try_pop 竞争头节点.
Preallocate a Dummy Node:
- 创建一个没有实际数据的空节点,确保队列中始终至少有一个节点。
- 头指针和尾指针在初始化的时候都指向这个空节点(而不是nullptr), 确保队列为空时也是
- 这种方法确保了即使队列为空,它依然是良定义的。
Handling an Empty Queue:
- 当队列为空时,try_pop() 不会访问 head->next。
- 由于空节点始终存在,不存在解引用空指针的风险。
- 空节点充当哨兵,防止 head->next 上的条件竞争。
Adding a Real Node:
- 当向队列中添加一个实际的带有数据的节点时,头和尾将指向不同的节点。
- 这消除了访问 head->next 或 tail->next 时潜在的条件竞争。
// A simple queue with a dummy node
#include <memory>
#include <utility>
template <typename T>
class Queue {
public:
Queue() : head_(new Node), tail_(head_.get()) {}
Queue(const Queue&) = delete;
Queue& operator=(const Queue&) = delete;
// push函数只针对尾指针, 这里没有处理额外拷贝, 可以优化
void push(T x) {
// 通过移动语义转移x的数据
auto new_val = std::make_shared<T>(std::move(x));
// 创建新的空节点
auto new_node = std::make_unique<Node>();
Node* new_tail_node = new_node.get();
// 更新尾指针指向空节点的数据, 并移动到新的空节点
tail_->v = new_val;
tail_->next = std::move(new_node);
tail_ = new_tail_node;
}
// pop函数只针对头指针
std::shared_ptr<T> try_pop() {
// 空队列
if (head_.get() == tail_)
{
return nullptr;
}
std::shared_ptr<T> res = head_->v;
// 转移头指针用于离开函数后销毁节点
std::unique_ptr<Node> head_node = std::move(head_);
head_ = std::move(head_node->next);
return res;
}
private:
struct Node {
std::shared_ptr<T> v;
std::unique_ptr<Node> next;
};
std::unique_ptr<Node> head_;
Node* tail_ = nullptr;
};
支持多线程的版本(锁的范围应该尽可能小)
// A thread-safe queue with fine-grained locking
template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue(): head(new node), tail(head.get()) {}
threadsafe_queue(const threadsafe_queue& other) = delete;
threadsafe_queue& operator=(const threadsafe_queue& other) = delete;
std::shared_ptr<T> TryPop()
{
std::unique_ptr<Node> old_head = PopHead();
return old_head ? old_head->data : std::shared_ptr<T>();
}
void Push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<Node> p(new Node);
Node* const new_tail = p.get();
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data = new_data;
tail->next = std::move(p);
tail = new_tail;
}
private:
struct Node
{
std::shared_ptr<T> data;
std::unique_ptr<Node> next;
};
std::mutex head_mutex;
std::unique_ptr<Node> head;
std::mutex tail_mutex;
Node* tail;
Node* GetTail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<Node> PopHead()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if (head.get() == GetTail())
{
return nullptr;
}
std::unique_ptr<Node> old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
};
- push 中创建新值和新节点都没上锁,多线程可用并发创建新值和新节点。虽然同时只有一个线程能添加新节点,但这只需要一个指针赋值操作,锁住尾节点的时间很短.
- try_pop 中对尾节点只是用来做一次比较,持有尾节点的时间同样很短,因此 try_pop 和 push 几乎可以同时调用。
- try_pop 中锁住头节点所做的也只是指针赋值操作,开销较大的析构在锁外进行,这意味着虽然同时只有一个线程能 pop_head,但允许多线程删除节点并返回数据,提升了 try_pop 的并发调用数量
结合 std::condition_variable 实现 wait_and_pop,即得到与之前接口相同但并发度更高的 thread-safe queue, 参考如下:
// A thread-safe queue with locking and waiting
#include <condition_variable>
#include <memory>
#include <mutex>
#include <utility>
template <typename T>
class ConcurrentQueue {
public:
ConcurrentQueue() : head_(new Node), tail_(head_.get()) {}
ConcurrentQueue(const ConcurrentQueue&) = delete;
ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
void push(T x) {
auto new_val = std::make_shared<T>(std::move(x));
auto new_node = std::make_unique<Node>();
Node* new_tail_node = new_node.get();
{
std::lock_guard<std::mutex> l(tail_mutex_);
tail_->v = new_val;
tail_->next = std::move(new_node);
tail_ = new_tail_node;
}
cv_.notify_one();
}
std::shared_ptr<T> try_pop() {
std::unique_ptr<Node> head_node = try_pop_head();
return head_node ? head_node->v : nullptr;
}
bool try_pop(T& res) {
std::unique_ptr<Node> head_node = try_pop_head(res);
return head_node != nullptr;
}
std::shared_ptr<T> wait_and_pop() {
std::unique_ptr<Node> head_node = wait_pop_head();
return head_node->v;
}
void wait_and_pop(T& res) { wait_pop_head(res); }
bool empty() const {
std::lock_guard<std::mutex> l(head_mutex_);
return head_.get() == get_tail();
}
private:
struct Node {
std::shared_ptr<T> v;
std::unique_ptr<Node> next;
};
private:
std::unique_ptr<Node> try_pop_head() {
std::lock_guard<std::mutex> l(head_mutex_);
if (head_.get() == get_tail()) {
return nullptr;
}
return pop_head();
}
std::unique_ptr<Node> try_pop_head(T& res) {
std::lock_guard<std::mutex> l(head_mutex_);
if (head_.get() == get_tail()) {
return nullptr;
}
res = std::move(*head_->v);
return pop_head();
}
std::unique_ptr<Node> wait_pop_head() {
std::unique_lock<std::mutex> l(wait_for_data());
return pop_head();
}
std::unique_ptr<Node> wait_pop_head(T& res) {
std::unique_lock<std::mutex> l(wait_for_data());
res = std::move(*head_->v);
return pop_head();
}
std::unique_lock<std::mutex> wait_for_data() {
std::unique_lock<std::mutex> l(head_mutex_);
cv_.wait(l, [this] { return head_.get() != get_tail(); });
return l;
}
std::unique_ptr<Node> pop_head() {
std::unique_ptr<Node> head_node = std::move(head_);
head_ = std::move(head_node->next);
return head_node;
}
Node* get_tail() {
std::lock_guard<std::mutex> l(tail_mutex_);
return tail_;
}
private:
std::unique_ptr<Node> head_;
Node* tail_ = nullptr;
std::mutex head_mutex_;
mutable std::mutex tail_mutex_;
std::condition_variable cv_;
};
thread-safe map
- 并发访问 std::map 和 std::unordered_map 的接口的问题在于迭代器,其他线程删除元素时会导致迭代器失效,因此 thread-safe map 的接口设计就要跳过迭代器
- 为了使用细粒度锁,就不应该使用标准库容器。可选的关联容器数据结构有三种,一是二叉树(如红黑树),但每次查找修改都要从访问根节点开始,也就表示根节点需要上锁,尽管沿着树向下访问节点时会解锁,但这个比起覆盖整个数据结构的单个锁好不了多少
- 第二种方式是有序数组,这比二叉树还差,因为无法提前得知一个给定的值应该放在哪,于是同样需要一个覆盖整个数组的锁
- 第三种方式是哈希表。假如有一个固定数量的桶,一个 key 属于哪个桶取决于 key 的属性和哈希函数,这意味着可以安全地分开锁住每个桶。如果使用读写锁,就能将并发度提高相当于桶数量的倍数
:::success
Hash 表
- 关键数据和哈希函数:
- 在哈希表中,关键数据(Key)是最重要的部分。哈希函数将这些关键数据映射到哈希表中的特定位置,即哈希桶。
- 好的哈希函数应该使不同的关键数据映射到不同的桶,以减少冲突。
- 关于关键数据的唯一性:
- 关键数据不一定是唯一的。在哈希表中,不同的关键数据可能映射到相同的哈希桶,这就是哈希冲突。
- 处理哈希冲突的方法是使用不同的策略,例如开放地址法或链地址法。
- 双哈希探测:
- 双哈希探测是一种解决哈希冲突的技术。
- 为了确保固定的步长能够遍历到所有位置,我们通常使用一个质数作为第二个哈希函数。
- 这样,即使发生冲突,我们可以通过不断探测下一个位置来找到新的插入位置。
因此,当使用关键数据(Key)在哈希表(Hash Table)中查找数据时,该Key不仅被用作生成哈希键(Hash Key)的依据,还在出现哈希冲突时作为检查是否插入值的依据。
这样,可以确保在存在冲突的情况下,能够正确地识别并访问目标数据。
:::
// A thread-safe lookup table
#include <algorithm>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>
template <typename K, typename V, typename Hash = std::hash<K>>
class ConcurrentMap {
public:
// 桶数默认为 19(一般用 x % 桶数作为 x 的桶索引,桶数为质数可使桶分布均匀)
ConcurrentMap(std::size_t n = 19, const Hash& h = Hash{})
: buckets_(n), hasher_(h) {
for (auto& x : buckets_) {
x.reset(new Bucket);
}
}
ConcurrentMap(const ConcurrentMap&) = delete;
ConcurrentMap& operator=(const ConcurrentMap&) = delete;
V get(const K& k, const V& default_value = V{}) const {
return get_bucket(k).get(k, default_value);
}
void set(const K& k, const V& v) { get_bucket(k).set(k, v); }
void erase(const K& k) { get_bucket(k).erase(k); }
// 为了方便使用,提供一个到 std::map 的映射
std::map<K, V> to_map() const {
std::vector<std::unique_lock<std::shared_mutex>> locks;
for (auto& x : buckets_) {
locks.emplace_back(std::unique_lock<std::shared_mutex>(x->m));
}
std::map<K, V> res;
for (auto& x : buckets_) {
for (auto& y : x->data) {
res.emplace(y);
}
}
return res;
}
private:
struct Bucket {
std::list<std::pair<K, V>> data;
mutable std::shared_mutex m; // 每个桶都用这个锁保护
V get(const K& k, const V& default_value) const {
// 没有修改任何值,异常安全
std::shared_lock<std::shared_mutex> l(m); // 只读锁,可共享
auto it = std::find_if(data.begin(), data.end(),
[&](auto& x) { return x.first == k; });
return it == data.end() ? default_value : it->second;
}
void set(const K& k, const V& v) {
std::unique_lock<std::shared_mutex> l(m); // 写,单独占用
auto it = std::find_if(data.begin(), data.end(),
[&](auto& x) { return x.first == k; });
if (it == data.end()) {
data.emplace_back(k, v); // emplace_back 异常安全
} else {
it->second = v; // 赋值可能抛异常,但值是用户提供的,可放心让用户处理
}
}
void erase(const K& k) {
std::unique_lock<std::shared_mutex> l(m); // 写,单独占用
auto it = std::find_if(data.begin(), data.end(),
[&](auto& x) { return x.first == k; });
if (it != data.end()) {
data.erase(it);
}
}
};
Bucket& get_bucket(const K& k) const { // 桶数固定因此可以无锁调用
return *buckets_[hasher_(k) % buckets_.size()];
}
private:
std::vector<std::unique_ptr<Bucket>> buckets_;
Hash hasher_;
};
thread-safe list
// A thread-safe list with iteration support
#include <memory>
#include <mutex>
#include <utility>
template <typename T>
class ConcurrentList {
public:
ConcurrentList() = default;
~ConcurrentList() {
remove_if([](const Node&) { return true; });
}
ConcurrentList(const ConcurrentList&) = delete;
ConcurrentList& operator=(const ConcurrentList&) = delete;
void push_front(const T& x) {
std::unique_ptr<Node> t(new Node(x));
std::lock_guard<std::mutex> head_lock(head_.m);
t->next = std::move(head_.next);
head_.next = std::move(t);
}
template <typename F>
void for_each(F f) {
Node* cur = &head_;
std::unique_lock<std::mutex> head_lock(head_.m);
while (Node* const next = cur->next.get()) {
std::unique_lock<std::mutex> next_lock(next->m);
head_lock.unlock(); // 锁住了下一节点,因此可以释放上一节点的锁
f(*next->data);
cur = next; // 当前节点指向下一节点
head_lock = std::move(next_lock); // 转交下一节点锁的所有权,循环上述过程
}
}
template <typename F>
std::shared_ptr<T> find_first_if(F f) {
Node* cur = &head_;
std::unique_lock<std::mutex> head_lock(head_.m);
while (Node* const next = cur->next.get()) {
std::unique_lock<std::mutex> next_lock(next->m);
head_lock.unlock();
if (f(*next->data)) {
return next->data; // 返回目标值,无需继续查找
}
cur = next;
head_lock = std::move(next_lock);
}
return nullptr;
}
template <typename F>
void remove_if(F f) {
Node* cur = &head_;
std::unique_lock<std::mutex> head_lock(head_.m);
while (Node* const next = cur->next.get()) {
std::unique_lock<std::mutex> next_lock(next->m);
if (f(*next->data)) { // 为 true 则移除下一节点
std::unique_ptr<Node> old_next = std::move(cur->next);
cur->next = std::move(next->next); // 下一节点设为下下节点
next_lock.unlock();
} else { // 否则继续转至下一节点
head_lock.unlock();
cur = next;
head_lock = std::move(next_lock);
}
}
}
private:
struct Node {
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<Node> next;
Node() = default;
Node(const T& x) : data(std::make_shared<T>(x)) {}
};
Node head_;
};