内容简介:版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kesalin/article/details/86713720
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kesalin/article/details/86713720
并行计算库充分利用多核的优势,通过并行运算提高程序效率,业界有两个知名的 c++
并行库,一个是 intel
开发的 TBB
,一个是微软开发的 PPL
。
TBB(Intel® Threading Building Blocks )
TBB
是 intel
用标准 c++
写的一个开源的并行计算库。它的目的是提升数据并行计算的能力,可以在其 官网
下载最新的库和文档。 TBB
主要功能:并行算法、任务调度、并行容器、同步原语、内存分配器。
PPL(Parallel Patterns Library)
PPL
是微软开发的并行计算库,它的功能和 TBB
是差不多的,主要是在 windows
上使用。二者在并行算法的使用上基本上是一样的, 但还是有些差异: TBB
的 task
没有 PPL
的 task
强大, PPL
的 task
可以链式连续执行还可以组合任务, TBB
的 task
则不行。
PPL C++
库与 C#
并行库 TaskParallelLibrary
的设计理念、基本框架以及接口使用上非常类似,熟悉 C#
并行库的朋友上手 C++
版的 PPL
非常容易。下面我将介绍微软跨平台 PPL
的一个简易实现 pplx
,该库是附在微软的开源项目 cpprestsdk
中的。
pplx 并行库
C++ REST SDK
是 Microsoft 的一个开源跨平台项目, 其使用大量现代异步 C++ API
实现了一个基于 HTTP / HTTPS
协议的 B/S
组件,使用该组件,可以方便地进行高性能 RESTful
、 HTTP / HTTPS
服务器、客户端开发,且可以在 Windows
、 Linux
、 OSX
、 iOS
、 Android
各平台下使用。
当然今天我要介绍的主角是该项目中的并行库 PPLX
。下面先介绍如何编译安装 cpprestsdk
,然后介绍如何使用并行库 PPLX
。以下都是在 Ubuntu
系统上进行。
编译安装
有两种方式可以安装 cpprestsdk
,一种是直接用 apt-get
安装,另一种是从源码安装。
通过 apt-get 安装
sudo apt-get install libcpprest-dev
从 source 编译安装
1, 系统要求: Ubuntu 16.04 及之后的版本
2, 安装必要的工具:boost库,ninja 用于编译,
sudo apt-get install g++ git libwebsocketpp-dev openssl libssl-dev ninja-build sudo apt-get install libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev
3, 下载代码
git clone https://github.com/Microsoft/cpprestsdk.git casablanca
4, 编译:
cd casablanca mkdir build.release cd build.release cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Release ninja
如果想编译成 debug 版本,把上面代码中的 release/Release 修改为 debug/Debug 即可。
5, 编译完成之后,跑一下 test_runner 测试验证一下:
cd Release/Binaries ./test_runner *_test.so
或者运行 bing 搜索示例:
cd Release/Binaries ./BingRequest kesalin kesalin.html
6, 安装:
sudo ninja install sudo ldconfig
7, 编译单个文件的参数:
g++ -std=c++11 my_file.cpp -o my_file -lboost_system -lcrypto -lssl -lcpprest ./my_file
使用 pplx 并行库
创建并运行任务
可以通过多种途径创建任务:
//构造函数 auto task = pplx::task<int>([](){ return 10; }); //lambda auto task = []()->pplx::task<int>{ return pplx::task_from_result(10); }; //create_task auto task = pplx::create_task([](){ return 10; }); //create_task 创建延迟任务 pplx::task_completion_event<int> tce;// task_completion_event 需按值传递 auto task = pplx::create_task(tce);
也可以创建任务链:
pplx::task<std::string> create_print_task(const std::string& init_value) { return pplx::create_task([init_value]() { std::cout <<"Current value:" << init_value << std::endl; return std::string("Value 2"); }) .then([](std::string value) { std::cout << "Current value:" << value << std::endl; return std::string("Value 3"); }) .then([](std::string value) { std::cout << "Current value:" << value << std::endl; return std::string("Value 4"); }); }
使用 task.get()
或者 task.wait()
执行任务:
-
阻塞方式
get()
: 阻塞直到任务执行完成,并返回任务结果,当任务取消时,抛出task_canceled
异常,发生其它异常也会被抛出; -
非阻塞方式
wait()
:等待任务到达终止状态,然后返回任务状态:completed
、canceled
,如果发生异常会被抛出。
void test_task_chain() { auto task_chain = create_print_task("Value 1"); task_chain.then([](std::string value) { std::cout << "Result value: " << value << std::endl; return value; }) // process exception .then([](pplx::task<std::string> previousTask) { try { previousTask.get(); } catch (const std::exception& e) { std::cout << "exception: " << e.what() << std::endl; } }) .wait(); }
可以创建和执行一组任务,根据需要来选择是全部执行再返回,还是执行任一任务就返回。
-
when_all
:返回组任务,只有当所有任务都完成时组任务才会返回成功;如果任一任务被取消或者抛出异常,则组任务会完成并处理取消状态,在组任务get()
或者wait()
时抛出异常。如果任务类型为task<T>
,则组任务类型为task<vector<T>>
。 -
when_any
:返回组任务,当任一任务完成时组任务就会返回成功;如果所有任务都被取消或者抛出异常,则组任务会完成并处理取消状态,并且如果任何任务发生异常,在组任务get
或者wait
时抛出异常。如果任务类型为task<T>
,则组任务类型为task<T, size_t>
,size_t
返回完成任务的索引。
void test_group_tasks() { auto sleep_print = [](int seconds, const std::string& info) { if (seconds > 0) { sleep(seconds); } std::cout << info << std::endl; }; auto/*std::array<pplx::task<int>, 3>*/ tasks = { pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }), pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }), pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; }) }; { std::cout << "=== when_all ===" << std::endl; auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks)); auto result = joinTask.wait(); std::cout << "All joined thread. result: " << result << std::endl; } { std::cout << "=== when_any ===" << std::endl; auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks)) .then([](std::pair<int, size_t> result) { std::cout << "First task to finish returns " << result.first << " and has index " << result.second << std::endl; }); auto result = joinTask.wait(); std::cout << "Any joined thread. result: " << result << std::endl; } }
取消任务
cancellation_token_source
通过封装一个 cancellation_token
指针来提供取消操作,通过 cancellation_token.is_canceled()
在执行任务的过程中判断任务是否要被取消。
示例中的任务会循环执行,直到显式取消任务:
void test_cancellation() { pplx::cancellation_token_source cts; std::cout << "Creating task..." << std::endl; auto task = pplx::create_task([cts]{ bool moreToDo = true; while (moreToDo) { if (cts.get_token().is_canceled()) { return; } else { moreToDo = []()->bool { std::cout << "Performing work at " << now() << std::endl; sleep(1); return true; }(); } } }); sleep(3); std::cout << "Canceling task... " << now() << std::endl; cts.cancel(); std::cout << "Waiting for task to complete... " << now() << std::endl; task.wait(); std::cout << "Done. " << now() << std::endl; }
当要在异步任务链中支持取消时,需要将 cancellation_token
作为构造 task
的参数传递,然后结合 task.wait()
判断是否要取消:
void test_cancellation_async() { pplx::cancellation_token_source cts; auto task = pplx::task<void>([cts]() { std::cout << "Cancel continue_task" << std::endl; cts.cancel(); }) .then([]() { std::cout << "This will not run" << std::endl; }, cts.get_token()); try { if (task.wait() == pplx::task_status::canceled) { std::cout<<"Taks has been canceled"<<std::endl; } else { task.get(); } } catch (const std::exception& e) { std::cout << "exception: " << e.what() << std::endl; } }
处理异常
之前说过如果任务发生异常,会在get或者wait时抛出,但是如果希望在异步任务链中判定之前执行是否发生异常做出操作时,可以采用另外的方式。
当使用task.then时一般是这样写的:
task<T>.then([](T t){ //处理任务结果t })
这时候进入then时之前的任务已经执行完成了,task.then有另外一种写法,能够在then时并没有执行任务:
task<T>.then([](task<T> task){ try { task.get(); //使用get或者wait执行任务 } catch(...) { //处理异常 } })
示例:
void test_task_exception() { auto task_chain = create_print_task("Value 1"); task_chain.then([](std::string value) { // uncomment this line to throw an exception. throw std::runtime_error("An exception happened!"); std::cout << "Result value: " << value << std::endl; return value; }) // process exception .then([](pplx::task<std::string> previousTask) { try { previousTask.get(); } catch (const std::exception& e) { std::cout << "exception: " << e.what() << std::endl; } }) .wait(); }
本文完整代码
// g++ -std=c++11 pplxdemo.cpp -o pplxdemo -lboost_system -lcrypto -lssl -lcpprest #include <pplx/pplxtasks.h> #include <iostream> #include <sstream> #include <vector> #include <functional> #include <iomanip> #include <ctime> #include <thread> #include <chrono> #include <stdexcept> std::string now() { auto t = std::time(nullptr); auto tm = *std::localtime(&t); std::ostringstream oss; oss << std::put_time(&tm, "%Y-%m-%d %H-%M-%S"); auto str = oss.str(); return str; } void sleep(int seconds) { std::this_thread::sleep_for(std::chrono::seconds(seconds)); } pplx::task<std::string> create_print_task(const std::string& init_value) { return pplx::create_task([init_value]() { std::cout <<"Current value:" << init_value << std::endl; return std::string("Value 2"); }) .then([](std::string value) { std::cout << "Current value:" << value << std::endl; return std::string("Value 3"); }) .then([](std::string value) { std::cout << "Current value:" << value << std::endl; return std::string("Value 4"); }); } void test_task_chain() { auto task_chain = create_print_task("Value 1"); task_chain.then([](std::string value) { // uncomment this line to throw an exception. // throw std::runtime_error("An exception happened!"); std::cout << "Result value: " << value << std::endl; return value; }) // process exception .then([](pplx::task<std::string> previousTask) { try { previousTask.get(); } catch (const std::exception& e) { std::cout << "exception: " << e.what() << std::endl; } }) .wait(); } void test_group_tasks() { auto sleep_print = [](int seconds, const std::string& info) { if (seconds > 0) { sleep(seconds); } std::cout << info << std::endl; }; auto/*std::array<pplx::task<int>, 3>*/ tasks = { pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }), pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }), pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; }) }; { std::cout << "=== when_all ===" << std::endl; auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks)); auto result = joinTask.wait(); std::cout << "All joined thread. result: " << result << std::endl; } { std::cout << "=== when_any ===" << std::endl; auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks)) .then([](std::pair<int, size_t> result) { std::cout << "First task to finish returns " << result.first << " and has index " << result.second << std::endl; }); auto result = joinTask.wait(); std::cout << "Any joined thread. result: " << result << std::endl; } } void test_cancellation() { pplx::cancellation_token_source cts; std::cout << "Creating task..." << std::endl; auto task = pplx::create_task([cts]{ bool moreToDo = true; while (moreToDo) { if (cts.get_token().is_canceled()) { return; } else { moreToDo = []()->bool { std::cout << "Performing work at " << now() << std::endl; sleep(1); return true; }(); } } }); sleep(3); std::cout << "Canceling task... " << now() << std::endl; cts.cancel(); std::cout << "Waiting for task to complete... " << now() << std::endl; task.wait(); std::cout << "Done. " << now() << std::endl; } void test_cancellation_async() { pplx::cancellation_token_source cts; auto task = pplx::task<void>([cts]() { std::cout << "Cancel continue_task" << std::endl; cts.cancel(); }) .then([]() { std::cout << "This will not run" << std::endl; }, cts.get_token()); try { if (task.wait() == pplx::task_status::canceled) { std::cout<<"Taks has been canceled"<<std::endl; } else { task.get(); } } catch (const std::exception& e) { std::cout << "exception: " << e.what() << std::endl; } } int main(int argc, char* args[]) { std::cout << "==== pplx demo ====" << std::endl; test_task_chain(); test_group_tasks(); test_cancellation(); test_cancellation_async(); return 0; }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- sqltoy-orm-4.17.6 发版,支持 Greenplum、并行查询可设置并行数量
- PostgreSQL并行查询介绍
- nodejs“并行”处理尝试
- 并行python迭代
- Golang 多核并行
- haskell – 有效的并行策略
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。