Streams Concurrency
— 2019-12-21
In our last post about Async Rust we looked at Futures concurrency, and before that we looked at Rust streams. In this post we bring the two together, and take a closer look at concurrency with Rust streams.
Primer: Data Modeling
Much of async programming is about modeling relationships in code. So before we dig into streams concurrency, it's worth talking about the relationships we're trying to model first.
In the wild you may see relationships expressed as "M:1" or "MPSC", but these are just descriptors of relationships. I've written about these relationships before if you're interested in learning more. But for the sake of this post, here's a brief overview:
| Relationship | Kind | Graph Kind | Channel Kind | Example |
|---|---|---|---|---|
| one-to-one | 1:1 | List | SPSC | Iterating over stream output in a loop. |
| one-to-many | 1:N | Tree | SPMC | Writing stream output to multiple destination channels. |
| many-to-one | N:1 | Tree | MPSC | Combining multiple streams into a single stream. |
| many-to-many | M:N | Graph | MPMC | An event bus where each node can both emit and listen. |
In theory any of the relationships can be expressed in terms of any other relationship. But in practice using constructs that were actually designed for the relationship is much nicer.
one-to-one
The most basic way of processing a stream is in sequence. For each element produced by the stream we perform an operation. We guarantee a new operation runs only after the previous operation has ended.
In today's Rust this is usually written using a `while let Some` / `.await` loop or the `for_each` stream adapter:
```rust
let mut a = stream::repeat(1u8);
while let Some(num) = a.next().await {
    println!("{:?}", num);
}

let b = stream::repeat(1u8)
    .for_each(|num| println!("{:?}", num))
    .await;
```
`for_each` and `while let Some` loops are mostly comparable. The only difference is that `for_each` cannot use `break` or `continue` statements the way `while let Some` loops can.
Serial processing of streams is most comparable to synchronous loops. But instead of blocking between iterations of the loop, the executor waits for the next item without blocking any threads. Just like serial processing of iterators is a common operation, so is serial processing of streams.
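To make the comparison concrete, here's a minimal sketch of the synchronous analogue using std's `Iterator`. The control flow is identical to the stream version above; only the `.await` points are gone:

```rust
fn main() {
    // The synchronous counterpart: process each element in sequence.
    // Each iteration only starts after the previous one has finished.
    let mut seen = Vec::new();
    for num in std::iter::repeat(1u8).take(3) {
        println!("{:?}", num);
        seen.push(num);
    }
    assert_eq!(seen, vec![1, 1, 1]);
}
```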
many-to-one
Sometimes you want to combine multiple streams into a single stream. For example, there might be a stream of events coming from a network connection, and a series of events coming from another part of the system through a local channel. You may want to combine these into a single stream of events. In `async-std` we expose three ways of combining streams:
- `chain`: exhaust "a", then exhaust "b".
- `zip`: combine "a" and "b" into a stream of tuples.
- `merge`: combine "a" and "b" into a stream that outputs items from either as they become ready.
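To build intuition for the first two, here's a small sketch using the `std::iter` counterparts of `chain` and `zip` (`merge` has no synchronous analogue, since its output order depends on which stream is ready first):

```rust
fn main() {
    let a = vec![1u8, 2];
    let b = vec![3u8, 4];

    // chain: exhaust "a", then exhaust "b".
    let chained: Vec<u8> = a.iter().chain(b.iter()).copied().collect();
    assert_eq!(chained, vec![1, 2, 3, 4]);

    // zip: combine "a" and "b" into pairs, ending at the shorter input.
    let zipped: Vec<(u8, u8)> = a.iter().zip(b.iter()).map(|(&x, &y)| (x, y)).collect();
    assert_eq!(zipped, vec![(1, 3), (2, 4)]);
}
```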
Both `chain` and `zip` have counterparts in `std::iter::Iterator`. However, `merge` is a novel API we've introduced as part of `async-std`. It allows awaiting two streams in parallel, as if they were a single stream. To see why, let's take a look at how we might solve this without `Stream::merge`:
```rust
// Define shared inner loop logic.
fn print(num: u8) {
    println!("num: {}", num);
}

// Exhaust the first stream in one task.
let a = task::spawn(async {
    let mut nums = stream::repeat(1u8).take(100);
    while let Some(num) = nums.next().await {
        print(num);
    }
});

// Exhaust the second stream in another task.
let b = task::spawn(async {
    let mut nums = stream::repeat(2u8).take(100);
    while let Some(num) = nums.next().await {
        print(num);
    }
});

// Wait until both streams have been exhausted.
a.join(b).await;
```
However, with `Stream::merge` we can greatly reduce this logic by creating a single stream that yields items from both streams as soon as they become available, spawning fewer tasks in the process:
```rust
let a = stream::repeat(1u8).take(100);
let b = stream::repeat(2u8).take(100);
let mut nums = a.merge(b);
while let Some(num) = nums.next().await {
    println!("num: {}", num);
}
```
one-to-many
Going from a single stream to multiple streams is a bit trickier than what we've seen before. But luckily Rust makes this easy through the channel abstraction. And as part of `async-std` we've built a channel impl that's both easy to use and performant.
Channels always come in pairs. When you create a new channel, you get back a tuple of `(Sender, Receiver)`. Whenever a value is written to a `Sender`, it's received by a `Receiver`. Both structs implement `Clone`, `Send`, and `Sync`, so they can freely be shared between as many threads and tasks as you want. `Receiver` also implements `Stream`, which makes it easy to read values from.
In the following example we split a stream of numbers into two streams: one of even numbers, and one of uneven numbers. We pass both `Sender`s to one side, which means that from the other side we're now free to read values from either stream.
```rust
use async_std::stream;
use async_std::sync;

let (even_writer, even_reader) = sync::channel(1);
let (uneven_writer, uneven_reader) = sync::channel(1);

let mut num_stream = stream::repeat(10u8).take(20);
while let Some(num) = num_stream.next().await {
    match num % 2 {
        0 => even_writer.send(num).await,
        _ => uneven_writer.send(num).await,
    }
}

// We can now asynchronously read from `even_reader`
// and `uneven_reader` in separate tasks.
```
Channels in `async-std` are incredibly versatile. They're very similar to event emitters in other languages, and can often be used in similar scenarios. The only difference is that channels in `async-std` currently don't support broadcasting values from a single sender to all receivers; but that's something we're considering adding. This would allow our streams to model many-to-many relationships as well.
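The same fan-out pattern works with std's synchronous channels too (though there only the `Sender` half is `Clone`). Here's a minimal sketch splitting a sequence into even and odd channels, mirroring the stream example above:

```rust
use std::sync::mpsc;

fn main() {
    let (even_tx, even_rx) = mpsc::channel();
    let (odd_tx, odd_rx) = mpsc::channel();

    // Fan a single sequence out into two channels.
    for num in 0u8..6 {
        match num % 2 {
            0 => even_tx.send(num).unwrap(),
            _ => odd_tx.send(num).unwrap(),
        }
    }
    // Drop the senders so the receivers' iterators terminate.
    drop(even_tx);
    drop(odd_tx);

    let evens: Vec<u8> = even_rx.iter().collect();
    let odds: Vec<u8> = odd_rx.iter().collect();
    assert_eq!(evens, vec![0, 2, 4]);
    assert_eq!(odds, vec![1, 3, 5]);
}
```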
Collecting Streams
Often when a computation is over you'd like to store the results somewhere. An example is printing to stdout, or collecting all output into a vector. In std this is done through `Iterator::collect`. Similarly, with streams this can be done through `Stream::collect`.
The trick to making `collect` work is the `FromStream` trait. It converts a stream into a type asynchronously. In `async-std` we've implemented this for many of std's types. When combined with some of the other stream adapters this allows for creating really nice pipeline patterns.
```rust
use std::collections::HashMap;

// Create a stream of tuples, and collect into a HashMap.
let a = stream::once(1u8);
let b = stream::once(0u8);
let s = a.zip(b);

let map: HashMap<u8, u8> = s.collect().await;
assert_eq!(map.get(&1), Some(&0u8));
```
There are even more interesting patterns possible; for example, if you have a stream of `Result<T, E>` you can collect it into a `Result<Vec<T>, E>`. This short-circuits the stream as soon as an error occurs.
```rust
use async_std::prelude::*;
use async_std::stream;

let v = stream::from_iter(vec![1, 2]);
let res: Result<Vec<u32>, &'static str> = v
    .map(|x: u32| x.checked_add(1).ok_or("Overflow!"))
    .collect()
    .await;
assert_eq!(res, Ok(vec![2, 3]));
```
Creating streams from collections
The `IntoStream` trait defines how to convert any type into a stream. It's the asynchronous counterpart to `IntoIterator`:
```rust
// sync
vec![1, 2, 3].iter().for_each(|n| { dbg!(n); });

// async, proposed
vec![1, 2, 3].stream().for_each(|n| { dbg!(n); }).await;
```
Unfortunately, due to the orphan rules we're currently not able to implement `IntoStream` for any of the `std::collections` types. This would need to happen as part of `futures-core`, because that is where the `Stream` trait is defined.
We'd also love to propose adding `FromStream` to `futures-core`, but unfortunately it's hard to implement without language support for `async fn` in traits. In `async-std` we require a `Pin<Box<T>>` as the return type, which is less efficient than what we'd want it to be. And for `collect` to work, we really need both traits.
Cancelling streams
Often it's desirable to stop a stream remotely. If we want to shut down gracefully, the first step is usually to stop processing new data. And that's something that needs to be initiated remotely.
For that we have the `stop-token` crate: an experimental stream adapter built on top of channels that provides remote cancellation of streams:
```rust
use stop_token::StopToken;

async fn do_work(work: impl Stream<Item = Event>, stop_token: StopToken) {
    // The `work` stream will end early: as soon as `stop_token` is cancelled.
    let mut work = stop_token.stop_stream(work);
    while let Some(event) = work.next().await {
        process_event(event).await
    }
}
```
We think cancellation is something that should be easy to do, and `stop-token` is a first attempt at implementing that. All credit to matklad for this.
Parallel Streams
A common pattern when processing data is to create a parallel fan-out / fan-in pipeline. Say we have a stream of data; we'd like to process that data in parallel. And once it's done processing, we either `collect` it, or iterate over values one by one.
In synchronous Rust you can choose to hand-write such a pipeline, or perhaps use `crossbeam::scope`. But the nicest way of pipelining data is still `rayon`.
The way `rayon` works is by providing a parallel version of `Iterator` called `ParallelIterator` that, as its name implies, allows processing iterators in parallel. Going from sequential execution to parallel execution usually means just replacing `iter` with `par_iter`:
```rust
use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter() // <-- just change that!
        .map(|&i| i * i)
        .sum()
}
```
Similarly, we should be able to apply this model to `Stream` by introducing a new `ParallelStream` trait that operates on items in parallel by calling `task::spawn` under the hood. All that would be needed is an added call to `par`:
```rust
use parallel_stream::prelude::*;

async fn sum_of_squares(input: impl Stream<Item = u32>) -> u32 {
    input.par() // <-- just add that!
        .map(|i| i * i)
        .sum()
        .await
}
```
Unfortunately, despite being fairly convinced that this is possible to implement, we currently don't have an implementation besides this sketch. If you'd like to help out on `parallel-stream`, let us know on GitHub or Discord and we'll gladly help. We really want to see this exist!
Concurrency in futures-rs
You may have noticed that we haven't mentioned the `futures` library much. In part that is because it doesn't provide many abstractions that work with executors. But let's take a quick look at the stream concurrency abstractions that it does provide:
- `stream::futures_unordered`: a set of futures that can resolve in any order.
- `StreamExt::for_each_concurrent`: a concurrent version of `Stream::for_each`.
- `StreamExt::try_for_each_concurrent`: a concurrent version of `Stream::try_for_each`.
- `select!`: polls multiple futures and streams simultaneously, executing the branch for the future that finishes first.
`futures_unordered`, `for_each_concurrent`, and friends occupy roughly the same space as `ParallelStream`. But `ParallelStream` has the benefit that it should be a bit more flexible, and make better use of system resources by leveraging an executor.
`select!` is an abstraction somewhat similar to `match`, but it operates on streams and futures directly. It's not without its shortcomings, though. `select!` introduces two new keywords: `default` and `complete`. It requires all futures and streams to be manually fused. And it has also required changes in `futures` that have created deviations from the stdlib, which is a cost in itself.
```rust
let mut a_fut = async_identity_fn(1).fuse();
let mut b_fut = async_identity_fn(2).fuse();

let mut total = 0;
loop {
    select! {
        a = a_fut => total += a,
        b = b_fut => total += b,
        complete => break,
        default => panic!(), // never runs (futures run first, then complete)
    };
}
assert_eq!(total, 3);
```
It seems that as we're moving towards extending the stdlib with more of `futures`' core types, we'll eventually need to provide a solution for how to operate on streams. And because there's no precedent for including a macro as complex as `select!` in the stdlib, it seems likely it would either need to be upgraded to a language construct, or we would need to look at alternatives that can be included.
With `async-std` we've chosen to look for alternatives instead. And between `Stream::merge` for combining streams, and `stop-token` for cancelling them, it seems we're well on our way. But we're not at the end of the road yet, and we'd like to keep experimenting with and documenting stream adapters until we can confidently replace all uses of `select!`.
Looking Ahead: Language Support
A bit further down the line it might be interesting to consider what language support could look like for parallel Rust. If we can define parallel counterparts to `Iterator` and `Stream`, it begs the question whether we could also define parallel counterparts to `loop`, `for`, and `while`.
Take for example the following TCP server. It listens for incoming requests, and processes them in sequence. It has a maximum concurrency of 1, but the code is really easy to follow.
```rust
let listener = TcpListener::bind("127.0.0.1:8080")?;
println!("Listening on {}", listener.local_addr()?);

for stream in listener.incoming() {
    let stream = stream?;
    println!("Accepting from: {}", stream.peer_addr()?);
    io::copy(&stream, &stream)?;
}
```
Now if we try to parallelize the server using `async-std` today, we can suddenly handle thousands of requests concurrently, but the code itself is much less readable. And worse, we can no longer pass errors from the inner scope back up to the outer scope without calling `.await` on each `JoinHandle`, making the request handler serial again.
```rust
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);

let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
    let stream = stream?;
    task::spawn(async move {
        println!("Accepting from: {}", stream.peer_addr().unwrap());
        io::copy(&stream, &stream).await.unwrap();
    });
}
```
An API such as `ParallelStream` would provide some alleviation here. It would allow us to implement a parallel server without losing the ability to return errors from inner scopes.
```rust
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);

listener
    .incoming()
    .par()
    .try_for_each(|stream| async move {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        io::copy(&stream, &stream).await?;
        Ok(())
    })
    .await?;
```
But this suddenly feels very different from the API we started with. Code is just as much about reading as it is about writing. And being able to spot loops at a glance is very useful. So instead of only having chained operators, wouldn't it be nice if we could write parallel loops instead? Perhaps something like this:
```rust
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);

for par stream.await? in listener.incoming() {
    println!("Accepting from: {}", stream.peer_addr()?);
    io::copy(&stream, &stream).await?;
}
```
This again looks a lot like the synchronous TCP server we started with. But instead of processing requests serially, it now uses all available cores without blocking.
Now this is not so much a concrete proposal, as a sketch to share what things could look like if we designed them that way. I'd like us to think big on this; following the tradition of making seemingly complex concepts surprisingly accessible. Not only would we be free of data races in parallel code. It'd be incredibly convenient to write as well.
Conclusion
In this post we've looked at different variants of streams concurrency, discussed the current state of support they have in Async Rust, and shared avenues worth exploring to improve the status quo.
Personally I'm very excited about how this post has come together. I feel like Rust is currently on a path to achieve what no other mainstream language has achieved: to make writing parallel code not only correct and performant, but as easy as sequential code.
This might seem like a tall order, but between the diagnostics efforts, language design, compiler work, and libraries it really feels like that's where we're heading. And I couldn't be more excited.
Thanks to: Irina Shestak for the illustrations. And Stjepan Glavina, Sunjay Varma, Niko Matsakis, Withoutboats, Aaron Turon, Aleksey Kladov, Ryan Levick, and Friedel Ziegelmayer.