【C++11】基于std::thread异步执行时的输入输出

栏目: C++ · 发布时间: 5年前

内容简介:本篇主要是记录自己在学习C++11下std::thread异步执行时的一些细节性的东西,为之后基于C++11写并发代码打基础。C++11引入了std::thread。据说之前因为需要区分对待pthread和win下的线程库,代码中有大量的预编译的if else,非常丑陋。现在的话,统一用std::thread就行了。基于std::thread最简单的异步执行代码。

本篇主要是记录自己在学习C++11下std::thread异步执行时的一些细节性的东西,为之后基于C++11写并发代码打基础。

C++11引入了std::thread。据说之前因为需要区分对待pthread和win下的线程库,代码中有大量的预编译的if else,非常丑陋。现在的话,统一用std::thread就行了。

基于std::thread最简单的异步执行代码。

#include <iostream>
#include <thread>
 
void thread_run() {
    std::cout << "thread run\n";
}
 
int main() {
    std::thread thread1{thread_run};
    thread1.join();
    return 0;
}

代码创建了一个线程thread1,thread1执行thread_run(立马执行)。main函数创建了线程之后,通过join等待thread1执行结束。

如果你写过基于pthread的代码的话,会发现pthread_t不再需要。因为按照C++的风格,肯定会封装在std::thread中。

(注意,本文不会介绍创建thread有哪些不同的方法,或者延迟执行的方法)

输入

接下来要考虑一个问题是,如何向thread_run传递参数。

在多线程程序里面,主要有

  1. 按值传递,或者讲复制
  2. 转移,所有权在新线程中
  3. 共享(可变或不可变)

第一个不用多说,基础类型,或者对复制不敏感的类型都可以这么做。第二种其实算是独享,好处是这样就不会有共享下的并发修改问题。第三种,很多并发程序潜在有这种要求:超过一个线程共享某个变量,虽然处理起来必须很小心。

具体三种写法如下

#include <iostream>
#include <thread>
#include <vector>
 
void thread1_run(int n) {
}
 
void thread2_run(std::vector<int>&& vector) {
}
 
void thread3_run(std::shared_ptr<std::string> vector_ptr) {
}
 
int main() {
    std::thread thread1{thread1_run, 1};
 
    std::vector<int> vector = {1, 2, 3};
    std::thread thread2{thread2_run, std::move(vector)};
 
    std::shared_ptr<std::string> vector_ptr{new std::string{"foo"}};
    std::thread thread3{thread3_run, vector_ptr};
 
    thread1.join();
    thread2.join();
    thread3.join();
    return 0;
}

(注意,本文不会介绍类似向线程传递引用怎么做,请自行参阅std::thread的文档)

请留意代码中传递的地方和接受的函数签名。

第三种使用shared_ptr的方式背后依赖于shared_ptr可以被复制,当然里面的内容不会被复制。

这里特别讲一下第二种转移的时候,参数实际上在到达实际执行的函数前会有大约两次或以上move的原因,顺便了解一下thread的构造函数在做什么。

写过pthread代码的人可能知道,pthread里线程执行的函数签名是这样的

void* some_function(void* arg) {
    return NULL;
}

std::thread在构造过程中,除了把函数名改成函数指针(为了符合pthread_create的函数签名)之外,还需要打包函数和参数,在内部一个模拟上述代码的统一函数中解包然后执行函数,具体来说

  • 创建一个decay_copy(forward(function),), decay_copy(forward(arg1))…的元组(tuple),这里是一次move或者copy
  • 在内部同一个执行函数中从元组中解包function和arguments,这里是第二次move
  • 然后调用用户定义的函数

有兴趣的人可以看一下libc++的实现。

输出

使用std::thread另外要注意的一个问题是如何返回值,和pthread不同,std::thread没法直接返回结果。

C++11提供了几种异步执行的手段:promise,packaged_task和async。这几种手段都支持返回future,一个可以获取线程执行结果的工具。以promise/future为例。

#include <iostream>
#include <thread>
#include <future>
 
void thread1_run(std::promise<std::string>&& promise) {
    promise.set_value(std::string{"foo"});
}
 
int main() {
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future();
    std::thread thread1{thread1_run, std::move(promise)};
    thread1.join();
    std::string string = future.get();
    std::cout << string << std::endl;
    return 0;
}

promise作为工作线程(thread1)写入结果的工具,future作为非工作线程读取数据的工具,组合起来达到获取返回值的效果。

这里考虑一下,promise和future是怎么实现的?

在promise端写入的数据,要future端能读出来,说明promise和future共享某个数据结构,里面包含了结果。

同时,一方写入后,被阻塞的一方可以执行,很明显是一种条件变量的行为模式。

为了进一步理解,试着自己写一个模拟实现。

#include <iostream>
#include <thread>
#include <atomic>
#include <mutex>
 
template<class T>
class MyFutureInner {
    std::atomic_int count_;
    std::mutex mutex_;
    std::condition_variable condition_;
    T value_;
    bool value_set_ = false;
public:
    MyFutureInner() : count_{1} {}
 
    int use_count() const {
        return count_.load(std::memory_order_relaxed);
    }
 
    // get and increase
    int increase_reference() {
        return count_.fetch_add(1, std::memory_order_relaxed);
    }
 
    // decrease and get
    int decrease_reference() {
        return count_.fetch_add(-1, std::memory_order_acq_rel) - 1;
    }
 
    void set(T &&value) {
        std::unique_lock<std::mutex> lock{mutex_};
        value_ = std::move(value); // move assignment
        value_set_ = true;
        condition_.notify_all();
    }
 
    T move() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return std::move(value_); // move
    }
 
    T copy() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return value_; // copy
    }
};
 
template<class T>
class MyFuture {
    MyFutureInner<T> *inner_ptr_;
public:
    MyFuture() : inner_ptr_{new MyFutureInner<T>} {}
 
    // copy
    MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} {
        inner_ptr_->increase_reference();
    }
 
    MyFuture &operator=(const MyFuture &) = delete;
 
    MyFuture(MyFuture &&future) {
        inner_ptr_ = future.inner_ptr_;
        future.inner_ptr_ = nullptr;
    }
 
    MyFuture &operator=(MyFuture &&) = delete;
 
    int use_count() const {
        return inner_ptr_->use_count();
    }
 
    void set(T &&value) {
        inner_ptr_->set(std::move(value));
    }
 
    T get(bool copy = false) {
        return copy ? inner_ptr_->copy() : inner_ptr_->move();
    }
 
    ~MyFuture() {
        if (inner_ptr_ != nullptr && inner_ptr_->decrease_reference() == 0) {
            delete inner_ptr_;
        }
    }
};
 
void thread2_run(MyFuture<std::string> future) {
    future.set(std::string{"foo"});
}
 
int main() {
    MyFuture<std::string> future;
    std::thread thread2{thread2_run, future};
    std::string result = future.get();
    std::cout << result << std::endl;
    thread2.join();
    return 0;
}

这里为了简化代码,合并了promise和future的功能为一个类MyFuture。

MyFuture内部持有MyFutureInner的指针,当所有持有MyFutureInner指针的MyFuture被销毁之后,MyFutureInner才会被销毁。行为上类似shared_ptr,当然这里不能直接用shared_ptr。

MyFutureInner实现了shared_ptr的核心行为,增加了set和move/copy几个针对结果的操作。同时内部通过mutex加条件变量的方式,支持阻塞和通知。

除了future之外,MyFuture其实还实现了shared_future的功能。shared_future可以通过调用future的share方法得到。shared_future内部是返回结果的克隆而不是move。在多个线程等待同一个结果时使用。为了避免复制的话,可以使用shared_ptr,具体写法如下

void thread3_run(MyFuture<std::shared_ptr<std::string>> future) {
    future.set(std::shared_ptr<std::string>{new std::string{"foo"}});
}
 
int main() {
    MyFuture<std::shared_ptr<std::string>> future;
    std::thread thread3{thread3_run, future};
    std::shared_ptr<std::string> string_ptr1 = future.get(true);
    thread3.join();
    std::shared_ptr<std::string> string_ptr2 = future.get(true);
    std::cout << *string_ptr1 << std::endl;
    std::cout << *string_ptr2 << std::endl;
    return 0;
}

如果你有机会阅读libc++的代码的话,可以发现promise/future内部用的也是类似上述代码的机制。当然这里没有处理结果是void(即只需要通知完成)的情况,有机会再考虑。

小结

std::thread是C++11多线程编程比较基础的类,个人觉得学习如何使用std::thread输入输出对之后其他多线程的编程技术会有帮助。希望我的文章对你有用。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

暗时间

暗时间

刘未鹏 / 电子工业出版社 / 2011-7 / 35.00元

2003年,刘未鹏在杂志上发表了自己的第一篇文章,并开始写博客。最初的博客较短,也较琐碎,并夹杂着一些翻译的文章。后来渐渐开始有了一些自己的心得和看法。总体上在这8年里,作者平均每个月写1篇博客或更少,但从未停止。 刘未鹏说—— 写博客这件事情给我最大的体会就是,一件事情如果你能够坚持做8年,那么不管效率和频率多低,最终总能取得一些很可观的收益。而另一个体会就是,一件事情只要你坚持得足......一起来看看 《暗时间》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

MD5 加密
MD5 加密

MD5 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具