Future使用搭配

async

启动一个异步任务,返回一个future对象

1
2
3
4
std::future<int> res = std::async(std::launch::async, []{
std::this_thread::sleep_for(std::chrono::seconds(5));
return 1;});
int i = res.get();

async启动一个线程执行异步操作,并返回一个future对象

std::launch表示异步执行

使用get获取数据,阻塞线程直到返回结果

启动策略:

  1. std::launch::deferred:延迟执行,直到调用get或wait才开始执行,意味同步执行
  2. std::launch::async:异步执行
  3. 默认情况launch::async|launch::deferred,则行为是未定义的

处理

  1. std::future::get():阻塞调用,获取对象返回结果或异常,只能调用一次后更改future对象状态
  2. std::future::wait():阻塞调用,等待对象完成,可以多次调用,

使用wait_for或wait_until来检查异步操作是否完成,返回status值

1
res.wait_for(std::chrono:seconds(0))==std::future_status::ready

析构:

当使用asnc返回的future是最后一个shared state时,在析构的时候会调用wait等待结果再析构,这样变成同步执行,并且倘若外部加锁的情况下,内部再加锁,则会造成死锁

1
2
3
4
5
6
7
lock(mt);
{
async([&](){
lock(mt);
});
}
unlock(mt);

packaged_task

封装一个可执行任务,通过future捕获任务的返回值或异常

适合用于无返回值的异步操作,比如线程池等

1
2
3
4
5
6
7
8
std::packaged_task<int()> task([]{
std::this_thread::sleep_for(std::chrono::seconds(5));
return 1;
});
std::future<int> res = task.get_future();
std::thread t(std::move(task));
t.detach();
int v = res.get();

要注意packaged_task无拷贝构造和赋值运算,所以要加std::move

packaged_task是一个模板类,还可以包装可调用对象

promise

std::promise和std::future配对使用,允许在某个地方设置值或异常,利用future获取它

1
2
3
4
5
6
7
std::promise<int> prom;
std::future<int> fur=prom.get_future();
std::thread t([](std::promise<int> prom){
prom.set_value(10);
}, std::move(prom));
t.detach();
fur.get();

这里的get依然是阻塞直到已经set_value成功,除此之外还有set_exception(),参数是一个std::exception_ptr,调用std::current_exception()获取

注意:当promise被释放再get报错eeror_value

共享future

当多个线程获取同一个异步任务时使用,比如上面例子修改

1
2
3
4
5
6
std::shared_future<int> sfur=prom.get_future();
...
std::thread t1([&]{sfur.get();});
std::thread t2([&]{sfur.get();});
t1.join();
j2.join();

异常处理

future会重新抛出异常给用户处理

1
2
3
4
5
try {
res.get();
} catch (const std::exception &e){
...e.what();
}

c++11线程池实现

利用future和packaged_task构建线程池,获取异步任务的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class ThreadPool {
public:
ThreadPool(int num = thread::hardware_concurrency()) : stop_(false) {
thread_num = max(1, num);
start();
}
using Task = packaged_task<void()>;
ThreadPool(ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &) = delete;
~ThreadPool() { stop(); }

template <class F, class... Args>
auto submit(F &&f, Args &&...args) -> future<decltype(f(args...))> {
using RType = decltype(f(args...));
if (stop_) return future<RType>();
auto task = make_shared<packaged_task<RType()>>(
bind(forward<F>(f), forward<Args>(args)...));

future<RType> res = task->get_future();
{
lock_guard<mutex> lock(mtx);
tasks.emplace([task]() { (*task)(); });
}
cv.notify_one();
return res;
}

int idleThreadCount() { return thread_num; }

private:
void start() {
for (int i = 0; i < thread_num; i++) {
threads.emplace_back([this]() {
while (!this->stop_) {
Task task;
{
unique_lock<mutex> lock(mtx);
cv.wait(lock,
[this]() { return !this->tasks.empty() || this->stop_; });
if (this->stop_) return;
task = move(tasks.front());
tasks.pop();
}
task();
}
});
}
}

void stop() {
stop_ = true;
cv.notify_all();
for (auto &thread : threads) {
if (thread.joinable()) thread.join();
}
}

vector<thread> threads;
queue<Task> tasks;
mutex mtx;
condition_variable cv;
bool stop_;
atomic_int thread_num;
};