Getting in and out of trouble with Rust futures

栏目: IT技术 · 发布时间: 4年前

内容简介:I started experimenting with asynchronous Rust code back whenBut that's all in the past! I guess!Now everything is

I started experimenting with asynchronous Rust code back when futures 0.1 was all we had - before async/await . I was a Rust baby then (I'm at least a toddler now), so I quickly drowned in a see of .and_then , .map_err and Either<A, B> .

But that's all in the past! I guess!

Now everything is fine , and things go smoothly. For the most part. But even with async/await , there are still some cases where the compiler diagnostics are, just, so much .

There's been serious improvemenst already in terms of diagnostics - the errors aren't as rough as they used to be, but there's still ways to go. Despite that, it's not impossible to go around them and achieve the result you need.

So let's try to do some HTTP requests, get ourselves in trouble, and instead of just "seeing if a different crate would work", get to the bottom of it, and come out the other side slightly more knowledgeable.

Let's get started!

Shell session

$ cargo new trouble Created binary (application) `trouble` package

Idealized HTTP requests

For this article, we'll use the tokio runtime, with the reqwest HTTP client.

TOML markup

# in `Cargo.toml` [dependencies] tokio = { version = "0.2.21", features = ["full"] } reqwest = "0.10.6"

Cool bear's hot tip

You can opt in and out of Tokio features, but Amos is just being lazy here and using the full feature to get "the whole package".

Hey, no dissent in the ranks. We have places to go!

So, we'll make a request to the IANA-managed reserved domain example.org :

Rust code

// in `src/main.rs` use std::{cmp::min, error::Error}; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { let text = reqwest::get("http://example.org").await?.text().await?; println!("response = {}", &text[..min(text.len(), 40)]); Ok(()) }

Mhh. Seems a little too simple to be a fully-working example, but:

Shell session

$ cargo run -q response = <!doctype html> <html> <head> <title

It works!

Ah, I wish I could end the article here. Look how pretty it is. Even though we're in in an async function, we can return a boxed error ( Box<dyn Error> ), we can use the ? sigil to bubble up errors, we can chain await s. First to send the request, and then to retrieve the body of the response.

It's beautiful.

Cool bear's hot tip

Alright champ, settle down.

But I'm not really satisfied with it - there's so many of you following at home, coding as you go along - I wouldn't want y'all to DDoS the IANA servers. That would be uncouth.

So let's spin up a test server of our own. We'll grab a crate at random closes eyes, spins lib.rs uhhhhhh that one .

Shell session

$ cargo add tiny_http Updating 'https://github.com/rust-lang/crates.io-index' index Adding tiny_http v0.7.0 to dependencies

We'll also do proper logging:

Shell session

$ cargo add log pretty_env_logger Updating 'https://github.com/rust-lang/crates.io-index' index Adding log v0.4.8 to dependencies Adding pretty_env_logger v0.4.0 to dependencies

Rust code

// in `src/main.rs` use std::error::Error; mod server; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { pretty_env_logger::formatted_timed_builder() .filter_level(log::LevelFilter::Info) .init(); server::start(); let text = reqwest::get("http://localhost:1729").await?.text().await?; log::info!("Request successful: {}", &text[..]); Ok(()) }

Cool bear's hot tip

1729 is Ramanujan's number - it is the smallest number expressible as the sum of two cubes in two different ways.

Rust code

// in `src/server.rs` use std::thread::spawn; use tiny_http::{Response, Server}; pub fn start() { spawn(|| run()); } fn run() { let server = Server::http("127.0.0.1:1729").unwrap(); for req in server.incoming_requests() { log::info!("Serving {:?}", req.url()); let res = Response::from_string("Hello from test server!"); req.respond(res).unwrap(); } }

Shell session

$ cargo run -q 2020-07-07T15:00:28.055Z INFO trouble::server > Serving "/" 2020-07-07T15:00:28.056Z INFO trouble > Request successful: Hello from test server!

Wonderful.

Real-world HTTP requests (in the Paleoproterozoic Era)

The approach we used above is pretty naive though. Real-world HTTP requests don't always succeed. Sometimes they fail, and sometimes, it's not even your fault!

It's a story as old as time.

The year is 1800 million years ago.

Cool bear's hot tip

Okay, gonna sit this one out, good luck readers.

You are a string of beads connected by a very fine thread , and you're trying to break into the service industry. You hear about this internet thing, investigate, and before you know it, you've signed a contract with a CDN company with a minimum 10TB monthly commitment.

Getting in and out of trouble with Rust futures

You. (Scientist depiction)

Retallack

Things appear to be running smoothly, until you notice a series of failed requests in the log. Turns out the CDN you picked has POP in Laurentia , but not in Baltica !

Your Baltic customers are seeing request errors left and right, it's a disater. But you can't afford to break off your six-month engagement with your CDN, so you decide to solve the solution client-side.

Meanwhile, at your CDN's offices, they've just deployed their state of the art DDoS protection:

Rust code

// in `src/server.rs` fn run() { let server = Server::http("127.0.0.1:1729").unwrap(); let mut map: HashMap<_, usize> = HashMap::new(); for req in server.incoming_requests() { let count = *map .entry(req.remote_addr().ip()) .and_modify(|c| *c += 1) .or_default(); log::info!("Serving {:?}", req.url()); let res = if count < 2 { Response::from_string("The mainframe is warming up...").with_status_code(503) } else { Response::from_string("Hello from test server!") }; req.respond(res).unwrap(); } }

Your client is unprepared for such measures, as it doesn't even return an error:

Shell session

$ cargo run -q 2020-07-07T15:27:46.388Z INFO trouble::server > Serving "/" 2020-07-07T15:27:46.389Z INFO trouble > Request successful: The mainframe is warming up...

Luckily, it's pretty easy to address that

Rust code

// in `src/main.rs` // in `async fn main()` let text = reqwest::get("http://localhost:1729") .await? .error_for_status()? .text() .await?;

Shell session

$ cargo run -q 2020-07-07T15:29:24.127Z INFO trouble::server > Serving "/" Error: reqwest::Error { kind: Status(503), url: "http://localhost:1729/" }

..but that doesn't really solve your problem, now does it?

You decide to simply retry a request if it fails:

Rust code

// in `src/main.rs` #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { // omitted: server & logging setup async fn do_req() -> Result<(), Box<dyn Error>> { let text = reqwest::get("http://localhost:1729") .await? .error_for_status()? .text() .await?; log::info!("Request successful: {}", &text[..]); Ok(()) } for _ in 0..5 { if let Err(e) = do_req().await { log::error!("{}", e); } else { break; } } Ok(()) }

sh

$ cargo run -q 2020-07-07T15:51:54.177Z INFO trouble::server > Serving "/" 2020-07-07T15:51:54.178Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T15:51:54.183Z INFO trouble::server > Serving "/" 2020-07-07T15:51:54.184Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T15:51:54.190Z INFO trouble::server > Serving "/" 2020-07-07T15:51:54.190Z INFO trouble > Request successful: Hello from test server!

Better! For you.

But not for your CDN.

If at first you don't succeed, back off

Shortly after you roll out your client update, your Paleophone rings.

"Hello? Yes, this is Horodyskia..."

The conversation is short and to the point.

"High request volume, you say? I'll be right on it."

Your CDN rep is not happy. Not happy at all. In a bout of anger, they scolded you: "it's not the Archean anymore - get with the program!"

So, you look up "http request retry best practices", and quickly realize you're supposed to wait before retrying, to give the backend some time to recover.

Rust code

// in `src/main.rs` use std::time::Duration; // in `async fn main()` for _ in 0..5 { if let Err(e) = do_req().await { log::error!("{}", e); tokio::time::delay_for(Duration::from_secs(1)).await; } else { break; } }

Things seem a little better:

Shell session

$ cargo run -q 2020-07-07T16:01:30.501Z INFO trouble::server > Serving "/" 2020-07-07T16:01:30.502Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T16:01:31.509Z INFO trouble::server > Serving "/" 2020-07-07T16:01:31.510Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T16:01:32.517Z INFO trouble::server > Serving "/" 2020-07-07T16:01:32.518Z INFO trouble > Request successful: Hello from test server!

But you're not about to stop there. The CDN rep's diatribe has left you wounded. Your code? Bad? That's impossible. But just in case, you started a full code review.

And you realize there's a lot of code duplication. For every request you do, you have to use a for loop to retry.

You decide to make a re-usable facility to retry HTTP requests.

Rust code

// in `src/client.rs` use reqwest::{Error, IntoUrl, Method, Request, RequestBuilder, Response}; use std::time::Duration; pub struct Client { inner: reqwest::Client, } impl Client { pub fn new() -> Result<Self, Error> { let inner = reqwest::Client::builder() .user_agent("horo bot/1.0") .build()?; Ok(Self { inner }) } pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder { self.inner.request(method, url) } pub async fn execute(&self, req: Request) -> Result<Response, Error> { let mut tries: usize = 5; loop { let res = self .inner .execute(req.try_clone().unwrap()) .await .and_then(|r| r.error_for_status()); match res { Err(e) if tries > 1 => { tries -= 1; log::error!("{}", e); tokio::time::delay_for(Duration::from_secs(1)).await; } res => return res, } } } }

You're pleased with how easy it is to use:

Rust code

// in `src/main.rs` mod client; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { pretty_env_logger::formatted_timed_builder() .filter_level(log::LevelFilter::Info) .init(); server::start(); let client = client::Client::new()?; let req = client .request(reqwest::Method::GET, "http://localhost:1729") .build()?; let text = client.execute(req).await?.text().await?; log::info!("Request successful: {}", &text[..]); Ok(()) }

Sure, it panics if the request has a body. But bodies haven't been invented yet, so all is well in Proto-Laurasia.

As you re-read "Retrying HTTP requests - 10 best practices", you notice number 7 says to wait longer and longer between requests - and to add a random amount of time ("jitter", they call it), to avoid having many instances of your client synchronize their retry cycles and hammer the backend mercilessly.

"Well", you think to yourself (you don't possess any mouth-like appendages), that seems like something another organism has done before.

And sure enough, crates.io (which has always existed) has your back.

Shell session

$ cargo add backoff backoff-futures Updating 'https://github.com/rust-lang/crates.io-index' index Adding backoff v0.1.6 to dependencies Adding backoff-futures v0.3.0 to dependencies

It's great timing, too, as your CDN has decided to increase the number of "warm-up rounds" to 5.

Rust code

// in `src/server.rs` // in `fn run()` // in `for req in server.incoming_requests()` let res = if count < 5 {

You find that the backoff crates are rather easy to use, and you're happy:

Rust code

// in `src/client.rs` impl Client { // omitted: other methods pub async fn execute(&self, req: Request) -> Result<Response, Error> { let exec = || async { self.inner .execute(req.try_clone().unwrap()) .await .and_then(|r| r.error_for_status()) .map_err(backoff::Error::Transient) }; let mut backoff = backoff::ExponentialBackoff::default(); use backoff_futures::BackoffExt; exec.with_backoff(&mut backoff).await.map_err(|e| match e { backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e, }) } }

It even seems to work!

Shell session

$ cargo run -q 2020-07-07T16:58:18.982Z INFO trouble::server > Serving "/" 2020-07-07T16:58:19.356Z INFO trouble::server > Serving "/" 2020-07-07T16:58:19.767Z INFO trouble::server > Serving "/" 2020-07-07T16:58:21.425Z INFO trouble::server > Serving "/" 2020-07-07T16:58:22.338Z INFO trouble::server > Serving "/" 2020-07-07T16:58:25.834Z INFO trouble::server > Serving "/" 2020-07-07T16:58:25.835Z INFO trouble > Request successful: Hello from test server!

Cool bear's hot tip

15 minutes in and not a single Futures-related problem.

Nicely done.

A Futures-related problem

Time has passed - it's time... for the Mesoproterozoic era.

It comes with its share of code churn. What used to be a standalone executable must now become a shared library, so it can be loaded as a "native add-on" for an Electron app.

The #[tokio::main] attribute can no longer be used - since there is no longer a main function. You have to manage the Runtime manually. And all the library calls are synchronous, so you have to spawn tasks onto the runtime and deal with the outcome later.

Rust code

// in `src/main.rs` use std::error::Error; use tokio::runtime::Runtime; mod client; mod server; fn main() -> Result<(), Box<dyn Error>> { pretty_env_logger::formatted_timed_builder() .filter_level(log::LevelFilter::Info) .init(); server::start(); let mut runtime = Runtime::new().unwrap(); let task = || async { let client = client::Client::new()?; let req = client .request(reqwest::Method::GET, "http://localhost:1729") .build()?; let text = client.execute(req).await?.text().await?; log::info!("Request successful: {}", &text[..]); let res: Result<_, Box<dyn Error>> = Ok(()); res }; let join_handle = runtime.spawn((|| async move { match task().await { Ok(_) => {} Err(e) => { log::error!("Something went wrong: {}", e); } } })()); runtime.block_on(join_handle).unwrap(); Ok(()) }

Cool bear's hot tip

There's uhhh.. there's still a main function.

Yeah cool bear, I know. There's still a main function. But do you really want me to go into details of exposing our functionality as a native node.js addon now?

Cool bear's hot tip

Ah. Maybe later.

Exactly.

The important thing is that it compiles and runs just fine, and as you can see, it..

Shell session

$ cargo run -q error: future cannot be sent between threads safely --> src/main.rs:29:31 | 29 | let join_handle = runtime.spawn((|| async { | ^^^^^ future created by async block is not `Send` | = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>` note: future is not `Send` as it awaits another future which is not `Send` --> src/client.rs:33:9 | 33 | exec.with_backoff(&mut backoff).await.map_err(|e| match e { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>>>`, which is not `Send`

...doesn't?

But it just worked. And you haven't touched that part of the code.

What happened?

rustc says, " Send is not implemented for this Future ". And that's because, this Future awaits another future , which itself is not Send .

What's the std::marker::Send trait for again?

Types that can be transferred across thread boundaries.

This trait is automatically implemented when the compiler determines it's appropriate.

An example of a non- Send type is the reference-counting pointer rc::Rc . If two threads attempt to clone Rc s that point to the same reference-counted value, they might try to update the reference count at the same time, which is undefined behavior because Rc doesn't use atomic operations. Its cousin sync::Arc does use atomic operations (incurring some overhead) and thus is Send .

See the Nomicon for more details.

Ah, that makes sense. I think.

We're using Runtime::spawn to "poll this future in the background", it would make sense that, since we've enabled the threaded executor (via the full feature), it would be sent from the main thread to another thread.

Cool bear's hot tip

Even though you end up using Runtime::block_on to wait on it from the same thread?

Yeah! Even then, it might run in a different thread. We need to know that we can send it to any thread we want, and that's what Send is about.

But what about our Future isn't Send ?

Let's try to see what the concrete type of our Future is. A cool trick to find the type of something is to try to assign it to a variable of type () :

Rust code

// in `src/main.rs` let f: () = task();

Shell session

$ cargo check -q error[E0308]: mismatched types --> src/main.rs:29:17 | 15 | let task = || async { | _________________________- 16 | | let client = client::Client::new()?; 17 | | 18 | | let req = client ... | 26 | | res 27 | | }; | |_____- the found generator 28 | 29 | let f: () = task(); | -- ^^^^^^ expected `()`, found opaque type | | | expected due to this | ::: /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/future/mod.rs:48:43 | 48 | pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return> | ------------------------------- the found opaque type | = note: expected unit type `()` found opaque type `impl std::future::Future`

Ah. An opaque type. Well that trick doesn't work here, but it's a cool one anyway. Keep it in mind for synchronous code.

Let's try another trick:

Rust code

fn take_f<F, O>(f: F) where F: std::future::Future<Output = O> + Send, { } take_f(task());

Shell session

$ cargo check -q error: future cannot be sent between threads safely --> src/main.rs:35:5 | 29 | fn take_f<F, O>(f: F) | ------ required by a bound in this 30 | where 31 | F: std::future::Future<Output = O> + Send, | ---- required by this bound in `main::take_f` ... 35 | take_f(task()); | ^^^^^^ future created by async block is not `Send` | = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = std::result::Result<reqwest::async_ impl::response::Response, backoff::error::Error<reqwest::error::Error>>>` note: future is not `Send` as it awaits another future which is not `Send` --> src/client.rs:33:9 | 33 | exec.with_backoff(&mut backoff).await.map_err(|e| match e { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>>>`, which is not `Send`

Variations on a theme, huh. But at least we can now climb up the chain, and see where the lack of Send really comes from.

For example, we could go in client.rs and mess with execute :

Rust code

// in `src/client.rs` impl Client { pub async fn execute(&self, req: Request) -> Result<Response, Error> { let exec = || async { self.inner .execute(req.try_clone().unwrap()) .await .and_then(|r| r.error_for_status()) .map_err(backoff::Error::Transient) }; // new: fn take_f<F, O>(f: F) where F: std::future::Future<Output = O> + Send, { } take_f(exec()); let mut backoff = backoff::ExponentialBackoff::default(); use backoff_futures::BackoffExt; exec.with_backoff(&mut backoff).await.map_err(|e| match e { backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e, }) } }

This compiles just fine. So the future returned by exec() is Send . Which means Result<_, _> is Send , which means reqwest::Response and reqwest::Error are both Send .

Let's move down the chain a bit. We'll improve our checking function so that it returns the Future as-is:

Rust code

// in `src/client.rs` impl Client { pub async fn execute(&self, req: Request) -> Result<Response, Error> { let exec = || async { self.inner .execute(req.try_clone().unwrap()) .await .and_then(|r| r.error_for_status()) .map_err(backoff::Error::Transient) }; let mut backoff = backoff::ExponentialBackoff::default(); use backoff_futures::BackoffExt; let f = async { exec.with_backoff(&mut backoff).await.map_err(|e| match e { backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e, }) }; fn check<F, O>(f: F) -> F where F: std::future::Future<Output = O> + Send, { f } check(f).await } }

Shell session

$ cargo check -q error: future cannot be sent between threads safely --> src/client.rs:45:9 | 39 | fn check<F, O>(f: F) -> F | ----- required by a bound in this 40 | where 41 | F: std::future::Future<Output = O> + Send, | ---- required by this bound in `client::Client::execute::{{closure}}#0::check` ... 45 | check(f).await | ^^^^^ future created by async block is not `Send`

AhAH! So backoff-futures is the problem.

Shell session

$ mkdir vendor $ git clone https://github.com/jakubadamw/backoff-futures vendor/backoff-futures Cloning into 'vendor/backoff-futures'... (etc.) $ git checkout 0.3.0 Note: switching to '0.3.0'. You are in 'detached HEAD' state. You can look around, make experimental (etc.)

Take a look around, git say. Alright, let's use ripgrep for that:

Shell session

$ rg -A 1 'Send' src/lib.rs 91:#[async_trait::async_trait(?Send)] 92-pub trait BackoffExt<T, E, Fut, F> { -- 123:#[async_trait::async_trait(?Send)] 124-impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F

Those are the only two mentions of Send in the entire codebase.

You're starting to doubt that codebase ever worked, so, you decide to run the tests:

Shell session

$ cargo t -q running 5 tests ..... test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out running 1 test test src/lib.rs - (line 13) ... ok test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

Nope, everything's fine.

Alright, let's look at that async_trait then:

Rust code

// in `vendor/backoff-futures/src/lib.rs` #[async_trait::async_trait(?Send)] pub trait BackoffExt<T, E, Fut, F> { /// Returns a future that, when polled, will first ask `self` for a new future (with an output /// type `Result<T, backoff::Error<_>>` to produce the expected result. /// /// If the underlying future is ready with an `Err` value, the nature of the error /// (permanent/transient) will determine whether polling the future will employ the provided /// `backoff` strategy and will result in the work being retried. /// /// Specifically, [`backoff::Error::Permanent`] errors will be returned immediately. /// [`backoff::Error::Transient`] errors will, depending on the particular [`backoff::backoff::Backoff`], /// result in a retry attempt, most likely with a delay. /// /// If the underlying future is ready with an [`std::result::Result::Ok`] value, it will be returned immediately. async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> where B: Backoff, T: 'async_trait, E: 'async_trait, Fut: 'async_trait; /// Same as [`BackoffExt::with_backoff`] but takes an extra `notify` closure that will be called every time /// a new backoff is employed on transient errors. The closure takes the new delay duration as an argument. async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>> where B: Backoff, N: FnMut(&Error<E>, Duration), T: 'async_trait, E: 'async_trait, Fut: 'async_trait; }

Wait. An async fn in a trait? I thought that wasn't stable yet?

Well... there's a crate for that .

And in that crate's documentation in the Non-threadsafe futures section, it says:

Not all async traits need futures that are dyn Future + Send . To avoid having Send and Sync bounds placed on the async trait methods, invoke the async trait macro as #[async_trait(?Send)] on both the trait and the impl blocks.

AhAH! So async-trait transforms async fn s into regular fn s, which by default return Send futures, but if you specify ?Send , they're not Send .

You take a moment to collect your thoughts. Things escalated quickly, you weren't prepared for this. But heroes rise to the occasion - even if, in that scenario, "rising" means "maintaining your own fork".

So you decide to get your beads dirty, and fix it:

Rust code

// in `vendor/backoff-futures/src/lib.rs` #[async_trait::async_trait] pub trait BackoffExt<T, E, Fut, F> { // etc. } #[async_trait::async_trait] impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F where F: FnMut() -> Fut, Fut: Future<Output = Result<T, backoff::Error<E>>> { // etc. }

Shell session

$ # in vendor/backoff-futures $ cargo check -q cargo check -q error[E0277]: `F` cannot be sent between threads safely --> src/lib.rs:135:5 | 129 | async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> | ------------------- within this `impl std::future::Future` ... 135 | / { 136 | | let backoff_struct = BackoffFutureBuilder { backoff, f: self }; 137 | | backoff_struct.fut(|_, _| {}).await 138 | | } | |_____^ `F` cannot be sent between threads safely | = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `F` = note: required because it appears within the type `[static generator@src/lib.rs:135:5: 138:6 _self:F, backoff:&mut B for<'r , 's, 't0, 't1, 't2, 't3> {std::future::ResumeTy, F, &'r mut B, BackoffFutureBuilder<'s, B, F, Fut, T, E>, [closure@src/lib.rs:13 7:28: 137:37], impl std::future::Future, ()}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/lib.rs:135: 5: 138:6 _self:F, backoff:&mut B for<'r, 's, 't0, 't1, 't2, 't3> {std::future::ResumeTy, F, &'r mut B, BackoffFutureBuilder<'s, B , F, Fut, T, E>, [closure@src/lib.rs:137:28: 137:37], impl std::future::Future, ()}]>` = note: required because it appears within the type `impl std::future::Future` = note: required because it appears within the type `impl std::future::Future` = note: required for the cast to the object type `dyn std::future::Future<Output = std::result::Result<T, backoff::error::Err or<E>>> + std::marker::Send` help: consider further restricting this bound | 126 | F: FnMut() -> Fut + std::marker::Send, | ^^^^^^^^^^^^^^^^^^^ (omitted: 10 more errors)

Okay, well, this should be easy. You just need to add a bunch of Send bounds, right?

Rust code

// in `vendor/backoff-futures/src/lib.rs` #[async_trait::async_trait] pub trait BackoffExt<T, E, Fut, F> { async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> where B: Backoff + Send, // new! T: 'async_trait, E: 'async_trait, Fut: 'async_trait; /// Same as [`BackoffExt::with_backoff`] but takes an extra `notify` closure that will be called every time /// a new backoff is employed on transient errors. The closure takes the new delay duration as an argument. async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>> where B: Backoff + Send, // new! N: FnMut(&Error<E>, Duration) + Send, // new! T: 'async_trait, E: 'async_trait, Fut: 'async_trait; } #[async_trait::async_trait] impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F where F: (FnMut() -> Fut) + Send, // new T: Send, // new E: Send, // new Fut: Future<Output = Result<T, backoff::Error<E>>> + Send, // new { async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> where B: Backoff + Send, // new T: 'async_trait, E: 'async_trait, Fut: 'async_trait, { let backoff_struct = BackoffFutureBuilder { backoff, f: self }; backoff_struct.fut(|_, _| {}).await } async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>> where B: Backoff + Send, // new N: FnMut(&Error<E>, Duration) + Send, // new T: 'async_trait, E: 'async_trait, Fut: 'async_trait, { let backoff_struct = BackoffFutureBuilder { backoff, f: self }; backoff_struct.fut(notify).await } }

Cool bear's hot tip

One, two, three... ten! Ten Send bounds.

Yeah, that should do it:

sh

$ # still in vendor/backoff-futures $ cargo t -q running 5 tests ..... test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out running 1 test test src/lib.rs - (line 13) ... ok test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

Great! Before publishing your fork on crates.io, or thinking about upstreaming it, you decide to check that those changes actually solve your problem, though.

TOML markup

# in `Cargo.toml` (the root one) [patch.crates-io] backoff-futures = { path = "vendor/backoff-futures" }

Shell session

$ cargo run -q 2020-07-07T17:57:36.378Z INFO trouble::server > Serving "/" 2020-07-07T17:57:37.080Z INFO trouble::server > Serving "/" 2020-07-07T17:57:38.123Z INFO trouble::server > Serving "/" 2020-07-07T17:57:39.386Z INFO trouble::server > Serving "/" 2020-07-07T17:57:40.695Z INFO trouble::server > Serving "/" 2020-07-07T17:57:43.523Z INFO trouble::server > Serving "/" 2020-07-07T17:57:43.524Z INFO trouble > Request successful: Hello from test server!

Hooray! Wall of text: vanquished. Service: production-ready. Almost.

Right after you figure out which errors should be transient and which should be permanent .

Nevertheless, your codebase is in much better shape now. It's sure to last well into the Mesozoic era.

Closing words

I hope you enjoyed this little adventure in finding and figuring out a Rust futures issue. It's not always that colorful - but sometimes, it is!

And when it is, there's always something that can be done. Walls of rustc errors are not necessarily dead-ends. And everyone involved is already aware of the current usability issues, and working towards a smoother experience in general.

It might take some time . But it's in the works.

Cool bear's hot tip

GAT s or nothing!

We'll see, cool bear. We'll see.


以上所述就是小编给大家介绍的《Getting in and out of trouble with Rust futures》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Kafka权威指南

Kafka权威指南

Neha Narkhede、Gwen Shapira、Todd Palino / 薛命灯 / 人民邮电出版社 / 2017-12-26 / 69.00元

每个应用程序都会产生数据,包括日志消息、度量指标、用户活动记录、响应消息等。如何移动数据,几乎变得与数据本身一样重要。如果你是架构师、开发者或者产品工程师,同时也是Apache Kafka新手,那么这本实践指南将会帮助你成为流式平台上处理实时数据的专家。 本书由出身于LinkedIn的Kafka核心作者和一线技术人员共同执笔,详细介绍了如何部署Kafka集群、开发可靠的基于事件驱动的微服务,......一起来看看 《Kafka权威指南》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具