内容简介:I started experimenting with asynchronous Rust code back whenBut that's all in the past! I guess!Now everything is
I started experimenting with asynchronous Rust code back when futures 0.1
was all we had - before async/await
. I was a Rust baby then (I'm at least
a toddler now), so I quickly drowned in a see of .and_then
, .map_err
and Either<A, B>
.
But that's all in the past! I guess!
Now everything is fine
, and things go smoothly. For the most part. But even
with async/await
, there are still some cases where the compiler diagnostics are,
just, so much
.
There's been serious improvemenst already in terms of diagnostics - the errors aren't as rough as they used to be, but there's still ways to go. Despite that, it's not impossible to go around them and achieve the result you need.
So let's try to do some HTTP requests, get ourselves in trouble, and instead of just "seeing if a different crate would work", get to the bottom of it, and come out the other side slightly more knowledgeable.
Let's get started!
Shell session
$ cargo new trouble
Created binary (application) `trouble` package
Idealized HTTP requests
For this article, we'll use the tokio runtime, with the reqwest HTTP client.
TOML markup
# in `Cargo.toml`
[dependencies]
tokio = { version = "0.2.21", features = ["full"] }
reqwest = "0.10.6"
Cool bear's hot tip
You can opt in and out of Tokio features, but Amos is just
being lazy here and using the full
feature to get "the whole
package".
Hey, no dissent in the ranks. We have places to go!
So, we'll make a request to the IANA-managed reserved
domain
example.org
:
Rust code
// in `src/main.rs`
use std::{cmp::min, error::Error};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let text = reqwest::get("http://example.org").await?.text().await?;
println!("response = {}", &text[..min(text.len(), 40)]);
Ok(())
}
Mhh. Seems a little too simple to be a fully-working example, but:
Shell session
$ cargo run -q
response = <!doctype html>
<html>
<head>
<title
It works!
Ah, I wish I could end the article here. Look how pretty it is. Even though
we're in in an async
function, we can return a boxed error ( Box<dyn Error>
), we can use the ?
sigil to bubble up errors, we can chain await
s. First to send
the request, and then to retrieve the body of the
response.
It's beautiful.
Cool bear's hot tip
Alright champ, settle down.
But I'm not really satisfied with it - there's so many of you following at home, coding as you go along - I wouldn't want y'all to DDoS the IANA servers. That would be uncouth.
So let's spin up a test server of our own. We'll grab a crate at random closes eyes, spins lib.rs uhhhhhh that one .
Shell session
$ cargo add tiny_http
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tiny_http v0.7.0 to dependencies
We'll also do proper logging:
Shell session
$ cargo add log pretty_env_logger
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding log v0.4.8 to dependencies
Adding pretty_env_logger v0.4.0 to dependencies
Rust code
// in `src/main.rs`
use std::error::Error;
mod server;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
server::start();
let text = reqwest::get("http://localhost:1729").await?.text().await?;
log::info!("Request successful: {}", &text[..]);
Ok(())
}
Cool bear's hot tip
1729 is Ramanujan's number - it is the smallest number expressible as the sum of two cubes in two different ways.
Rust code
// in `src/server.rs`
use std::thread::spawn;
use tiny_http::{Response, Server};
pub fn start() {
spawn(|| run());
}
fn run() {
let server = Server::http("127.0.0.1:1729").unwrap();
for req in server.incoming_requests() {
log::info!("Serving {:?}", req.url());
let res = Response::from_string("Hello from test server!");
req.respond(res).unwrap();
}
}
Shell session
$ cargo run -q
2020-07-07T15:00:28.055Z INFO trouble::server > Serving "/"
2020-07-07T15:00:28.056Z INFO trouble > Request successful: Hello from test server!
Wonderful.
Real-world HTTP requests (in the Paleoproterozoic Era)
The approach we used above is pretty naive though. Real-world HTTP requests don't always succeed. Sometimes they fail, and sometimes, it's not even your fault!
It's a story as old as time.
The year is 1800 million years ago.
Cool bear's hot tip
Okay, gonna sit this one out, good luck readers.
You are a string of beads connected by a very fine thread , and you're trying to break into the service industry. You hear about this internet thing, investigate, and before you know it, you've signed a contract with a CDN company with a minimum 10TB monthly commitment.
Things appear to be running smoothly, until you notice a series of failed requests in the log. Turns out the CDN you picked has POP in Laurentia , but not in Baltica !
Your Baltic customers are seeing request errors left and right, it's a disater. But you can't afford to break off your six-month engagement with your CDN, so you decide to solve the solution client-side.
Meanwhile, at your CDN's offices, they've just deployed their state of the art DDoS protection:
Rust code
// in `src/server.rs`
fn run() {
let server = Server::http("127.0.0.1:1729").unwrap();
let mut map: HashMap<_, usize> = HashMap::new();
for req in server.incoming_requests() {
let count = *map
.entry(req.remote_addr().ip())
.and_modify(|c| *c += 1)
.or_default();
log::info!("Serving {:?}", req.url());
let res = if count < 2 {
Response::from_string("The mainframe is warming up...").with_status_code(503)
} else {
Response::from_string("Hello from test server!")
};
req.respond(res).unwrap();
}
}
Your client is unprepared for such measures, as it doesn't even return an error:
Shell session
$ cargo run -q
2020-07-07T15:27:46.388Z INFO trouble::server > Serving "/"
2020-07-07T15:27:46.389Z INFO trouble > Request successful: The mainframe is warming up...
Luckily, it's pretty easy to address that
Rust code
// in `src/main.rs`
// in `async fn main()`
let text = reqwest::get("http://localhost:1729")
.await?
.error_for_status()?
.text()
.await?;
Shell session
$ cargo run -q
2020-07-07T15:29:24.127Z INFO trouble::server > Serving "/"
Error: reqwest::Error { kind: Status(503), url: "http://localhost:1729/" }
..but that doesn't really solve your problem, now does it?
You decide to simply retry a request if it fails:
Rust code
// in `src/main.rs`
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// omitted: server & logging setup
async fn do_req() -> Result<(), Box<dyn Error>> {
let text = reqwest::get("http://localhost:1729")
.await?
.error_for_status()?
.text()
.await?;
log::info!("Request successful: {}", &text[..]);
Ok(())
}
for _ in 0..5 {
if let Err(e) = do_req().await {
log::error!("{}", e);
} else {
break;
}
}
Ok(())
}
sh
$ cargo run -q
2020-07-07T15:51:54.177Z INFO trouble::server > Serving "/"
2020-07-07T15:51:54.178Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/)
2020-07-07T15:51:54.183Z INFO trouble::server > Serving "/"
2020-07-07T15:51:54.184Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/)
2020-07-07T15:51:54.190Z INFO trouble::server > Serving "/"
2020-07-07T15:51:54.190Z INFO trouble > Request successful: Hello from test server!
Better! For you.
But not for your CDN.
If at first you don't succeed, back off
Shortly after you roll out your client update, your Paleophone rings.
"Hello? Yes, this is Horodyskia..."
The conversation is short and to the point.
"High request volume, you say? I'll be right on it."
Your CDN rep is not happy. Not happy at all. In a bout of anger, they scolded you: "it's not the Archean anymore - get with the program!"
So, you look up "http request retry best practices", and quickly realize you're supposed to wait before retrying, to give the backend some time to recover.
Rust code
// in `src/main.rs`
use std::time::Duration;
// in `async fn main()`
for _ in 0..5 {
if let Err(e) = do_req().await {
log::error!("{}", e);
tokio::time::delay_for(Duration::from_secs(1)).await;
} else {
break;
}
}
Things seem a little better:
Shell session
$ cargo run -q
2020-07-07T16:01:30.501Z INFO trouble::server > Serving "/"
2020-07-07T16:01:30.502Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/)
2020-07-07T16:01:31.509Z INFO trouble::server > Serving "/"
2020-07-07T16:01:31.510Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/)
2020-07-07T16:01:32.517Z INFO trouble::server > Serving "/"
2020-07-07T16:01:32.518Z INFO trouble > Request successful: Hello from test server!
But you're not about to stop there. The CDN rep's diatribe has left you wounded. Your code? Bad? That's impossible. But just in case, you started a full code review.
And you realize there's a lot of code duplication. For every request you do,
you have to use a for
loop to retry.
You decide to make a re-usable facility to retry HTTP requests.
Rust code
// in `src/client.rs`
use reqwest::{Error, IntoUrl, Method, Request, RequestBuilder, Response};
use std::time::Duration;
pub struct Client {
inner: reqwest::Client,
}
impl Client {
pub fn new() -> Result<Self, Error> {
let inner = reqwest::Client::builder()
.user_agent("horo bot/1.0")
.build()?;
Ok(Self { inner })
}
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
self.inner.request(method, url)
}
pub async fn execute(&self, req: Request) -> Result<Response, Error> {
let mut tries: usize = 5;
loop {
let res = self
.inner
.execute(req.try_clone().unwrap())
.await
.and_then(|r| r.error_for_status());
match res {
Err(e) if tries > 1 => {
tries -= 1;
log::error!("{}", e);
tokio::time::delay_for(Duration::from_secs(1)).await;
}
res => return res,
}
}
}
}
You're pleased with how easy it is to use:
Rust code
// in `src/main.rs`
mod client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
server::start();
let client = client::Client::new()?;
let req = client
.request(reqwest::Method::GET, "http://localhost:1729")
.build()?;
let text = client.execute(req).await?.text().await?;
log::info!("Request successful: {}", &text[..]);
Ok(())
}
Sure, it panics if the request has a body. But bodies haven't been invented yet, so all is well in Proto-Laurasia.
As you re-read "Retrying HTTP requests - 10 best practices", you notice number 7 says to wait longer and longer between requests - and to add a random amount of time ("jitter", they call it), to avoid having many instances of your client synchronize their retry cycles and hammer the backend mercilessly.
"Well", you think to yourself (you don't possess any mouth-like appendages), that seems like something another organism has done before.
And sure enough, crates.io (which has always existed) has your back.
Shell session
$ cargo add backoff backoff-futures
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding backoff v0.1.6 to dependencies
Adding backoff-futures v0.3.0 to dependencies
It's great timing, too, as your CDN has decided to increase the number of "warm-up rounds" to 5.
Rust code
// in `src/server.rs`
// in `fn run()`
// in `for req in server.incoming_requests()`
let res = if count < 5 {
You find that the backoff
crates are rather easy to use, and
you're happy:
Rust code
// in `src/client.rs`
impl Client {
// omitted: other methods
pub async fn execute(&self, req: Request) -> Result<Response, Error> {
let exec = || async {
self.inner
.execute(req.try_clone().unwrap())
.await
.and_then(|r| r.error_for_status())
.map_err(backoff::Error::Transient)
};
let mut backoff = backoff::ExponentialBackoff::default();
use backoff_futures::BackoffExt;
exec.with_backoff(&mut backoff).await.map_err(|e| match e {
backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e,
})
}
}
It even seems to work!
Shell session
$ cargo run -q
2020-07-07T16:58:18.982Z INFO trouble::server > Serving "/"
2020-07-07T16:58:19.356Z INFO trouble::server > Serving "/"
2020-07-07T16:58:19.767Z INFO trouble::server > Serving "/"
2020-07-07T16:58:21.425Z INFO trouble::server > Serving "/"
2020-07-07T16:58:22.338Z INFO trouble::server > Serving "/"
2020-07-07T16:58:25.834Z INFO trouble::server > Serving "/"
2020-07-07T16:58:25.835Z INFO trouble > Request successful: Hello from test server!
Cool bear's hot tip
15 minutes in and not a single Futures-related problem.
Nicely done.
A Futures-related problem
Time has passed - it's time... for the Mesoproterozoic era.
It comes with its share of code churn. What used to be a standalone executable must now become a shared library, so it can be loaded as a "native add-on" for an Electron app.
The #[tokio::main]
attribute can no longer be used - since there is
no longer a main
function. You have to manage the Runtime manually.
And all the library calls are synchronous, so you have to spawn tasks
onto the runtime and deal with the outcome later.
Rust code
// in `src/main.rs`
use std::error::Error;
use tokio::runtime::Runtime;
mod client;
mod server;
fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
server::start();
let mut runtime = Runtime::new().unwrap();
let task = || async {
let client = client::Client::new()?;
let req = client
.request(reqwest::Method::GET, "http://localhost:1729")
.build()?;
let text = client.execute(req).await?.text().await?;
log::info!("Request successful: {}", &text[..]);
let res: Result<_, Box<dyn Error>> = Ok(());
res
};
let join_handle = runtime.spawn((|| async move {
match task().await {
Ok(_) => {}
Err(e) => {
log::error!("Something went wrong: {}", e);
}
}
})());
runtime.block_on(join_handle).unwrap();
Ok(())
}
Cool bear's hot tip
There's uhhh.. there's still a main
function.
Yeah cool bear, I know. There's still a main
function. But do you
really want me to go into details of exposing our functionality as
a native node.js addon now?
Cool bear's hot tip
Ah. Maybe later.
Exactly.
The important thing is that it compiles and runs just fine, and as you can see, it..
Shell session
$ cargo run -q
error: future cannot be sent between threads safely
--> src/main.rs:29:31
|
29 | let join_handle = runtime.spawn((|| async {
| ^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>`
note: future is not `Send` as it awaits another future which is not `Send`
--> src/client.rs:33:9
|
33 | exec.with_backoff(&mut backoff).await.map_err(|e| match e {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>>>`, which is not `Send`
...doesn't?
But it just worked. And you haven't touched that part of the code.
What happened?
rustc says, " Send
is not implemented for this Future
". And that's because,
this Future
awaits another future
, which itself is not Send
.
What's the std::marker::Send trait for again?
Types that can be transferred across thread boundaries.
This trait is automatically implemented when the compiler determines it's appropriate.
An example of a non- Send
type is the reference-counting pointer
rc::Rc
. If two
threads attempt to clone
Rc
s that point to
the same reference-counted value, they might try to update the reference
count at the same time, which is undefined
behavior
because
Rc
doesn't use atomic operations. Its cousin
sync::Arc
does
use atomic operations (incurring some overhead) and thus is Send
.
See the Nomicon for more details.
Ah, that makes sense. I think.
We're using Runtime::spawn
to "poll this future in the background", it
would make sense that, since we've enabled the threaded executor (via the full
feature), it would be sent from the main thread to another thread.
Cool bear's hot tip
Even though you end up using Runtime::block_on
to wait on it from the
same thread?
Yeah! Even then, it might run in a different thread. We need to know that
we can send it to any thread we want, and that's what Send
is about.
But what about our Future
isn't Send
?
Let's try to see what the concrete type of our Future
is. A cool trick
to find the type of something is to try to assign it to a variable
of type ()
:
Rust code
// in `src/main.rs`
let f: () = task();
Shell session
$ cargo check -q
error[E0308]: mismatched types
--> src/main.rs:29:17
|
15 | let task = || async {
| _________________________-
16 | | let client = client::Client::new()?;
17 | |
18 | | let req = client
... |
26 | | res
27 | | };
| |_____- the found generator
28 |
29 | let f: () = task();
| -- ^^^^^^ expected `()`, found opaque type
| |
| expected due to this
|
::: /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/future/mod.rs:48:43
|
48 | pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
| ------------------------------- the found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
Ah. An opaque type. Well that trick doesn't work here, but it's a cool one anyway. Keep it in mind for synchronous code.
Let's try another trick:
Rust code
fn take_f<F, O>(f: F)
where
F: std::future::Future<Output = O> + Send,
{
}
take_f(task());
Shell session
$ cargo check -q
error: future cannot be sent between threads safely
--> src/main.rs:35:5
|
29 | fn take_f<F, O>(f: F)
| ------ required by a bound in this
30 | where
31 | F: std::future::Future<Output = O> + Send,
| ---- required by this bound in `main::take_f`
...
35 | take_f(task());
| ^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = std::result::Result<reqwest::async_
impl::response::Response, backoff::error::Error<reqwest::error::Error>>>`
note: future is not `Send` as it awaits another future which is not `Send`
--> src/client.rs:33:9
|
33 | exec.with_backoff(&mut backoff).await.map_err(|e| match e {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output
= std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>>>`, which is not `Send`
Variations on a theme, huh. But at least we can now climb up the chain,
and see where the lack of Send
really comes from.
For example, we could go in client.rs
and mess with execute
:
Rust code
// in `src/client.rs`
impl Client {
pub async fn execute(&self, req: Request) -> Result<Response, Error> {
let exec = || async {
self.inner
.execute(req.try_clone().unwrap())
.await
.and_then(|r| r.error_for_status())
.map_err(backoff::Error::Transient)
};
// new:
fn take_f<F, O>(f: F)
where
F: std::future::Future<Output = O> + Send,
{
}
take_f(exec());
let mut backoff = backoff::ExponentialBackoff::default();
use backoff_futures::BackoffExt;
exec.with_backoff(&mut backoff).await.map_err(|e| match e {
backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e,
})
}
}
This compiles just fine. So the future returned by exec()
is
Send
. Which
means Result<_, _>
is Send
, which means reqwest::Response
and reqwest::Error
are both Send
.
Let's move down the chain a bit. We'll improve our checking function so that it
returns the Future
as-is:
Rust code
// in `src/client.rs`
impl Client {
pub async fn execute(&self, req: Request) -> Result<Response, Error> {
let exec = || async {
self.inner
.execute(req.try_clone().unwrap())
.await
.and_then(|r| r.error_for_status())
.map_err(backoff::Error::Transient)
};
let mut backoff = backoff::ExponentialBackoff::default();
use backoff_futures::BackoffExt;
let f = async {
exec.with_backoff(&mut backoff).await.map_err(|e| match e {
backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e,
})
};
fn check<F, O>(f: F) -> F
where
F: std::future::Future<Output = O> + Send,
{
f
}
check(f).await
}
}
Shell session
$ cargo check -q
error: future cannot be sent between threads safely
--> src/client.rs:45:9
|
39 | fn check<F, O>(f: F) -> F
| ----- required by a bound in this
40 | where
41 | F: std::future::Future<Output = O> + Send,
| ---- required by this bound in `client::Client::execute::{{closure}}#0::check`
...
45 | check(f).await
| ^^^^^ future created by async block is not `Send`
AhAH! So backoff-futures
is the problem.
Shell session
$ mkdir vendor
$ git clone https://github.com/jakubadamw/backoff-futures vendor/backoff-futures
Cloning into 'vendor/backoff-futures'...
(etc.)
$ git checkout 0.3.0
Note: switching to '0.3.0'.
You are in 'detached HEAD' state. You can look around, make experimental
(etc.)
Take a look around, git say. Alright, let's use ripgrep for that:
Shell session
$ rg -A 1 'Send'
src/lib.rs
91:#[async_trait::async_trait(?Send)]
92-pub trait BackoffExt<T, E, Fut, F> {
--
123:#[async_trait::async_trait(?Send)]
124-impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F
Those are the only two mentions of Send
in the entire codebase.
You're starting to doubt that codebase ever worked, so, you decide to run the tests:
Shell session
$ cargo t -q
running 5 tests
.....
test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
running 1 test
test src/lib.rs - (line 13) ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Nope, everything's fine.
Alright, let's look at that async_trait
then:
Rust code
// in `vendor/backoff-futures/src/lib.rs`
#[async_trait::async_trait(?Send)]
pub trait BackoffExt<T, E, Fut, F> {
/// Returns a future that, when polled, will first ask `self` for a new future (with an output
/// type `Result<T, backoff::Error<_>>` to produce the expected result.
///
/// If the underlying future is ready with an `Err` value, the nature of the error
/// (permanent/transient) will determine whether polling the future will employ the provided
/// `backoff` strategy and will result in the work being retried.
///
/// Specifically, [`backoff::Error::Permanent`] errors will be returned immediately.
/// [`backoff::Error::Transient`] errors will, depending on the particular [`backoff::backoff::Backoff`],
/// result in a retry attempt, most likely with a delay.
///
/// If the underlying future is ready with an [`std::result::Result::Ok`] value, it will be returned immediately.
async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
where
B: Backoff,
T: 'async_trait,
E: 'async_trait,
Fut: 'async_trait;
/// Same as [`BackoffExt::with_backoff`] but takes an extra `notify` closure that will be called every time
/// a new backoff is employed on transient errors. The closure takes the new delay duration as an argument.
async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>>
where
B: Backoff,
N: FnMut(&Error<E>, Duration),
T: 'async_trait,
E: 'async_trait,
Fut: 'async_trait;
}
Wait. An async fn
in a trait? I thought that wasn't stable yet?
Well... there's a crate for that .
And in that crate's documentation in the Non-threadsafe futures section, it says:
Not all async traits need futures that are dyn Future + Send
. To avoid
having Send and Sync bounds placed on the async trait methods, invoke the
async trait macro as #[async_trait(?Send)]
on both the trait and the impl
blocks.
AhAH! So async-trait
transforms async fn
s into regular fn
s, which by
default return Send
futures, but if you specify ?Send
, they're not Send
.
You take a moment to collect your thoughts. Things escalated quickly, you weren't prepared for this. But heroes rise to the occasion - even if, in that scenario, "rising" means "maintaining your own fork".
So you decide to get your beads dirty, and fix it:
Rust code
// in `vendor/backoff-futures/src/lib.rs`
#[async_trait::async_trait]
pub trait BackoffExt<T, E, Fut, F> {
// etc.
}
#[async_trait::async_trait]
impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, backoff::Error<E>>> {
// etc.
}
Shell session
$ # in vendor/backoff-futures
$ cargo check -q
cargo check -q error[E0277]: `F` cannot be sent between threads safely
--> src/lib.rs:135:5
|
129 | async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
| ------------------- within this `impl std::future::Future`
...
135 | / {
136 | | let backoff_struct = BackoffFutureBuilder { backoff, f: self };
137 | | backoff_struct.fut(|_, _| {}).await
138 | | }
| |_____^ `F` cannot be sent between threads safely
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `F`
= note: required because it appears within the type `[static generator@src/lib.rs:135:5: 138:6 _self:F, backoff:&mut B for<'r
, 's, 't0, 't1, 't2, 't3> {std::future::ResumeTy, F, &'r mut B, BackoffFutureBuilder<'s, B, F, Fut, T, E>, [closure@src/lib.rs:13
7:28: 137:37], impl std::future::Future, ()}]`
= note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/lib.rs:135:
5: 138:6 _self:F, backoff:&mut B for<'r, 's, 't0, 't1, 't2, 't3> {std::future::ResumeTy, F, &'r mut B, BackoffFutureBuilder<'s, B
, F, Fut, T, E>, [closure@src/lib.rs:137:28: 137:37], impl std::future::Future, ()}]>`
= note: required because it appears within the type `impl std::future::Future`
= note: required because it appears within the type `impl std::future::Future`
= note: required for the cast to the object type `dyn std::future::Future<Output = std::result::Result<T, backoff::error::Err
or<E>>> + std::marker::Send`
help: consider further restricting this bound
|
126 | F: FnMut() -> Fut + std::marker::Send,
| ^^^^^^^^^^^^^^^^^^^
(omitted: 10 more errors)
Okay, well, this should be easy. You just need to add a bunch of Send
bounds, right?
Rust code
// in `vendor/backoff-futures/src/lib.rs`
#[async_trait::async_trait]
pub trait BackoffExt<T, E, Fut, F> {
async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
where
B: Backoff + Send, // new!
T: 'async_trait,
E: 'async_trait,
Fut: 'async_trait;
/// Same as [`BackoffExt::with_backoff`] but takes an extra `notify` closure that will be called every time
/// a new backoff is employed on transient errors. The closure takes the new delay duration as an argument.
async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>>
where
B: Backoff + Send, // new!
N: FnMut(&Error<E>, Duration) + Send, // new!
T: 'async_trait,
E: 'async_trait,
Fut: 'async_trait;
}
#[async_trait::async_trait]
impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F
where
F: (FnMut() -> Fut) + Send, // new
T: Send, // new
E: Send, // new
Fut: Future<Output = Result<T, backoff::Error<E>>> + Send, // new
{
async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
where
B: Backoff + Send, // new
T: 'async_trait,
E: 'async_trait,
Fut: 'async_trait,
{
let backoff_struct = BackoffFutureBuilder { backoff, f: self };
backoff_struct.fut(|_, _| {}).await
}
async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>>
where
B: Backoff + Send, // new
N: FnMut(&Error<E>, Duration) + Send, // new
T: 'async_trait,
E: 'async_trait,
Fut: 'async_trait,
{
let backoff_struct = BackoffFutureBuilder { backoff, f: self };
backoff_struct.fut(notify).await
}
}
Cool bear's hot tip
One, two, three... ten! Ten Send
bounds.
Yeah, that should do it:
sh
$ # still in vendor/backoff-futures
$ cargo t -q
running 5 tests
.....
test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
running 1 test
test src/lib.rs - (line 13) ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Great! Before publishing your fork on crates.io, or thinking about upstreaming it, you decide to check that those changes actually solve your problem, though.
TOML markup
# in `Cargo.toml` (the root one)
[patch.crates-io]
backoff-futures = { path = "vendor/backoff-futures" }
Shell session
$ cargo run -q
2020-07-07T17:57:36.378Z INFO trouble::server > Serving "/"
2020-07-07T17:57:37.080Z INFO trouble::server > Serving "/"
2020-07-07T17:57:38.123Z INFO trouble::server > Serving "/"
2020-07-07T17:57:39.386Z INFO trouble::server > Serving "/"
2020-07-07T17:57:40.695Z INFO trouble::server > Serving "/"
2020-07-07T17:57:43.523Z INFO trouble::server > Serving "/"
2020-07-07T17:57:43.524Z INFO trouble > Request successful: Hello from test server!
Hooray! Wall of text: vanquished. Service: production-ready. Almost.
Right after you figure out which errors should be transient and which should be permanent .
Nevertheless, your codebase is in much better shape now. It's sure to last well into the Mesozoic era.
Closing words
I hope you enjoyed this little adventure in finding and figuring out a Rust futures issue. It's not always that colorful - but sometimes, it is!
And when it is, there's always something that can be done. Walls of rustc errors are not necessarily dead-ends. And everyone involved is already aware of the current usability issues, and working towards a smoother experience in general.
It might take some time . But it's in the works.
Cool bear's hot tip
GAT s or nothing!
We'll see, cool bear. We'll see.
以上所述就是小编给大家介绍的《Getting in and out of trouble with Rust futures》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Kafka权威指南
Neha Narkhede、Gwen Shapira、Todd Palino / 薛命灯 / 人民邮电出版社 / 2017-12-26 / 69.00元
每个应用程序都会产生数据,包括日志消息、度量指标、用户活动记录、响应消息等。如何移动数据,几乎变得与数据本身一样重要。如果你是架构师、开发者或者产品工程师,同时也是Apache Kafka新手,那么这本实践指南将会帮助你成为流式平台上处理实时数据的专家。 本书由出身于LinkedIn的Kafka核心作者和一线技术人员共同执笔,详细介绍了如何部署Kafka集群、开发可靠的基于事件驱动的微服务,......一起来看看 《Kafka权威指南》 这本书的介绍吧!