微软C++并行库 pplx 的基本用法

栏目: C++ · 发布时间: 5年前

内容简介:版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kesalin/article/details/86713720

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kesalin/article/details/86713720

并行计算库充分利用多核的优势,通过并行运算提高程序效率,业界有两个知名的 c++ 并行库,一个是 intel 开发的 TBB ,一个是微软开发的 PPL

TBB(Intel® Threading Building Blocks )

TBBintel 用标准 c++ 写的一个开源的并行计算库。它的目的是提升数据并行计算的能力,可以在其 官网 下载最新的库和文档。 TBB 主要功能:并行算法、任务调度、并行容器、同步原语、内存分配器。

PPL(Parallel Patterns Library)

PPL 是微软开发的并行计算库,它的功能和 TBB 是差不多的,主要是在 windows 上使用。二者在并行算法的使用上基本上是一样的, 但还是有些差异: TBBtask 没有 PPLtask 强大, PPLtask 可以链式连续执行还可以组合任务, TBBtask 则不行。

PPL C++ 库与 C# 并行库 TaskParallelLibrary 的设计理念、基本框架以及接口使用上非常类似,熟悉 C# 并行库的朋友上手 C++ 版的 PPL 非常容易。下面我将介绍微软跨平台 PPL 的一个简易实现 pplx ,该库是附在微软的开源项目 cpprestsdk 中的。

pplx 并行库

C++ REST SDK 是 Microsoft 的一个开源跨平台项目, 其使用大量现代异步 C++ API 实现了一个基于 HTTP / HTTPS 协议的 B/S 组件,使用该组件,可以方便地进行高性能 RESTfulHTTP / HTTPS 服务器、客户端开发,且可以在 WindowsLinuxOSXiOSAndroid 各平台下使用。

当然今天我要介绍的主角是该项目中的并行库 PPLX 。下面先介绍如何编译安装 cpprestsdk ,然后介绍如何使用并行库 PPLX 。以下都是在 Ubuntu 系统上进行。

编译安装

有两种方式可以安装 cpprestsdk ,一种是直接用 apt-get 安装,另一种是从源码安装。

通过 apt-get 安装

sudo apt-get install libcpprest-dev

从 source 编译安装

ref: How to build for Linux

1, 系统要求: Ubuntu 16.04 及之后的版本

2, 安装必要的工具:boost库,ninja 用于编译,

sudo apt-get install g++ git libwebsocketpp-dev openssl libssl-dev ninja-build

sudo apt-get install libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev

3, 下载代码

git clone https://github.com/Microsoft/cpprestsdk.git casablanca

4, 编译:

cd casablanca
mkdir build.release
cd build.release
cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Release
ninja

如果想编译成 debug 版本,把上面代码中的 release/Release 修改为 debug/Debug 即可。

5, 编译完成之后,跑一下 test_runner 测试验证一下:

cd Release/Binaries
./test_runner *_test.so

或者运行 bing 搜索示例:

cd Release/Binaries
./BingRequest kesalin kesalin.html

6, 安装:

sudo ninja install
sudo ldconfig

7, 编译单个文件的参数:

g++ -std=c++11 my_file.cpp -o my_file -lboost_system -lcrypto -lssl -lcpprest
./my_file

使用 pplx 并行库

创建并运行任务

可以通过多种途径创建任务:

//构造函数
auto task = pplx::task<int>([](){
    return 10;
});
 
//lambda
auto task = []()->pplx::task<int>{
    return pplx::task_from_result(10);
};
 
//create_task
auto task = pplx::create_task([](){
    return 10;
});
 
//create_task 创建延迟任务
pplx::task_completion_event<int> tce;// task_completion_event 需按值传递
auto task = pplx::create_task(tce);

也可以创建任务链:

pplx::task<std::string> create_print_task(const std::string& init_value)
{
    return pplx::create_task([init_value]() {
        std::cout <<"Current value:" << init_value << std::endl;
        return std::string("Value 2");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 3");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 4");
    });
}

使用 task.get() 或者 task.wait() 执行任务:

  • 阻塞方式 get() : 阻塞直到任务执行完成,并返回任务结果,当任务取消时,抛出 task_canceled 异常,发生其它异常也会被抛出;
  • 非阻塞方式 wait() :等待任务到达终止状态,然后返回任务状态: completedcanceled ,如果发生异常会被抛出。
void test_task_chain()
{
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        std::cout << "Result value: " << value << std::endl;
        return value;
    })

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
            previousTask.get();
        }
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;
        }
    })

    .wait();
}

可以创建和执行一组任务,根据需要来选择是全部执行再返回,还是执行任一任务就返回。

  • when_all :返回组任务,只有当所有任务都完成时组任务才会返回成功;如果任一任务被取消或者抛出异常,则组任务会完成并处理取消状态,在组任务 get() 或者 wait() 时抛出异常。如果任务类型为 task<T> ,则组任务类型为 task<vector<T>>
  • when_any :返回组任务,当任一任务完成时组任务就会返回成功;如果所有任务都被取消或者抛出异常,则组任务会完成并处理取消状态,并且如果任何任务发生异常,在组任务 get 或者 wait 时抛出异常。如果任务类型为 task<T> ,则组任务类型为 task<T, size_t>size_t 返回完成任务的索引。
void test_group_tasks()
{
    auto sleep_print = [](int seconds, const std::string& info) {
        if (seconds > 0) {
            sleep(seconds);
        }

        std::cout << info << std::endl;
    };

    auto/*std::array<pplx::task<int>, 3>*/ tasks = {
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; })
    };

    {
        std::cout << "=== when_all ===" << std::endl;

        auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks));
        auto result = joinTask.wait();
        std::cout << "All joined thread. result: " << result << std::endl;
    }

    {
        std::cout << "=== when_any ===" << std::endl;

        auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks))
        .then([](std::pair<int, size_t> result) {
            std::cout << "First task to finish returns "
                  << result.first
                  << " and has index "
                  << result.second << std::endl;
        });

        auto result = joinTask.wait();
        std::cout << "Any joined thread. result: " << result << std::endl;
    }
}

取消任务

cancellation_token_source 通过封装一个 cancellation_token 指针来提供取消操作,通过 cancellation_token.is_canceled() 在执行任务的过程中判断任务是否要被取消。

示例中的任务会循环执行,直到显式取消任务:

void test_cancellation()
{
    pplx::cancellation_token_source cts;
    std::cout << "Creating task..." << std::endl;

    auto task = pplx::create_task([cts]{
        bool moreToDo = true;
        while (moreToDo) {
           if (cts.get_token().is_canceled()) {
               return;
           }
           else {
               moreToDo = []()->bool {
                   std::cout << "Performing work at " << now() << std::endl;
                   sleep(1);
                   return true;
               }();
           }
        }
    });

    sleep(3);

    std::cout << "Canceling task... " << now() << std::endl;
    cts.cancel();

    std::cout << "Waiting for task to complete... " << now() << std::endl;
    task.wait();

    std::cout << "Done. " << now() << std::endl;
}

当要在异步任务链中支持取消时,需要将 cancellation_token 作为构造 task 的参数传递,然后结合 task.wait() 判断是否要取消:

void test_cancellation_async()
{
    pplx::cancellation_token_source cts;
    auto task = pplx::task<void>([cts]() {
        std::cout << "Cancel continue_task" << std::endl;
        cts.cancel();
    })

    .then([]() {
        std::cout << "This will not run" << std::endl;
    }, cts.get_token());

    try {
        if (task.wait() == pplx::task_status::canceled) {
            std::cout<<"Taks has been canceled"<<std::endl;
        }
        else {
            task.get();
        }
    }
    catch (const std::exception& e) {
        std::cout << "exception: " << e.what() << std::endl;
    }
}

处理异常

之前说过如果任务发生异常,会在get或者wait时抛出,但是如果希望在异步任务链中判定之前执行是否发生异常做出操作时,可以采用另外的方式。

当使用task.then时一般是这样写的:

task<T>.then([](T t){
     //处理任务结果t
})

这时候进入then时之前的任务已经执行完成了,task.then有另外一种写法,能够在then时并没有执行任务:

task<T>.then([](task<T> task){
       try 
       {
              task.get(); //使用get或者wait执行任务
       }
       catch(...)
       {
           //处理异常
       }
})

示例:

void test_task_exception()
{
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        // uncomment this line to throw an exception.
        throw std::runtime_error("An exception happened!");
        std::cout << "Result value: " << value << std::endl;
        return value;
    })

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
            previousTask.get();
        }
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;
        }
    })

    .wait();
}

本文完整代码

// g++ -std=c++11 pplxdemo.cpp -o pplxdemo -lboost_system -lcrypto -lssl -lcpprest

#include <pplx/pplxtasks.h>
#include <iostream>
#include <sstream>
#include <vector>
#include <functional>
#include <iomanip>
#include <ctime>
#include <thread>
#include <chrono>
#include <stdexcept>

std::string now()
{
    auto t = std::time(nullptr);
    auto tm = *std::localtime(&t);

    std::ostringstream oss;
    oss << std::put_time(&tm, "%Y-%m-%d %H-%M-%S");
    auto str = oss.str();
    return str;
}

void sleep(int seconds)
{
    std::this_thread::sleep_for(std::chrono::seconds(seconds));
}

pplx::task<std::string> create_print_task(const std::string& init_value)
{
    return pplx::create_task([init_value]() {
        std::cout <<"Current value:" << init_value << std::endl;
        return std::string("Value 2");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 3");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 4");
    });
}

void test_task_chain()
{
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        // uncomment this line to throw an exception.
        // throw std::runtime_error("An exception happened!");
        std::cout << "Result value: " << value << std::endl;
        return value;
    })

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
            previousTask.get();
        }
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;
        }
    })

    .wait();
}

void test_group_tasks()
{
    auto sleep_print = [](int seconds, const std::string& info) {
        if (seconds > 0) {
            sleep(seconds);
        }

        std::cout << info << std::endl;
    };

    auto/*std::array<pplx::task<int>, 3>*/ tasks = {
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; })
    };

    {
        std::cout << "=== when_all ===" << std::endl;

        auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks));
        auto result = joinTask.wait();
        std::cout << "All joined thread. result: " << result << std::endl;
    }

    {
        std::cout << "=== when_any ===" << std::endl;

        auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks))
        .then([](std::pair<int, size_t> result) {
            std::cout << "First task to finish returns "
                  << result.first
                  << " and has index "
                  << result.second << std::endl;
        });

        auto result = joinTask.wait();
        std::cout << "Any joined thread. result: " << result << std::endl;
    }
}

void test_cancellation()
{
    pplx::cancellation_token_source cts;
    std::cout << "Creating task..." << std::endl;

    auto task = pplx::create_task([cts]{
        bool moreToDo = true;
        while (moreToDo) {
           if (cts.get_token().is_canceled()) {
               return;
           }
           else {
               moreToDo = []()->bool {
                   std::cout << "Performing work at " << now() << std::endl;
                   sleep(1);
                   return true;
               }();
           }
        }
    });

    sleep(3);

    std::cout << "Canceling task... " << now() << std::endl;
    cts.cancel();

    std::cout << "Waiting for task to complete... " << now() << std::endl;
    task.wait();

    std::cout << "Done. " << now() << std::endl;
}

void test_cancellation_async()
{
    pplx::cancellation_token_source cts;
    auto task = pplx::task<void>([cts]() {
        std::cout << "Cancel continue_task" << std::endl;
        cts.cancel();
    })

    .then([]() {
        std::cout << "This will not run" << std::endl;
    }, cts.get_token());

    try {
        if (task.wait() == pplx::task_status::canceled) {
            std::cout<<"Taks has been canceled"<<std::endl;
        }
        else {
            task.get();
        }
    }
    catch (const std::exception& e) {
        std::cout << "exception: " << e.what() << std::endl;
    }
}

int main(int argc, char* args[])
{
    std::cout << "==== pplx demo ====" << std::endl;
    test_task_chain();
    test_group_tasks();
    test_cancellation();
    test_cancellation_async();
    return 0;
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

永无止境

永无止境

[美] 道格拉斯•艾德华兹 / 刘纯毅 / 中信出版社 / 2012-12-15 / 59.00元

★ 值得中国初创公司反复思考的企业传记 ★ 互联网行业必读书 ★ Google高管揭开Google的神秘面纱 ★ 探寻“G力量”重塑人类知识景观的心路历程 ★ Google走过的路,Google未来的路 ★ 编辑推荐: 它是目前被公认为全球最大的搜索引擎!它是互联网上五大最受欢迎的网站之一! 它在操作界面中提供多达30余种语言选择,在全球范围内拥有无数用户......一起来看看 《永无止境》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

SHA 加密
SHA 加密

SHA 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具