std::thread的同步机制
在 C++ 中,std::thread 用于创建和管理线程。为了确保多个线程能正确、安全地访问共享资源,避免数据竞争和不一致问题,需要使用同步机制。
互斥锁(std::mutex)
原理:互斥锁是一种最基本的同步原语,用于保护共享资源。同一时间只允许一个线程访问被互斥锁保护的代码段,其他线程必须等待该线程释放锁后才能继续访问。
#include <iostream>
#include <thread>
#include <mutex>std::mutex mtx;
int shared_variable = 0;void increment() {for (int i = 0; i < 100000; ++i) {std::lock_guard<std::mutex> lock(mtx);++shared_variable;}
}int main() {std::thread t1(increment);std::thread t2(increment);t1.join();t2.join();std::cout << "Shared variable: " << shared_variable << std::endl;return 0;
}
std::lock_guard 是一个 RAII(资源获取即初始化)类,它在构造时自动锁定互斥锁 mtx,在析构时自动解锁。这样可以确保对 shared_variable 的访问是线程安全的。
increment()函数运行结束,锁就会被释放。
递归互斥锁(std::recursive_mutex)
原理:递归互斥锁允许同一线程多次锁定该互斥锁,而不会导致死锁。当线程第一次锁定递归互斥锁时,它会记录锁定的次数,每次解锁时,锁定次数减 1,直到锁定次数为 0 时,互斥锁才真正被释放。
#include <iostream>
#include <thread>
#include <mutex>std::recursive_mutex rmtx;void recursive_function(int n) {std::lock_guard<std::recursive_mutex> lock(rmtx);if (n > 0) {std::cout << "Recursive call: " << n << std::endl;recursive_function(n - 1);}
}int main() {std::thread t(recursive_function, 5);t.join();return 0;
}
在 recursive_function 中,同一线程可以多次锁定 rmtx,而不会导致死锁。
创建了一个新线程,该线程会执行 recursive_function 函数,并将 5 作为参数传递给它
定时互斥锁(std::timed_mutex 和 std::recursive_timed_mutex)
原理:定时互斥锁允许线程在尝试锁定互斥锁时设置一个超时时间。如果在超时时间内未能锁定互斥锁,线程可以继续执行其他任务,而不会一直等待。
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>std::timed_mutex tmtx;void try_lock_with_timeout() {if (tmtx.try_lock_for(std::chrono::milliseconds(500))) {std::cout << "Locked the mutex." << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));tmtx.unlock();} else {std::cout << "Failed to lock the mutex within the timeout." << std::endl;}
}int main() {std::thread t(try_lock_with_timeout);t.join();return 0;
}
在 try_lock_with_timeout 函数中,线程尝试在 500 毫秒内锁定 tmtx。如果成功锁定,则执行相应的操作并解锁;如果超时未能锁定,则输出失败信息。
条件变量(std::condition_variable)
原理:条件变量用于线程间的等待 - 通知机制。一个线程可以等待某个条件成立,而另一个线程可以在条件成立时通知等待的线程。条件变量通常与互斥锁一起使用,以确保线程安全。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>std::queue<int> dataQueue;
std::mutex mtx;
std::condition_variable cv;
bool ready = false;void consumer() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return ready; });while (!dataQueue.empty()) {std::cout << "Consumed: " << dataQueue.front() << std::endl;dataQueue.pop();}
}void producer() {for (int i = 0; i < 5; ++i) {std::this_thread::sleep_for(std::chrono::seconds(1));{std::unique_lock<std::mutex> lock(mtx);dataQueue.push(i);std::cout << "Produced: " << i << std::endl;}}{std::unique_lock<std::mutex> lock(mtx);ready = true;}cv.notify_one();
}int main() {std::thread t1(producer);std::thread t2(consumer);t1.join();t2.join();return 0;
}
consumer 线程等待 ready 条件成立,producer 线程在生产完所有数据后将 ready 设置为 true,并通知 consumer 线程。cv.wait 会自动释放互斥锁,直到条件成立时再重新锁定互斥锁。
原子操作(std::atomic)
原理:原子操作是一种无锁的同步机制,用于对基本数据类型进行原子读写操作。原子操作保证了操作的不可分割性,避免了数据竞争。
#include <iostream>
#include <thread>
#include <atomic>std::atomic<int> atomic_variable(0);void increment_atomic() {for (int i = 0; i < 100000; ++i) {++atomic_variable;}
}int main() {std::thread t1(increment_atomic);std::thread t2(increment_atomic);t1.join();t2.join();std::cout << "Atomic variable: " << atomic_variable << std::endl;return 0;
}
std::atomic 定义了一个原子整数变量 atomic_variable。对 atomic_variable 的自增操作是原子的,不需要使用互斥锁来保护。
std::thread的future
在 C++ 中,std::thread 用于创建和管理线程,而 std::future 是 C++ 标准库中用于异步操作的一个重要组件,它可以与 std::thread 结合使用来获取异步任务的结果。
std::future 概述
std::future 是一个模板类,定义在 头文件中。它提供了一种机制,允许一个线程等待另一个线程的异步操作结果。当一个异步操作启动时,会返回一个 std::future 对象,通过该对象可以在需要的时候获取异步操作的返回值。
std::thread 结合使用的基本步骤
通常情况下,不直接将 std::future 与 std::thread 结合,而是使用 std::async 来创建异步任务并返回 std::future 对象。不过,也可以手动模拟类似的机制。
手动模拟(结合 std::promise)
std::promise 是一个用于存储值或异常的对象,它可以与 std::future 关联起来,std::promise 设置的值可以通过与之关联的 std::future 获取
#include <iostream>
#include <thread>
#include <future>// 线程函数
void task(std::promise<int>& prom) {// 模拟一些耗时操作std::this_thread::sleep_for(std::chrono::seconds(2));int result = 42;// 设置 promise 的值prom.set_value(result);
}int main() {// 创建一个 promise 对象std::promise<int> prom;// 获取与 promise 关联的 future 对象std::future<int> fut = prom.get_future();// 创建线程并传入 promise 对象std::thread t(task, std::ref(prom));// 等待异步任务完成并获取结果int value = fut.get();std::cout << "The result is: " << value << std::endl;// 等待线程结束t.join();return 0;
}
std::promise prom;:创建一个 std::promise 对象,用于存储一个 int 类型的值。
std::future fut = prom.get_future();:通过 promise 的 get_future 方法获取与之关联的 std::future 对象,用于获取异步操作的结果。
std::thread t(task, std::ref(prom));:创建一个新线程,将 task 函数作为线程函数,并将 promise 对象的引用传递给它。
int value = fut.get();:调用 future 的 get 方法,该方法会阻塞当前线程,直到异步任务完成并设置了 promise 的值,然后返回该值。
t.join();:等待线程结束,确保资源正确释放。
使用 std::async
std::async 是一个更方便的异步操作启动函数,它会自动管理线程和 std::future 对象。
#include <iostream>
#include <future>// 异步任务函数
int asyncTask() {std::this_thread::sleep_for(std::chrono::seconds(2));return 42;
}int main() {// 启动异步任务并获取 future 对象std::future<int> fut = std::async(std::launch::async, asyncTask);// 等待异步任务完成并获取结果int value = fut.get();std::cout << "The result is: " << value << std::endl;return 0;
}
std::future fut = std::async(std::launch::async, asyncTask);:调用 std::async 函数启动一个异步任务,std::launch::async 表示立即启动一个新线程来执行任务,asyncTask 是要执行的任务函数。std::async 会返回一个 std::future 对象,用于获取任务的结果。
int value = fut.get();:调用 future 的 get 方法,阻塞当前线程直到任务完成并返回结果。
std::future 的主要方法
get():用于获取异步操作的结果。如果异步操作尚未完成,调用该方法会阻塞当前线程,直到结果可用。
wait():等待异步操作完成,但不获取结果。该方法会阻塞当前线程,直到异步操作结束。
wait_for():等待异步操作在指定的时间内完成。如果在指定时间内操作完成,返回 std::future_status::ready;如果超时,返回 std::future_status::timeout;如果操作尚未开始,返回 std::future_status::deferred。
wait_until():等待异步操作直到指定的时间点。返回值与 wait_for 类似。
get() 方法示例
get() 方法用于获取异步操作的结果,若操作未完成,调用线程会被阻塞,直至结果可用。
#include <iostream>
#include <future>
#include <thread>// 模拟一个耗时的异步任务
int asyncTask() {std::this_thread::sleep_for(std::chrono::seconds(2));return 42;
}int main() {// 使用 std::async 启动异步任务并获取 std::future 对象std::future<int> fut = std::async(std::launch::async, asyncTask);std::cout << "Waiting for the result..." << std::endl;// 调用 get() 方法获取结果,若任务未完成会阻塞int result = fut.get();std::cout << "The result of the async task is: " << result << std::endl;return 0;
}
std::async(std::launch::async, asyncTask) 启动一个新线程执行 asyncTask 函数,并返回一个 std::future 对象 fut。
fut.get() 调用会阻塞主线程,直到 asyncTask 完成并返回结果,然后将结果赋值给 result 变量。
wait() 方法示例
wait() 方法用于等待异步操作完成,但不获取结果,调用线程会被阻塞直到操作结束。
#include <iostream>
#include <future>
#include <thread>// 模拟一个耗时的异步任务
void asyncTask() {std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "Async task is done." << std::endl;
}int main() {// 使用 std::async 启动异步任务并获取 std::future 对象std::future<void> fut = std::async(std::launch::async, asyncTask);std::cout << "Waiting for the async task to complete..." << std::endl;// 调用 wait() 方法等待任务完成fut.wait();std::cout << "The async task has completed." << std::endl;return 0;
}
std::async(std::launch::async, asyncTask) 启动一个新线程执行 asyncTask 函数,并返回一个 std::future 对象 fut。
fut.wait() 调用会阻塞主线程,直到 asyncTask 完成。
wait_for() 方法示例
wait_for() 方法用于在指定的时间内等待异步操作完成,并根据结果返回不同的 std::future_status。
#include <iostream>
#include <future>
#include <thread>
#include <chrono>// 模拟一个耗时的异步任务
int asyncTask() {std::this_thread::sleep_for(std::chrono::seconds(3));return 42;
}int main() {// 使用 std::async 启动异步任务并获取 std::future 对象std::future<int> fut = std::async(std::launch::async, asyncTask);std::cout << "Waiting for the async task with a timeout..." << std::endl;// 等待 2 秒auto status = fut.wait_for(std::chrono::seconds(2));if (status == std::future_status::ready) {std::cout << "The async task is ready. Result: " << fut.get() << std::endl;} else if (status == std::future_status::timeout) {std::cout << "Timed out while waiting for the async task." << std::endl;} else if (status == std::future_status::deferred) {std::cout << "The async task is deferred." << std::endl;}return 0;
}
std::async(std::launch::async, asyncTask) 启动一个新线程执行 asyncTask 函数,并返回一个 std::future 对象 fut。
fut.wait_for(std::chrono::seconds(2)) 会等待 2 秒,根据等待结果返回 std::future_status 枚举值。
根据 status 的不同值进行不同的处理。
wait_until() 方法示例
wait_until() 方法用于等待异步操作直到指定的时间点,并根据结果返回不同的 std::future_status。
#include <iostream>
#include <future>
#include <thread>
#include <chrono>// 模拟一个耗时的异步任务
int asyncTask() {std::this_thread::sleep_for(std::chrono::seconds(3));return 42;
}int main() {// 使用 std::async 启动异步任务并获取 std::future 对象std::future<int> fut = std::async(std::launch::async, asyncTask);// 计算 2 秒后的时间点auto timeout_time = std::chrono::steady_clock::now() + std::chrono::seconds(2);std::cout << "Waiting for the async task until a specific time..." << std::endl;// 等待直到指定时间点auto status = fut.wait_until(timeout_time);if (status == std::future_status::ready) {std::cout << "The async task is ready. Result: " << fut.get() << std::endl;} else if (status == std::future_status::timeout) {std::cout << "Timed out while waiting for the async task." << std::endl;} else if (status == std::future_status::deferred) {std::cout << "The async task is deferred." << std::endl;}return 0;
}
std::async(std::launch::async, asyncTask) 启动一个新线程执行 asyncTask 函数,并返回一个 std::future 对象 fut。
std::chrono::steady_clock::now() + std::chrono::seconds(2) 计算当前时间 2 秒后的时间点 timeout_time。
fut.wait_until(timeout_time) 会等待直到 timeout_time,根据等待结果返回 std::future_status 枚举值。
根据 status 的不同值进行不同的处理。