Programming languages have different methods of representing asynchronous operations. The way Rust handles concurrency should be familiar if you’ve ever used async/await in JavaScript. The keywords are the same and the fundamental model is similar, except that in Rust, deferred computations are called futures instead of promises. What is not so familiar is that you need to pick a runtime to actually run your asynchronous code.
Rust targets everything from bare-metal, embedded devices to programs running on advanced operating systems and, like C++, focuses on zero-cost abstractions. This impacts what is and isn’t included in the standard library.
Once you have the know-how, you’ll find it’s really not difficult to get started with async in Rust. If you’re writing an asynchronous program in Rust for the first time or just need to use an asynchronous library and don’t know where to start, this guide is for you. I’ll try to get you going as quickly as possible while introducing you to all the essentials you should know.
Essentials
An async application should pull in at least two crates from Rust’s ecosystem:

- `futures`, an official Rust crate that lives in the `rust-lang` repository
- A runtime of your choosing, such as Tokio, async_std, smol, etc.

Some people don’t want to pull in more dependencies than they need to, but these are as essential as the `chrono` or `log` crates. The only difference is that these focus on async instead.
We’ll use `tokio` for the purpose of this tutorial. You should get to know at least one runtime and focus on that first. You can check out other runtimes later. You’ll likely notice that they have a lot in common in terms of functionality, even if the API or implementations may differ.
To provide the best experience when working with async in Rust, you should enable some features. The `dependencies` section of your `Cargo.toml` should look like this:

```toml
[dependencies]
futures = { version = "0.3.*" }
tokio = { version = "0.2.*", features = ["full"] }
```
Your `main.rs` should look as follows:

```rust
use futures::prelude::*;
use tokio::prelude::*;

fn main() {
    todo!();
}
```
Runtimes
Contrary to what you might be used to with other languages, Rust doesn’t have a built-in runtime. We won’t discuss the pros and cons of that here, but you’ll need to make a choice and pull that in as a dependency.
Some libraries require you to use a specific runtime because they rely on runtime internals to provide a solid API for you to use, or they wrap their own API around an existing runtime. One example is the actix_web web framework, which wraps its own API around `tokio`.
Most of the time, you can choose any runtime you want. But no matter which runtime you choose, there are three basic operations you should figure out how to do before you start coding:
1. Starting the runtime
2. Spawning a `Future` on the runtime
3. Spawning blocking or CPU-intensive tasks

You can complete most tasks if you know these basic operations. Let’s walk through all three using Tokio as an example.
1. Starting the runtime
You can explicitly instantiate the runtime and spawn a future onto it. The future you spawn will be the main entry point for your program, so think of it like an asynchronous `main` function.
```rust
async fn app() {
    todo!()
}

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    let future = app();
    rt.block_on(future);
}
```
You could also use the shorter version, which basically does the same thing.
```rust
#[tokio::main]
async fn main() {}
```
2. Spawning a `Future` on the runtime
This comes in handy when you want to run futures concurrently (i.e., tasks that are progressing simultaneously).
```rust
use tokio::task;

async fn our_async_program() {
    todo!();
}

async fn app() {
    let concurrent_future = task::spawn(our_async_program());
    todo!()
}
```
3. Spawning blocking or CPU-intensive tasks
This is a common problem when writing async code in general. If you want to take advantage of having a runtime that runs your code concurrently, you should avoid blocking or running CPU-intensive code in futures themselves. Most of the code you write in async Rust will actually be executed in a `Future`, which is important to be aware of.
Most runtimes provide a way to offload this work to a different thread, which helps you avoid blocking the thread that is actually driving your futures to completion. In `tokio`, you can do this via `task::spawn_blocking`.
Using our example, we can do the following.
```rust
use tokio::task;

fn fib_cpu_intensive(n: u32) -> u32 {
    match n {
        0 => 0,
        1 => 1,
        n => fib_cpu_intensive(n - 1) + fib_cpu_intensive(n - 2),
    }
}

async fn app() {
    let threadpool_future = task::spawn_blocking(|| fib_cpu_intensive(30));
    todo!()
}
```
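To get a feel for why this function shouldn’t run on the executor thread, here is a std-only sketch (separate from the tokio example above) that times the naive recursion synchronously. The timing output is machine-dependent.

```rust
use std::time::Instant;

// Same naive Fibonacci as above; deliberately expensive.
fn fib_cpu_intensive(n: u32) -> u32 {
    match n {
        0 => 0,
        1 => 1,
        n => fib_cpu_intensive(n - 1) + fib_cpu_intensive(n - 2),
    }
}

fn main() {
    let start = Instant::now();
    // While this runs, nothing else can make progress on this thread --
    // which is exactly why we hand it to `spawn_blocking` in async code.
    let result = fib_cpu_intensive(30);
    println!("fib(30) = {} in {:?}", result, start.elapsed());
}
```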
Each runtime has a slightly different API to accomplish these tasks, but they all support them. If you know what to look for, you’ll have an easier time getting started.
An async project starter template
At this point, we can work with async in Rust with almost the same ease with which we write normal synchronous code, but let’s venture a little further and cover some things that might come in handy later on.
I’ll provide a template you can use to start applications where you know you’ll need to write async code. I like to instantiate the runtime explicitly, which is what we’ll do in the template below.
```rust
use futures::prelude::*;
use tokio::prelude::*;
use tokio::task;
use log::*;

// Just a generic Result type to ease error handling for us. Errors in
// multithreaded async contexts need some extra restrictions.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn app() -> Result<()> {
    // I treat this as the `main` function of the async part of our program.
    todo!()
}

fn main() {
    env_logger::init();
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    match rt.block_on(app()) {
        Ok(_) => info!("Done"),
        Err(e) => error!("An error occurred: {}", e),
    };
}
```
I’ve pulled in a few more crates, including:

- `tokio`
- `log`
- `env_logger`
Our `Cargo.toml` now looks like this:

```toml
[dependencies]
futures = { version = "0.3.*" }
tokio = { version = "0.2.*", features = ["full"] }
log = "0.4.*"
env_logger = "0.7.*"
```
Remember that `env_logger` relies on the `RUST_LOG` environment variable to determine the log level.
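For example, you might invoke the program like this (a minimal sketch; `myapp` stands in for your own crate name, which is an assumption here):

```shell
# Show info-level logs and above:
RUST_LOG=info cargo run

# Or scope the level to your own crate only:
RUST_LOG=myapp=debug cargo run
```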
Most of my async application projects start with a `main.rs` and a `Cargo.toml` like the ones presented above. I can add better error handling or logging if necessary as the project evolves. A popular crate for handling errors in applications is Anyhow, and async-log is useful for improving logging in async contexts.
Throughout this tutorial, we’ll use this basic template for all our code. However, you might notice that I’ve adapted the log output slightly to better suit what I wanted to show. If you want to follow along, you should change your logging initialization in `main.rs` to the following:
```rust
use std::io::Write;

let start = std::time::Instant::now();
env_logger::Builder::from_default_env()
    .format(move |buf, rec| {
        let t = start.elapsed().as_secs_f32();
        writeln!(buf, "{:.03} [{}] - {}", t, rec.level(), rec.args())
    })
    .init();
```
Async functions in Rust
Async functions in Rust differ somewhat from what you’re used to. When you learned Rust, you probably noticed how precise it is about what types a function’s arguments have and what type the function returns.
Async functions differ in one important way: all your return types are “wrapped” in a `Future`.
You might read the documentation about `Future`s in Rust and think your async function needs to look like this:
```rust
async fn our_async_program() -> impl Future<Output = Result<String>> {
    let res = future::ok("Hello world".to_string()).await?;
    Ok(res)
}
```
This is wrong! If you’re doing this, you’re overthinking it. An `async` function already wraps the return type, so you can write functions the way you’re used to.
This is what you actually want:
```rust
async fn our_async_program() -> Result<String> {
    let res = future::ok("Hello world".to_string()).await?;
    Ok(res)
}
```
`future::ok` is one of the convenience methods we get from the `futures` crate. It wraps a value in a future that returns `Ready` immediately.
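To demystify what “returns `Ready` immediately” means, here is a hand-rolled, std-only sketch of such a future. The `Ready` struct and `ready_value` helper are my own illustrative names, not part of the `futures` crate (recent standard library versions also ship `std::future::ready` for this):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A future that yields its value the first time it is polled -- the
// essence of what `future::ok` builds for us.
struct Ready<T>(Option<T>);

impl<T: Unpin> Future for Ready<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        Poll::Ready(self.get_mut().0.take().expect("polled after completion"))
    }
}

// Poll the future once by hand with a no-op waker, so no runtime is needed.
fn ready_value<T: Unpin>(v: T) -> T {
    fn raw() -> RawWaker {
        fn no_op(_: *const ()) {}
        fn clone(_: *const ()) -> RawWaker { raw() }
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    let waker = unsafe { Waker::from_raw(raw()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Ready(Some(v));
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(out) => out,
        Poll::Pending => unreachable!("Ready completes on the first poll"),
    }
}

fn main() {
    println!("{}", ready_value("Hello world".to_string()));
}
```

A single poll is enough here; that’s the whole point of an “immediately ready” future.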
This might seem a bit strange since Rust is usually extremely rigorous when it comes to declaring the correct types, but it’s actually a huge ergonomic boost because it automatically wraps the return types from our `async` functions.
You’ll often see examples using async blocks, such as `async { ... }`. These are similar to async functions in that they return a special kind of future that wraps whatever we return from the block. One drawback with these blocks is that you’ll have to jump through some hoops to return errors from them via `?`. The return types can be difficult to reason about, which can cause some unneeded confusion when you’re starting out writing async Rust.
My suggestion is to use async functions if you can, especially if you intend to return anything from the future — at least until you’re comfortable with the different return types and how async in Rust works.
Making a web request
Futures in Rust are lazy. By default, they won’t do anything before they’re polled the first time. The future gets polled when you `await` it.

For example, if you call a function that returns a future at the start of your program but don’t `await` it until the end of the program, the actual request will not be made until you reach the point where you await it.
Let’s put what we’ve learned so far into practice.
Reqwest is a popular client library for creating web requests. We’ll use it together with the Slowwly endpoint, which enables us to define a delay for the server response and gives us a little more determinism in our concurrency, making it easier to reason about.
Let’s add `reqwest` to our `Cargo.toml` by adding `reqwest = "0.10.*"` to the `[dependencies]` section.
Create a few requests and see what happens.
```rust
fn slowwly(delay_ms: u32) -> reqwest::Url {
    let url = format!(
        "http://slowwly.robertomurray.co.uk/delay/{}/url/http://www.google.co.uk",
        delay_ms,
    );
    reqwest::Url::parse(&url).unwrap()
}

async fn app() -> Result<()> {
    info!("Starting program!");
    let _resp1 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 1");
    let _resp2 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 2");
    Ok(())
}
```
Running this gives us the following output.
```
1.264 [INFO] - Got response 1
2.467 [INFO] - Got response 2
2.468 [INFO] - Done
```
The time is in seconds/milliseconds. At 1.264, we got the first response from our endpoint (remember, we asked for a delay of one second on the first request). Roughly one second later, at 2.467, we got the second response. The whole program took 2.468 seconds to run.
So, our program is working, but this is not really concurrent, is it? Honestly, it’s not much better than a complicated synchronous program.
Let’s actually take advantage of our async runtime and run the requests concurrently.
```rust
async fn request(n: usize) -> Result<()> {
    reqwest::get(slowwly(1000)).await?;
    info!("Got response {}", n);
    Ok(())
}

async fn app() -> Result<()> {
    let resp1 = task::spawn(request(1));
    let resp2 = task::spawn(request(2));

    let _ = resp1.await??;
    let _ = resp2.await??;

    Ok(())
}
```
At this point, we should refactor our request out to a separate function for two reasons:

- We want our logging after we get the results from our request, so we need to wrap both the request and the logging in a task that we spawn onto the runtime and await.
- The alternative would be to use an `async { }` block. But although it’s possible to specify a return type for our `Result<()>`, it’s pretty awkward, so we’ll avoid that for the purpose of this tutorial.
If we run our code, we should get this:
```
1.247 [INFO] - Got response 2
1.256 [INFO] - Got response 1
1.257 [INFO] - Done
```
That looks better. Our second request finishes at 1.247 and our first at 1.256. The whole program takes 1.257 seconds, which is less than half the time it took in the first example.
Using `spawn` enables us to run our requests concurrently. Since Tokio defaults to a multithreaded runtime, tasks spawned this way can also run in parallel on different cores.
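As a rough analogy using plain OS threads (a std-only sketch, not how tokio schedules tasks internally): two blocking waits started on separate threads overlap the same way our two spawned requests did.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Spawn two "requests" that each block for `ms` milliseconds on separate
// OS threads, and return how long both took in total.
fn run_parallel(ms: u64) -> Duration {
    let start = Instant::now();
    let h1 = thread::spawn(move || thread::sleep(Duration::from_millis(ms)));
    let h2 = thread::spawn(move || thread::sleep(Duration::from_millis(ms)));
    h1.join().unwrap();
    h2.join().unwrap();
    start.elapsed()
}

fn main() {
    // Both waits overlap, so the total is close to 100 ms rather than 200 ms,
    // just as our two spawned requests finished in ~1.26 s instead of ~2.5 s.
    println!("two parallel 100 ms waits took {:?}", run_parallel(100));
}
```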
CPU-intensive tasks
Now that we’ve gotten our program to run concurrently, we can combine some CPU-intensive tasks with some I/O-bound tasks and create a more complex scenario.
Let’s expand our example slightly by making 10 requests and doing some analysis of each response as we get them. We’re super excited to see the ratio of ones versus zeroes in the bytes we get from the response, so we’ll return a count for ones and zeros and report the ratio in the end.
```rust
use futures::future::join_all;

// Now we want to both fetch some data and do some CPU-intensive analysis on it.
async fn get_and_analyze(n: usize) -> Result<(u64, u64)> {
    let response: reqwest::Response = reqwest::get(slowwly(1000)).await?;
    info!("Dataset {}", n);

    // We get the body of the request.
    let txt = response.text().await?;

    // We send our analysis work to a thread where there is no runtime running,
    // so we don't block the runtime by analyzing the data.
    let res = task::spawn_blocking(move || analyze(&txt)).await?;
    info!("Processed {}", n);
    Ok(res)
}

// Counting the number of ones and zeros in the bytes we get.
fn analyze(txt: &str) -> (u64, u64) {
    let txt = txt.as_bytes();
    // Let's spend as much time as we can and count them in two passes.
    let ones = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_ones() as u64);
    let zeros = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_zeros() as u64);
    (ones, zeros)
}

async fn app() -> Result<()> {
    // This is new. We can collect futures in a collection. Nice to know!
    let mut futures = vec![];

    for i in 1..=10 {
        let fut = task::spawn(get_and_analyze(i));
        futures.push(fut);
    }

    let results = join_all(futures).await;

    let mut total_ones = 0;
    let mut total_zeros = 0;

    // Returning errors using `?` in iterators can be a bit difficult. Using a
    // simple for loop to inspect and work with our results can often be more
    // ergonomic.
    for result in results {
        // `task::spawn` gives us a `JoinHandle`; awaiting it yields a `Result`
        // we need to unwrap first.
        let ones_res: Result<(u64, u64)> = result?;
        let (ones, zeros) = ones_res?;
        total_ones += ones;
        total_zeros += zeros;
    }

    info!("Ratio of ones/zeros: {:.02}", total_ones as f64 / total_zeros as f64);
    Ok(())
}
```
A few things to note:
- The `futures` crate has many convenient tools, including `join_all`, which treats a collection of futures as a single future and drives them all to completion, and the `FuturesUnordered` API from the same crate.
- We can collect futures in a normal `Vec`.
- There can be a lot of error unwrapping when writing async code. This is normal, but it can be a bit confusing sometimes. A tool such as rust-analyzer can help you keep track of what errors are returned. An error-handling crate such as Anyhow can also help here.
Let’s take a look at what our program outputs.
```
1.270 [INFO] - Dataset 7
1.275 [INFO] - Dataset 3
1.285 [INFO] - Dataset 2
1.285 [INFO] - Dataset 4
1.291 [INFO] - Dataset 9
1.297 [INFO] - Dataset 1
1.301 [INFO] - Dataset 5
1.308 [INFO] - Dataset 6
1.312 [INFO] - Dataset 8
1.322 [INFO] - Dataset 10
1.374 [INFO] - Processed 7
1.377 [INFO] - Processed 3
1.378 [INFO] - Processed 4
1.378 [INFO] - Processed 2
1.380 [INFO] - Processed 9
1.384 [INFO] - Processed 1
1.385 [INFO] - Processed 5
1.391 [INFO] - Processed 8
1.391 [INFO] - Processed 6
1.397 [INFO] - Processed 10
1.397 [INFO] - Ratio of ones/zeros: 0.95
1.397 [INFO] - Done
```
Since we send off all our requests immediately and each one takes a second to return, all our responses come back almost simultaneously. Each response is then sent for analysis on a thread pool.
We can see that datasets are not necessarily finished processing in the order in which they come in since they are processed on separate CPU cores.
To spawn or not to spawn?
Since we should spawn both futures and CPU-bound or blocking tasks, what can we actually write in our async functions themselves?
Normally, you can write most of your code without worrying too much about that, but blocking and CPU-intensive tasks should make you stop and consider whether you should refactor that part of your code so it can be spawned on the thread pool designed for handling these.
If you encounter a situation where you might need one of the following modules, you should check whether your runtime has an async alternative for the task you want to perform:

- `std::sync`
- `std::thread`
- `std::fs`
- `std::net`
If your runtime doesn’t have an equivalent, you can use `spawn_blocking` and do the operation in a thread pool, like you would with CPU-intensive tasks, and await the result.
In general, functions that call into your OS and might result in the OS parking the thread you’re calling from are especially harmful to concurrency, since you’ll park the executor as well.
Using `thread::sleep` is a prime example of a function you should avoid in an async context for this exact reason. It’s tempting to use `sleep` to delay an operation, but it’s not a good idea because the OS will park the calling thread for the whole duration, thereby stalling all other tasks running on that thread as well. Most runtimes have a way to delay or sleep a task without blocking the executor.
For CPU-bound tasks, the lines are much blurrier. In general, I would encourage you not to be paranoid about this. Just remember that it’s easier to refactor back if you decide to reduce the number of calls to `spawn_blocking` than the other way around.
When in doubt, spawn.
By now you should be prepared to write async Rust, and I hope you’ll find it easier to get started on your next async project.