【C++11】异步执行之既有函数的包装:packaged_task类和async方法

栏目: C++ · 发布时间: 6年前

内容简介:上篇中讲到,C++11的标准库提供了promise用于在线程执行的具体方法中返回数据,接收端通过future阻塞获取。这么做的前提是你可以修改方法的参数,或者说你需要写一个包装函数。想要让既有函数异步的话,你可以使用packaged_task类或者async方法。具体分析之前,以下代码是在线程中需要执行的方法。MyString是很早之前自己用来查看copy/move次数的类,不想用的话,可以替换为std::string。

上篇中讲到,C++11的标准库提供了promise用于在线程执行的具体方法中返回数据,接收端通过future阻塞获取。这么做的前提是你可以修改方法的参数,或者说你需要写一个包装函数。想要让既有函数异步的话,你可以使用packaged_task类或者async方法。

具体分析之前,以下代码是在线程中需要执行的方法。

MyString some_function() {
    return MyString{"foo"};
}

MyString是很早之前自己用来查看copy/move次数的类,不想用的话,可以替换为std::string。

packaged_task

packaged_task是一个封装了被调用的函数的task。注意,packaged_task本身并不提供异步执行的机制,所以你仍旧需要把packaged_task放到thread中去执行。

std::packaged_task<MyString()> task{some_function};
std::future<MyString> future = task.get_future();
 
std::thread task_thread{std::move(task)};
task_thread.join();
 
MyString string = future.get();
std::cout << string << std::endl;

从设计角度来说,packaged_task是桥接了被包装的函数(这里是some_function)和future,同时又能被thread执行的一个类,所以实现上,需要能执行(重载operator()),持有被包装函数的引用或指针。

为了进一步理解,考虑实现一个单一所有权的MyTask

template<class C>
class MyTask;
 
template<class R, class ...Args>
class MyTask<R(Args...)> {
    std::function<R(Args...)> function_;
    MyFuture<R> future_;
public:
    template<class F>
    MyTask(F &&f) : function_{std::forward<F>(f)} {}
 
    // no copy
    MyTask(const MyTask &) = delete;
 
    MyTask &operator=(const MyTask &) = delete;
 
    // move is ok
    MyTask(MyTask &&task) :
            function_{std::move(task.function_)},
            future_{std::move(task.future_)} {
    }
 
    MyTask &operator=(MyTask &&) = delete;
 
    MyFuture<R> get_future() {
        return future_;
    }
 
    void operator()(Args... &&args) {
        future_.set(function_(std::forward<Args>(args)...));
    }
};

注意,如果你要实现类似packaged_task的模版类的话,你需要一个只有一个参数的模版,然后再是一个R(Args…)的模版。不这么做的话你会得到一个编译错误。从模版类角度来说,第二个模版类是第一个的具体化。

packaged_task用内部自己的方式存储了被包装函数的指针,这里使用std::function代替。

future使用前一节的MyFuture,支持复制和转移。

MyTask<MyString()> task{some_function};
MyFuture<MyString> future = task.get_future();
 
std::thread task_thread{std::move(task)};
task_thread.join();
 
MyString string = future.get();
std::cout << string << std::endl;

执行上述代码,结果和packaged_task的类似。

作为参考,packaged_task的实际代码中,保存了function和promise,整理结构和MyTask类似。

async

虽然packaged_task能够包装需要异步执行的函数,但是仍旧需要你自己操作thread。为此C++11的标准库里提供了另外一个方法级别的异步执行工具:async。

std::future<MyString> future = std::async(some_function);
MyString string = future.get();
std::cout << string << std::endl;

可以看到,代码量比packaged_task要少,而且直接返回我们需要的future。

虽然async可以用很少的代码异步执行,但是需要考虑

  1. async是不是异步执行?
  2. 什么时候执行?
  3. 是否支持线程池?
  4. 调用async之后线程会怎么样?

在阅读了async的文档和async本身代码之后的回答

  1. 是的
  2. 可以参数指定,async为调用时开始执行,deferred是在调用future的get时执行
  3. 不支持
  4. 参数为async时创建线程并detach。deferred并不创建thread,只在第一次调用future的get时在调用线程中执行。严格来说,deferred不算异步调用

理解以上几点对使用async方法很重要。

从行为上来看,async并不属于packaged_task的封装版。而且从所有权上来看,被包含的function必须被async的返回值future所持有。

实际代码其实也是这样设计的。具体来说,由于future必须允许复制,future持有一个关联状态。这个关联状态拥有类似shared_ptr的行为,比如说之前的MyFutureInner。在保持MyFutureInner行为的同时,增加一个function字段,并且启动一个线程调用MyFutureInner的set方法就可以实现async。换句话说,需要从MyFutureInner派生一个子类。以下是实现

template<class R>
class MyFutureInner {
protected:
    R value_;
    bool value_set_ = false;
    std::mutex mutex_;
    std::condition_variable condition_;
    std::atomic_int count_;
public:
    MyFutureInner() : count_{1} {}
 
    void increase_count() {
        count_.fetch_add(1, std::memory_order_relaxed);
    }
 
    int decrease_count() {
        return count_.fetch_sub(1, std::memory_order_acq_rel) - 1;
    }
 
    void set(R &&value) {
        std::unique_lock<std::mutex> lock{mutex_};
        value_ = std::move(value);
        value_set_ = true;
        condition_.notify_all();
    }
 
    virtual R get() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return std::move(value_);
    }
};
 
template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
    std::function<R(Args...)> function_;
public:
    MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} {}
 
    void execute() {
        this->set(function_());
    }
};
 
template<class R>
class MyFuture {
    MyFutureInner<R> *inner_ptr_;
public:
    MyFuture() : inner_ptr_{new MyFutureInner<R>{}} {}
 
    explicit MyFuture(MyFutureInner<R> *inner_ptr) : inner_ptr_{inner_ptr} {}
 
    MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} {
        std::cout << "MyFuture(copy)\n";
        inner_ptr_->increase_count();
    }
 
    MyFuture &operator=(const MyFuture &) = delete;
 
    MyFuture(MyFuture &&future) {
        std::cout << "MyFuture(move)\n";
        inner_ptr_ = future.inner_ptr_;
        future.inner_ptr_ = nullptr;
    }
 
    MyFuture &operator=(MyFuture &&) = delete;
 
    R get() {
        return inner_ptr_->get();
    }
 
    ~MyFuture() {
        if (inner_ptr_ != nullptr && inner_ptr_->decrease_count() == 0) {
            delete inner_ptr_;
        }
    }
};

注意这里的MyFutureInner和前篇有所不同,成员变量改成可以被子类访问的protected,get方法也加了virtual。

MyFutureInner的子类里增加了成员function,还有一个方法execute。

接下来是async方法的实现

template<class F, class... Args>
auto my_async(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> {
    typedef decltype(f(args...)) R;
    std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...};
    typedef MyFutureInnerWithFunction<R, Args...> FIWF;
    std::unique_ptr<FIWF> inner_ptr{new FIWF{std::move(function)}};
    std::thread async_thread{&FIWF::execute, inner_ptr.get()};
    async_thread.detach();
    return MyFuture<R>{inner_ptr.release()};
}

这里方法的签名参考 stackoverflow的一个问题

方法内,用MyFutureInnerWithFunction的指针传入MyFuture。同时,方法内启动一个thread并且detach。这里如果不detach的话,thread在方法结束后会被意外销毁掉,这不是我们想要的,所以必须detach。

执行代码

MyFuture<MyString> future = my_async(some_function);
MyString string = future.get();
std::cout << string << std::endl;

结果和async是一样的。

这里考虑一个问题,假设持有MyFuture的调用线程没有调用get直接结束的话会发生什么。由于只有MyFuture持有MyFutureInner的指针,MyFutureInner会被删除。异步线程访问时MyFutureInnerWithFunction会是一个无效的内存地址。以下是再现的代码

MyString some_function() {
    std::this_thread::sleep_for(std::chrono::milliseconds{500});
    return MyString{"foo"};
}
 
void test_future() {
    MyFuture<MyString> future = my_async(some_function);
}
 
int main() {
    test_future();
    std::this_thread::sleep_for(std::chrono::milliseconds{1000});
    return 0;
}

这里根本的原因是MyFutureInnerWithFunction拥有者不只是MyFuture,还有异步线程。那样的话,就不能由MyFuture负责删除MyFutureInner,同样也不能由MyFutureInnerWithFunction里面函数负责删除,所以只能让MyFutureInner自己删除自己,也就是delete this。

修改之后的MyFutureInner和MyFutureInnerWithFunction

template<class R>
class MyFutureInner {
protected:
    std::atomic_int count_;
    R value_;
    bool value_set_ = false;
    std::mutex mutex_;
    std::condition_variable condition_;
public:
    MyFutureInner() : count_{1} {}
 
    void increase_count() {
        count_.fetch_add(1, std::memory_order_relaxed);
    }
 
    void decrease_count() {
        if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            on_zero_shared();
        }
    }
 
    virtual void on_zero_shared() {
        delete this;
    }
 
    void set(R &&value) {
        std::unique_lock<std::mutex> lock{mutex_};
        value_ = std::move(value);
        value_set_ = true;
        condition_.notify_all();
    }
 
    virtual R get() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return std::move(value_);
    }
};
 
template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
    std::function<R(Args...)> function_;
 
    typedef MyFutureInner<R> base;
public:
    explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function)
            : MyFutureInner<R>{}, function_{std::move(function)} {
       this->increase_count();
    }
 
    void execute() {
        this->set(function_());
       this->decrease_count();
    }
};

另外一个解决方案,是在MyFutureInnerWithFunction里覆盖MyFutureInner的decrease_count(或者修改后的on_zero_shared),等待函数完成才能被销毁。

template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
    std::function<R(Args...)> function_;
 
    typedef MyFutureInner<R> base;
public:
    explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function)
            : MyFutureInner<R>{}, function_{std::move(function)} {
    }
 
    void on_zero_shared() {
        wait_set();
        delete this;
    }
 
    void execute() {
        this->set(function_());
    }
 
private:
    void wait_set() {
        std::unique_lock<std::mutex> lock{base::mutex_};
        if (!base::value_set_) {
            base::condition_.wait(lock);
        }
    }
};

这也是async方法里面返回的future里的实现。也就是说,async返回的future即使你不调用get,你可以不能直接从包含有future的当前方法返回,你会被阻塞住。如果这不是你要的行为,你可以需要自己写一个类似前一种方法的MyFutureInner,也就是增减shared count。

最后,deferred async由于不涉及异步线程,实现比较简单。

template<class R, class...Args>
class MyFutureInnerWithFunctionDeferred : public MyFutureInner<R> {
    bool executed = false;
    std::function<R(Args...)> function_;
 
    typedef MyFutureInner<R> base;
public:
    explicit MyFutureInnerWithFunctionDeferred(std::function<R(Args...)> &&function)
            : MyFutureInner<R>{}, function_{std::move(function)} {
    }
 
    R get() {
        std::unique_lock<std::mutex> lock{base::mutex_};
        if (!executed) {
            lock.unlock();
            this->set(function_());
            executed = true;
        } else if (!base::value_set_) {
            base::condition_.wait(lock);
        }
        return std::move(base::value_);
    }
};

以及调用代码

template<class F, class... Args>
auto my_async_deferred(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> {
    typedef decltype(f(args...)) R;
    std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...};
    return MyFuture<R>(new MyFutureInnerWithFunctionDeferred<R, Args...>{std::move(function)});
}
 
int main() {
    MyFuture<MyString> future = my_async_deferred(some_function);
    std::cout << future.get() << std::endl;
    return 0;
}

小结

总得来说,C++11的标准库提供了好几种异步执行的方式,各有各的适用场景。比如说promise适合自己封装异步执行的函数,packaged_task用于封装既有的函数,但是线程调度要自己来做,async看起来最简单,但是你必须理解async返回的future的行为。

不过老实说,async还不是很理想,比如没有线程池,以及future的行为不可调整。但是作为理解如何构建高层次多线程处理很有帮助。

最后,希望本文对各位有帮助。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Masterminds of Programming

Masterminds of Programming

Federico Biancuzzi、Chromatic / O'Reilly Media / 2009-03-27 / USD 39.99

Description Masterminds of Programming features exclusive interviews with the creators of several historic and highly influential programming languages. Think along with Adin D. Falkoff (APL), Jame......一起来看看 《Masterminds of Programming》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具