windows C++-实现 Future(一)
为了实现未来(future,此处是指尚未发生的,不确定的计算结果),本文定义 async_future 类。 async_future 类使用并发运行时的这些组件:concurrency::task_group 类和 concurrency::single_assignment 类。 async_future 类使用 task_group 类异步计算值,使用 single_assignment 类存储计算结果。 async_future 类的构造函数采用计算结果的工作函数,并采用 get 方法检索结果。
实现 async_future 类
1. 声明一个名为 async_future 的模板类,该类在生成计算的类型上进行了参数化。 将 public 和 private 节添加到此类。
template <typename T>
class async_future
{
public:
private:
};
2. 在 async_future 类的 private 部分,声明一个 task_group 和一个 single_assignment 数据成员。
// Executes the asynchronous work function.
task_group _tasks;// Stores the result of the asynchronous work function.
single_assignment<T> _value;
3. 在 async_future 类的 public 节中,实现构造函数。 构造函数是在计算结果的工作函数上进行参数化的模板。 构造函数在 task_group 数据成员中异步执行工作函数,并使用 concurrency::send 函数将结果写入 single_assignment 数据成员。
template <class Functor>
explicit async_future(Functor&& fn)
{// Execute the work function in a task group and send the result// to the single_assignment object._tasks.run([fn, this]() {send(_value, fn());});
}
4. 在 async_future 类的 public 部分中,实现析构函数。 析构函数等待任务完成。
~async_future()
{// Wait for the task to finish._tasks.wait();
}
5. 在 async_future 类的 public 节中,实现 get 方法。 此方法使用 concurrency::receive 函数来检索工作函数的结果。
// Retrieves the result of the work function.
// This method blocks if the async_future object is still
// computing the value.
T get()
{ return receive(_value);
}
完整代码:
以下示例显示了完整的 async_future 类及其用法示例。 wmain 函数创建一个包含 10,000 个随机整数值的 std::vector 对象。 然后它使用 async_future 对象来查找包含在 vector 对象中的最小值和最大值。
// futures.cpp
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>
#include <numeric>
#include <random>using namespace concurrency;
using namespace std;template <typename T>
class async_future
{
public:template <class Functor>explicit async_future(Functor&& fn){// Execute the work function in a task group and send the result// to the single_assignment object._tasks.run([fn, this]() {send(_value, fn());});}~async_future(){// Wait for the task to finish._tasks.wait();}// Retrieves the result of the work function.// This method blocks if the async_future object is still // computing the value.T get(){ return receive(_value); }private:// Executes the asynchronous work function.task_group _tasks;// Stores the result of the asynchronous work function.single_assignment<T> _value;
};int wmain()
{// Create a vector of 10000 integers, where each element // is between 0 and 9999.mt19937 gen(2);vector<int> values(10000); generate(begin(values), end(values), [&gen]{ return gen()%10000; });// Create a async_future object that finds the smallest value in the// vector.async_future<int> min_value([&]() -> int { int smallest = INT_MAX;for_each(begin(values), end(values), [&](int value) {if (value < smallest){smallest = value;}});return smallest;});// Create a async_future object that finds the largest value in the// vector.async_future<int> max_value([&]() -> int { int largest = INT_MIN;for_each(begin(values), end(values), [&](int value) {if (value > largest){largest = value;} });return largest;});// Calculate the average value of the vector while the async_future objects// work in the background.int sum = accumulate(begin(values), end(values), 0);int average = sum / values.size();// Print the smallest, largest, and average values.wcout << L"smallest: " << min_value.get() << endl<< L"largest: " << max_value.get() << endl<< L"average: " << average << endl;
}
该示例产生下面的输出:
smallest: 0
largest: 9999
average: 4981
可靠编程
若要扩展 async_future 类以处理工作函数引发的异常,请修改 async_future::get 方法以便调用 concurrency::task_group::wait 方法。 task_group::wait 方法会引发工作函数生成的任何异常。
以下示例显示了 async_future 类的修改版本。 wmain 函数使用 try-catch 块来打印 async_future 对象的结果或打印由工作函数生成的异常的值。
// futures-with-eh.cpp
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>using namespace concurrency;
using namespace std;template <typename T>
class async_future
{
public:template <class Functor>explicit async_future(Functor&& fn){// Execute the work function in a task group and send the result// to the single_assignment object._tasks.run([fn, this]() {send(_value, fn());});}~async_future(){// Wait for the task to finish._tasks.wait();}// Retrieves the result of the work function.// This method blocks if the async_future object is still// computing the value.T get(){ // Wait for the task to finish.// The wait method throws any exceptions that were generated// by the work function._tasks.wait();// Return the result of the computation.return receive(_value);}private:// Executes the asynchronous work function.task_group _tasks;// Stores the result of the asynchronous work function.single_assignment<T> _value;
};int wmain()
{// For illustration, create a async_future with a work // function that throws an exception.async_future<int> f([]() -> int { throw exception("error");});// Try to read from the async_future object. try{int value = f.get();wcout << L"f contains value: " << value << endl;}catch (const exception& e){wcout << L"caught exception: " << e.what() << endl;}
}
该示例产生下面的输出:
caught exception: error