第四章 同步并发操作
第四章 同步并发操作
并发操作不同于数据的共享,如果用等待锁的方式来达到同步的目的,效率则不甚理想。本章介绍如何利用C++标准可提供的同步工具:条件变量(conditional variable)和期望(futures)。
等待事件或其他条件
condition_variable和condition_variable_any
C++头文件condition_variable中提供了condition_variable和condition_variable_any,前者只能配合mutex使用,而后者可以与任意符合互斥标准的类型使用,但会产生额外开销。
condition_variable 与 std::mutex 一起使用, 用于阻塞一个或多个线程,直到另一个线程修改condition的shared variable并通知 condition_variable。
https://en.cppreference.com/w/cpp/thread/condition_variable
意图修改shared variable的线程必须参考下列步骤:
- 获取一个 stdlock_guard**).
- 在锁定期间修改shared variable.
- 即使共享变量是原子的,也必须在拥有锁的情况下进行修改,以便正确地通知等待的线程。
- 在释放锁之后,对 std::condition_variable 调用 notify_one 或 notify_all。
意图等待 std::condition_variable 的线程参考下列步骤:
- 获取用于保护shared variable的互斥量的 std::unique_lock
- 执行以下操作之一:
- 检查条件,以防它已经被更新和通知。
- 对
std::condition_variable调用wait、wait_for或wait_until(原子地释放互斥量并暂停线程执行,直到条件变量被通知、超时到期或发生伪唤醒,然后原子地获取互斥量并在返回之前重新开始)。 - 检查条件并继续等待,如果条件不满足。
Info
伪唤醒(spurious wakeup),也被称为虚假唤醒,是指在多线程编程中,一个线程在等待某个条件变量时,即使没有其他线程对该条件变量进行信号操作,它也可能被唤醒。这种现象通常发生在使用条件变量进行线程间同步时。
伪唤醒的发生主要有以下几个原因:
- 系统调用的中断:当一个线程在等待一个条件变量时,如果发生了系统调用的中断,那么这个线程可能会被唤醒。这是因为系统调用的中断会导致线程从内核态切换到用户态,从而导致线程被唤醒。
- POSIX标准的允许:POSIX标准允许在没有接收到任何信号的情况下唤醒等待条件变量的线程。这种设计是为了防止一些复杂的死锁和竞态条件。
- 操作系统的优化:在某些情况下,操作系统可能会选择唤醒一个或多个等待条件变量的线程,以便更有效地利用系统资源。例如,当系统负载较高时,操作系统可能会选择唤醒一些线程,以便更好地平衡系统负载。
因此,为了避免由于伪唤醒导致的问题,通常的做法是在一个循环中等待条件变量,只有当条件真正满足时,线程才会继续执行。这种模式被称为"条件等待循环"或"条件等待模式".
sample
// 如果发生伪唤醒,cv.wait(lock)会返回,但是ready仍然为false,所以线程会再次进入等待状态。这样就可以确保只有在真正满足条件时线程才会继续执行。
std::unique_lock<std::mutex> lock(mtx);
while(!ready) {
cv.wait(lock);
}
// 如果条件不满足,wait函数会自动重新阻塞线程并等待。这样,即使发生伪唤醒,线程也会继续等待,直到条件满足。
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{return ready;}); // 使用lambda表达式检查条件
std::condition_variable
std::mutex mut; // 队列的互斥
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void thread_a()
{
while (more_data_to_prepare())
{
data_chunk const data = prepare_data();
{
std::lock_guard<std::mutex> lck(mut);
data_queue.push(data);
}
data_cond.notify_one(); // 在解锁互斥是调用notify
// data_cond.notify_all(); // 通知所有等待线程
}
}
void thread_b()
{
while (true)
{
std::unique_lock<std::mutex> lk(mut); // 这里不用lock_guard是因为要反复上/解锁
data_cond.wait( lk, [] {return !data_queue.empty(); });
data_chunk data = data_queue.front();
// do_something
data_queue.pop();
lk.unlock();
}
}
当一个线程调用condition_variable 的wait()函数时,它会检查提供的条件(通常是一个lambda函数)。
- 当条件满足, wait()函数将立即返回并继续持有锁.
- 如果条件不满足,wait()函数将解锁互斥量,并且将这个线程置于阻塞或等待状态。
- 这个解锁操作是为了让其他线程有机会获取互斥量的锁,因为互斥体只能被一个线程锁定。
- 当收到Notify通知后, 线程从睡眠状态中苏醒,重新获取互斥锁并且对条件再次检查.
线程在等待期间必须解锁互斥量,以便其他线程可以获取并修改共享数据。这是为了防止死锁和确保数据的一致性。
互斥量必须在等待期间解锁,并在之后再次上锁。这是stdlock_guard不具备这种灵活性。
使用条件变量构建线程安全的队列
queue
#include <mutex>
#include <queue>
#include <condition_variable>
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut; // 必须指定mutable,针对const对象,准许其数据发生变动
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue(){}
threadsafe_queue(threadsafe_queue const& other){
std::lock_guard<std::mutex> lk(mut); // 此处复制构造则const对象也需要上锁
data_queue=other.data_queue;
}
void push(T new_value){
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& data){ // 返回智能指针的方式不再举例
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
data = data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> try_pop(){ // 以参数引用形式返回结果的方式省略
std:lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) return std::shared_ptr<T>(); // 返回空指针
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
};
Info
mutable 从字面意思上来说,是可变的意思。
C++ mutable关键字主要有两个用途:
- mutable 用来修饰类的数据成员;被mutable修饰的数据成员,可以在 const 成员函数中修改
- 这样存在逻辑层面的const, 对一个常量实例来看,从外部观察,它是常量而不可修改;但是内部可以进行修改
- C++ 11 的Lamba表达式中,捕获变量可以按值捕获(Capture by Value: '=') – 不允许在lambda函数中修改捕获的变量,但是加上 mutable 修饰lamba函数,则可以打破这种限制
- int x{10}; auto fun = [=]() mutable { x = 42; } // ok, 创建一个函数类型的实例
使用std::future等待一次性事件
std::future 是 C++11 中引入的一个类,它提供了一种访问异步操作结果的机制, 用于表示一次性事件,如后台运行的计算结果。
- stdasync、stdpromise 创建。
- std::future可能有与之关联的数据(如线程或任务)相关联,也可能没有。一旦事件发生(future 变为就绪),future 就不能被重置。
- C++ 标准库<future>中有两种 future:唯一的 future(stdshared_future<>)。这些都是模仿 stdshared_ptr 的。
- stdshared_future 的多个实例可能引用同一个事件。
- 虽然 future 被用来在线程之间通信,但 future 对象本身不提供同步访问。如果多个线程需要访问单个 future 对象,它们必须通过互斥锁或其他同步机制保护访问。
获得future
stdasync 是一个函数模板,它启动一个异步任务,并返回一个 stdpromise** 对象可以保存某一类型 T 的值,该值可被 stdpromise 提供了一种线程同步的手段。
Create std::future
#include <future>
#include <iostream>
#include <thread>
int main()
{
// future from a packaged_task
std::packaged_task<int()> task([]{ return 7; }); // wrap the function
std::future<int> f1 = task.get_future(); // get a future
std::thread t(std::move(task)); // launch on a thread
// future from an async()
std::future<int> f2 = std::async(std::launch::async, []{ return 8; });
// future from a promise
std::promise<int> p;
std::future<int> f3 = p.get_future();
std::thread([&p]{ p.set_value_at_thread_exit(9); }).detach();
std::cout << "Waiting..." << std::flush;
f1.wait();
f2.wait();
f3.wait();
std::cout << "Done!\nResults are: "
<< f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
t.join();
}
// output
Waiting...Done!
Results are: 7 8 9
wait函数都是用来等待异步操作结果的。如果异步操作的结果还没有准备好(即,提供者还没有设置其值或异常),这些函数会阻塞调用线程并等待,直到结果准备好为止。
wait系列函数提供了一种机制,允许你在需要结果之前做其他的事情,然后在结果准备好之后再继续处理。这对于并发编程非常有用,因为它允许你在等待结果的同时执行其他任务。
保存异常到future中
async与packaged_task运行的函数抛出异常时会保存在future对象中,promise用set_exception将异常保存。
Example with exceptions
#include <future>
#include <iostream>
#include <thread>
int main()
{
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t([&p]
{
try
{
// code that may throw
throw std::runtime_error("Example");
}
catch (...)
{
try
{
// store anything thrown in the promise
p.set_exception(std::current_exception());
}
catch (...) {} // set_exception() may throw too
}
});
try
{
std::cout << f.get();
}
catch (const std::exception& e)
{
std::cout << "Exception from the thread: " << e.what() << '\n';
}
t.join();
}
// output
Exception from the thread: Example
多个线程一起等待
std::future只支持一对一地在线程间传递。因为future在第一次get之后会发生移动操作,之后该值不复存在。
可以多次调用stdshared_future与std::future的主要区别。
std::shared_future
std::promise<int> p1;
auto f1 = p1.get_future();
assert(f1.valid()); // future对象有效
std::shared_future<int> sf1 = std::move(f1);
// std::shared_future<int> sf1(p1.get_future()); // 隐式转换
assert(!f1.valid()); // f1 future对象不再有效
assert(sf1.valid()); // shared_future生效
std::promise<int> p2;
auto sf2 = p2.get_future().share(); // 上述写法的语法糖
限时等待
在C++的<chrono>库提供了一系列处理时间的类型和函数。这个库主要包含三种类型:时钟(Clocks)、时间点(Time points)和时长(Durations)。
- 时钟(Clocks):时钟有一个起始点(或称为纪元)和一个滴答率。
- 纪元(Epoch):纪元是指时钟的起始点。
- 例如,stdsystem_clock的纪元通常被设置为1970年1月1日。
- 这意味着,当我们从stdsystem_clock获取当前时间时,我们实际上获取的是从1970年1月1日到现在的时间。
- 滴答率(Tick rate):滴答率是指时钟的时间单位。
- 例如,如果一个时钟的滴答率是每秒一次,那么这个时钟的时间单位就是1秒。
- 滴答率决定了时钟的精度。例如,stdhigh_resolution_clock通常具有非常高的滴答率,可以提供高精度的时间测量。
- C++定义了几种时钟类型,如system_clock(系统时钟)、steady_clock(稳定时钟)和high_resolution_clock(高分辨率时钟)。
- 纪元(Epoch):纪元是指时钟的起始点。
- 时间点(Time points):时间点是指自特定时钟的纪元以来已经过去的时长。
- 例如,stdsystem_clock::now()返回的就是一个时间点,表示从system_clock的纪元(1970年1月1日)到现在的时长。
- 时长(Durations):时长表示一段时间间隔,由一定数量的滴答组成,每个滴答代表一定的时间单位。
- 例如,“42秒”可以由一个时长表示,该时长由42个滴答组成,每个滴答代表1秒。
C++有两种超时机制可供选择:
- 延迟超时(duration-based timeout):这种超时机制是指线程会根据指定的时长进行等待。
- 例如,使用stdsleep_for函数可以让当前线程暂停一段时间。
- 另外,stdfuture的wait_for函数也可以用于实现带有超时的等待。
- 绝对超时(absolute timeout):这种超时机制是指线程会一直等待到某个特定的时间点。
- 例如,使用stdsleep_until函数可以让当前线程暂停到某个时间点。
- 另外,stdfuture的wait_until函数也可以用于实现到某个时间点的等待。
时钟类
时钟类提供四个关键信息:
- 当前时刻。调用其静态函数**now()**即可获得。
- 时间值的类型。每个时钟类都有名为time_point的成员类型作为时间点类。
- 该时钟计时单元长度。属于period的成员类型,表示为stdratio<5,2>表示每5秒计时2次。
- 计时速率是否恒定。静态数据成员is_steady。
时长类
stdduration<>,含两个模板参数,前者表示计时单元数量类型,后者是计时单元长度。如:
Durations Sample
// 表示10分钟(10*60sec)
std::chrono::duraion<short, std::ratio<60, 1>> d = std::chrono::duraion<short, std::ratio<60, 1>>(10);
std::chrono中提供了一些预设时长类的typedef声明,包括纳秒(nanoseconds),微秒(miscroseconds),毫秒(milliseconds),秒(seconds),分钟(minutes),小时(hours)。
C++14引入了stdchronochrono::duration_cast<>。
Durations Sample
auto d = 3700s;
std::chrono::milliseconds d2 = d; // 隐式转化
std::chrono::minutes d3 = d; // 编译报错
std::chrono::minutes d4 = std::chrono::duration_cast<std::chrono::minutes>(d); // 显式转换,截断为61min
时长类支持算术预算,时长类或者乘/除数值,或者两个时长类加减得到一个新的时长。计时单元的数量使用成员函数**count()**获得。
Durations Sample
// 如果超时,返回std::future_status::timeout, 如果future延后,返回std::future_status::deferred
auto f = std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready){
process(f.get());
}
时间点类
stdtime_point<>,含两个模板参数,前者表示时钟类,后者是计时单元。如:
time_point Sample
// 起始时间不同的时钟有不同定义
std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds> tp1 = std::chrono::system_clock::now();
如果时间点使用同一个时钟,时间点相减即可得到时长。时间点用于处理绝对超时。
time_point Sample
auto const timeout =
std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
std::unique_lock<std::mutex> lk(m);
while (!done) {
if (cv.wait_until(lk, timeout) == std::cv_status::timeout) break; // 通过不断循环处理伪唤醒
}
接受超时时限的函数
普通的mutex和recursive_mutex不支持限时加锁,但timed_mutex和recursive_timed_mutex支持。
主要函数wait_for/try_lock_for对应时长,wait_until/try_lock_until对应时间点。
利用同步操作简化代码
函数式编程
函数式编程(functional programming)是一种编程风格,函数结果只依赖于传入函数的参数,并不依赖外部状态。
当共享数据没有被修改,那么就不存在条件竞争. 使用互斥量去保护共享数据。这在并发编程系统中可以提高效率。
一个“期望”对象可以在线程间互相传递,并允许其中一个计算结果依赖于另外一个的结果,而非对共享数据的显式访问。
在C++中,我们可以使用lambda表达式和std::function来实现函数式编程。
sample
#include <iostream>
#include <functional>
void applyFunction(int x, int y, std::function<int(int, int)> func) {
std::cout << "Result: " << func(x, y) << std::endl;
}
int main() {
auto add = { return a + b; };
applyFunction(5, 3, add);
return 0;
}
CSP(通信式串行线程)
CSP是一种并发编程模型,其中线程通过管道传递消息进行通信,而不是共享数据。
例如,我们可以创建一个生产者线程和一个消费者线程,它们通过一个队列进行通信。
sample
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
std::queue<int> queue;
std::mutex mtx;
std::condition_variable cv;
void producer() {
for (int i = 0; i < 10; ++i) {
std::lock_guard<std::mutex> lock(mtx);
queue.push(i);
cv.notify_all();
}
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return !queue.empty(); });
std::cout << "Consumer got: " << queue.front() << std::endl;
queue.pop();
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
总结
线程间的同步操作是并发编程的重要部分:如果没有同步,线程本质上是独立的,可以编写为单独的应用。因其任务之间的相关性,它们可作为一个群体直接执行。