If you've ever wondered how `block_on` from the `futures` crate works, today we are going to write our own version of the function.
Inspiration for this blog post comes from two crates, `wakeful` and `extreme`. `wakeful` has devised a simple way to create a `Waker` from a function, while `extreme` is an extremely terse implementation of `block_on()`.
Our implementation will have slightly different goals from `extreme`. Rather than going for zero dependencies and a minimal number of lines of code, we'll go for a safe and efficient but still pretty simple implementation.
Dependencies we're going to use are `pin-utils`, `crossbeam`, and `async-task`.
## Function signature
The signature of `block_on` looks as follows. We take a future as an argument, run it on the current thread (blocking whenever it is pending), and return its output:
```rust
fn block_on<F: Future>(future: F) -> F::Output {
    todo!()
}
```
Now let's implement the missing `todo!()` part…
## First attempt
Note that the `poll` method on every `Future` takes a pinned future, so we need to pin it first. While there is a way to do that safely using `Box::pin()`, we'd rather pin the future on the stack than on the heap.
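For contrast, here's what the heap-based route looks like. The helper name `pin_on_heap` is made up for this sketch; the point is that `Box::pin` costs an allocation we'd rather avoid:

```rust
use std::future::Future;
use std::pin::Pin;

// A sketch of the rejected alternative: `pin_on_heap` is a hypothetical
// helper, not part of our final implementation. Box::pin moves the future
// onto the heap, which pins it safely but costs an allocation.
fn pin_on_heap<F: Future>(future: F) -> Pin<Box<F>> {
    Box::pin(future)
    // Calling .as_mut() on the result yields the Pin<&mut F> that poll() needs.
}
```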
Unfortunately, the only way of pinning futures on the stack safely is by using the `pin-utils` crate:

```rust
pin_utils::pin_mut!(future);
```
The `pin_mut!` macro converts `future` from a variable of type `F` into one of type `Pin<&mut F>`.
Next we’ll need to specify what happens when this future is woken. In this case, waking should simply unblock the thread running the future.
Constructing a `Waker` can be gnarly — just take a peek at the implementation of `extreme`. And that is the simplest possible way of constructing a `Waker` by hand! So many raw pointers, so much unsafe code… let's skip this part for now and fill in the blank later.
```rust
let waker = todo!();
```
Finally, we create a task context from the waker and keep polling the future in a loop. If it completes, return the output. If it’s pending, block the current thread:
```rust
let cx = &mut Context::from_waker(&waker);

loop {
    match future.as_mut().poll(cx) {
        Poll::Ready(output) => return output,
        Poll::Pending => thread::park(),
    }
}
```
In case you're puzzled by the `Context` type, it's a wrapper around `Waker` — there's nothing more to it. When async/await in Rust was being designed, we weren't sure if it'd be useful to pass anything else besides a `Waker` to `poll()`, so we came up with this wrapper that might hold more stuff in a future version of Rust.
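Here's a tiny illustration of that relationship. The `NoopWake` type is just scaffolding so we have a `Waker` to wrap; it's built with the standard library's `std::task::Wake` trait, which was stabilized after this post was written:

```rust
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// A do-nothing waker, only here so we have a Waker to wrap in a Context.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn demo() -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    // A Context is built from a &Waker...
    let cx = Context::from_waker(&waker);
    // ...and waker() is essentially its only accessor: you get the Waker back.
    cx.waker().will_wake(&waker)
}
```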
Anyways… we're almost done. Let's go back to waker construction and fill in the blank marked with `todo!()`.
If you think about it, `Waker` is really just a carefully optimized, fancy version of `Arc<dyn Fn() + Send + Sync>`, and `wake()` invokes this function. Put yet another way, a `Waker` is a callback that gets invoked whenever the future can continue execution.
Since `Waker` is so difficult to construct, sagebind came up with `waker_fn()`, a straightforward way to convert any function into a `Waker`. Unfortunately, `wakeful` seems to be yanked at the moment, so I borrowed `waker_fn()` and put it into my crate `async-task`.
In our `block_on`, the callback unblocks the thread running the future:
```rust
let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());
```
So simple! Much better than fiddling with `RawWaker` and `RawWakerVTable`.
Internally, the `waker_fn()` constructor literally creates an `Arc<impl Fn() + Send + Sync>` and then converts it into a `Waker` with unsafe code that looks similar to what we saw in `extreme`.
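In today's Rust we can sketch the same idea with no unsafe code at all, using the standard library's `std::task::Wake` trait (stabilized after this post was written). This is a simplified stand-in for illustration, not `async-task`'s actual implementation:

```rust
use std::sync::Arc;
use std::task::{Wake, Waker};

// Wraps a closure so that waking the Waker calls it.
struct FnWaker<F>(F);

impl<F: Fn() + Send + Sync + 'static> Wake for FnWaker<F> {
    fn wake(self: Arc<Self>) {
        (self.0)();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        (self.0)();
    }
}

// A safe waker_fn() lookalike: converts any closure into a Waker.
fn waker_fn<F: Fn() + Send + Sync + 'static>(f: F) -> Waker {
    Waker::from(Arc::new(FnWaker(f)))
}
```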
Here's a complete implementation of `block_on()`:
```rust
fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    let thread = thread::current();
    let waker = async_task::waker_fn(move || thread.unpark());

    let cx = &mut Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}
```
See `v1.rs` if you'd like to try running this code.
## A problem with parking
But it's not time to celebrate yet. There's a problem: if user code inside the future also makes use of the park/unpark API, it may pick up and "steal" unpark notifications from the callback. Read this issue for a more elaborate explanation.
A possible solution is to use a way of parking and unparking threads different from the one inside the `std::thread` module. That way, code inside the future will not be able to interfere with waking.
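Conceptually, such a dedicated parker is just a mutex-protected token paired with a condition variable. Here's a simplified sketch of the idea (the name `MiniParker` is made up; crossbeam's real `Parker` is a more optimized version of this):

```rust
use std::sync::{Condvar, Mutex};

// A minimal parker: park() blocks until a token is available, and unpark()
// makes one available. Unlike std's thread parking, nothing else running on
// the thread can consume this token.
struct MiniParker {
    token: Mutex<bool>,
    cond: Condvar,
}

impl MiniParker {
    fn new() -> MiniParker {
        MiniParker {
            token: Mutex::new(false),
            cond: Condvar::new(),
        }
    }

    fn park(&self) {
        let mut token = self.token.lock().unwrap();
        // The loop guards against spurious wakeups from the condvar.
        while !*token {
            token = self.cond.wait(token).unwrap();
        }
        *token = false; // consume the token
    }

    fn unpark(&self) {
        *self.token.lock().unwrap() = true;
        self.cond.notify_one();
    }
}
```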
There's a very similar park/unpark mechanism in `crossbeam`, except it allows us to create arbitrarily many independent parkers rather than having one per thread. Let's create one per invocation of `block_on()`:
```rust
fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = async_task::waker_fn(move || unparker.unpark());

    let cx = &mut Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => parker.park(),
        }
    }
}
```
That’s it! Problem solved.
See `v2.rs` if you'd like to try running this code.
## A caching optimization
Creating a `Parker` and `Waker` is not free — both of those incur the cost of an allocation, which is unfortunate. Can we improve?
Instead of constructing a `Parker` and `Waker` on each invocation of `block_on`, why not cache them in thread-local storage? That way a thread will reuse the same instances across all invocations of `block_on()`:
```rust
fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    thread_local! {
        static CACHE: (Parker, Waker) = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            (parker, waker)
        };
    }

    CACHE.with(|(parker, waker)| {
        let cx = &mut Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}
```
If the future is quick to execute, this small change will make `block_on()` dramatically more efficient!
See `v3.rs` if you'd like to try running this code.
## What about recursion?
Are we done yet? Well… just one last thing.
What if the future inside `block_on()` calls `block_on()` again, recursively? We can either permit or forbid recursion.
If we choose to permit recursion, then we also need to make sure recursive calls of `block_on()` don't share the same `Parker` and `Waker` instances, or else there's no way to tell which `block_on()` invocation gets woken.
The `block_on()` from the `futures` crate panics on recursive invocations. I don't have a strong opinion on whether permitting or forbidding recursion is better — both behaviors are sensible. But, since we're mimicking the `futures` version, let's forbid recursion.
To detect recursive invocations, we could introduce another thread-local variable indicating whether we're currently inside `block_on()` or not. But that's a lot of work.
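For comparison, that rejected flag-based approach might look roughly like this (the names `INSIDE_BLOCK_ON`, `Exit`, and `enter_block_on` are made up for illustration):

```rust
use std::cell::Cell;

thread_local! {
    // Hypothetical flag: true while the current thread is inside block_on().
    static INSIDE_BLOCK_ON: Cell<bool> = Cell::new(false);
}

// Guard that clears the flag when block_on() returns (or panics).
struct Exit;

impl Drop for Exit {
    fn drop(&mut self) {
        INSIDE_BLOCK_ON.with(|flag| flag.set(false));
    }
}

// block_on() would call this on entry and hold the returned guard.
fn enter_block_on() -> Exit {
    INSIDE_BLOCK_ON.with(|flag| {
        assert!(!flag.get(), "recursive `block_on`");
        flag.set(true);
    });
    Exit
}
```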
Here's a cool trick that requires fewer changes to the code. Let's wrap `(Parker, Waker)` into a `RefCell`, and panic if a mutable borrow is already active:
```rust
fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    thread_local! {
        static CACHE: RefCell<(Parker, Waker)> = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            RefCell::new((parker, waker))
        };
    }

    CACHE.with(|cache| {
        let (parker, waker) = &mut *cache
            .try_borrow_mut()
            .ok()
            .expect("recursive `block_on`");

        let cx = &mut Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}
```
Finally. Now we’re really done, I promise! This final implementation is as correct, as robust, and as efficient as it gets. More or less. :)
See `v4.rs` if you'd like to try running this code.
## Benchmarks
To test how efficient our `block_on()` is, let's benchmark it against the one from `futures`.
But first, we’ll write a helper future type that yields a number of times and then completes:
```rust
struct Yields(u32);

impl Future for Yields {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```
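To see what this future does without a benchmark harness, we can poll it by hand. This sketch repeats the `Yields` definition so it stands alone, and builds a no-op waker with the standard library's `Wake` trait purely for illustration:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Yields(u32);

impl Future for Yields {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A waker that does nothing -- we drive the future manually below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls Yields(n) to completion and returns the total number of polls.
fn poll_count(n: u32) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(Yields(n));
    let mut polls = 0;
    while future.as_mut().poll(&mut cx).is_pending() {
        polls += 1;
    }
    polls + 1 // count the final Ready poll too
}
```

So `Yields(n)` returns `Pending` exactly `n` times before completing, which is what makes it a convenient workload for the benchmarks.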
As an example, to benchmark a future yielding 10 times, we write:
```rust
#[bench]
fn custom_block_on_10_yields(b: &mut Bencher) {
    b.iter(|| block_on(Yields(10)));
}
```
Let's make a set of three benchmarks with futures yielding 0, 10, and 50 times. We run those using our custom `block_on()` and then using `block_on()` from `futures`. You can find the full benchmark code in `yield.rs`.
And here are the results on my machine:
```
test custom_block_on_0_yields   ... bench:     3 ns/iter (+/- 0)
test custom_block_on_10_yields  ... bench:   130 ns/iter (+/- 12)
test custom_block_on_50_yields  ... bench:   638 ns/iter (+/- 20)

test futures_block_on_0_yields  ... bench:    10 ns/iter (+/- 0)
test futures_block_on_10_yields ... bench:   236 ns/iter (+/- 10)
test futures_block_on_50_yields ... bench: 1,139 ns/iter (+/- 30)
```
The numbers say our custom `block_on()` is roughly 2 to 3 times faster in this particular benchmark, which is not bad at all!
## Conclusion
Async Rust can feel intimidating because it contains so much machinery: the `Future` trait, pinning, the `Context` type, `Waker` and its friends `RawWaker` and `RawWakerVTable`, desugaring of `async` and `await`, unsafe code, raw pointers, and so on.
But the thing is, a lot of the ugly stuff is not even that important — it's really just boring boilerplate that can be removed with crates like `pin-utils`, `async-task`, and `crossbeam`.
And indeed, today we managed to build an efficient `block_on()` in a few lines of safe code, without having to understand most of that boilerplate. In another blog post, we'll build a real executor…