Future使用搭配
async
启动一个异步任务,返回一个future对象
1 2 3 4
| std::future<int> res = std::async(std::launch::async, []{ std::this_thread::sleep_for(std::chrono::seconds(5)); return 1;}); int i = res.get();
|
async启动一个线程执行异步操作,并返回一个future对象
std::launch表示异步执行
使用get获取数据,阻塞线程直到返回结果
启动策略:
std::launch::deferred:
延迟执行,直到调用get或wait才开始执行,意味同步执行
std::launch::async:
异步执行
- 默认情况
launch::async|launch::deferred
,则行为是未定义的
处理
- std::future::get():阻塞调用,获取对象返回结果或异常,只能调用一次后更改future对象状态
- std::future::wait():阻塞调用,等待对象完成,可以多次调用,
使用wait_for或wait_until来检查异步操作是否完成,返回status值
1
| res.wait_for(std::chrono:seconds(0))==std::future_status::ready
|
析构:
当使用asnc返回的future是最后一个shared state时,在析构的时候会调用wait等待结果再析构,这样变成同步执行,并且倘若外部加锁的情况下,内部再加锁,则会造成死锁
1 2 3 4 5 6 7
| lock(mt); { async([&](){ lock(mt); }); } unlock(mt);
|
packaged_task
封装一个可执行任务,通过future捕获任务的返回值或异常
适合用于无返回值的异步操作,比如线程池等
1 2 3 4 5 6 7 8
| std::packaged_task<int()> task([]{ std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }); std::future<int> res = task.get_future(); std::thread t(std::move(task)); t.detach(); int v = res.get();
|
要注意packaged_task无拷贝构造和赋值运算,所以要加std::move
packaged_task是一个模板类,还可以包装可调用对象
promise
std::promise和std::future配对使用,允许在某个地方设置值或异常,利用future获取它
1 2 3 4 5 6 7
| std::promise<int> prom; std::future<int> fur=prom.get_future(); std::thread t([](std::promise<int> prom){ prom.set_value(10); }, std::move(prom)); t.detach(); fur.get();
|
这里的get依然是阻塞直到已经set_value成功,除此之外还有set_exception()
,参数是一个std::exception_ptr,调用std::current_exception()
获取
注意:当promise被释放再get报错eeror_value
共享future
当多个线程获取同一个异步任务时使用,比如上面例子修改
1 2 3 4 5 6
| std::shared_future<int> sfur=prom.get_future(); ... std::thread t1([&]{sfur.get();}); std::thread t2([&]{sfur.get();}); t1.join(); j2.join();
|
异常处理
future会重新抛出异常给用户处理
1 2 3 4 5
| try { res.get(); } catch (const std::exception &e){ ...e.what(); }
|
c++11线程池实现
利用future和packaged_task构建线程池,获取异步任务的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| class ThreadPool { public: ThreadPool(int num = thread::hardware_concurrency()) : stop_(false) { thread_num = max(1, num); start(); } using Task = packaged_task<void()>; ThreadPool(ThreadPool &) = delete; ThreadPool &operator=(ThreadPool &) = delete; ~ThreadPool() { stop(); }
template <class F, class... Args> auto submit(F &&f, Args &&...args) -> future<decltype(f(args...))> { using RType = decltype(f(args...)); if (stop_) return future<RType>(); auto task = make_shared<packaged_task<RType()>>( bind(forward<F>(f), forward<Args>(args)...));
future<RType> res = task->get_future(); { lock_guard<mutex> lock(mtx); tasks.emplace([task]() { (*task)(); }); } cv.notify_one(); return res; }
int idleThreadCount() { return thread_num; }
private: void start() { for (int i = 0; i < thread_num; i++) { threads.emplace_back([this]() { while (!this->stop_) { Task task; { unique_lock<mutex> lock(mtx); cv.wait(lock, [this]() { return !this->tasks.empty() || this->stop_; }); if (this->stop_) return; task = move(tasks.front()); tasks.pop(); } task(); } }); } }
void stop() { stop_ = true; cv.notify_all(); for (auto &thread : threads) { if (thread.joinable()) thread.join(); } }
vector<thread> threads; queue<Task> tasks; mutex mtx; condition_variable cv; bool stop_; atomic_int thread_num; };
|