A practical guide to async in Rust


Programming languages have different methods of representing asynchronous operations. The way Rust handles concurrency should be familiar if you’ve ever used async/await in JavaScript. The keywords are the same and the fundamental model is similar, except that in Rust, deferred computations are called futures instead of promises. What is not so familiar is that you need to pick a runtime to actually run your asynchronous code.

Rust targets everything from bare-metal, embedded devices to programs running on advanced operating systems and, like C++, focuses on zero-cost abstractions. This impacts what is and isn’t included in the standard library.

Once you have the know-how, you’ll find it’s really not difficult to get started with async in Rust. If you’re writing an asynchronous program in Rust for the first time or just need to use an asynchronous library and don’t know where to start, this guide is for you. I’ll try to get you going as quickly as possible while introducing you to all the essentials you should know.

Essentials

An async application should pull in at least two crates from Rust’s ecosystem:

  1. futures, an official Rust crate that lives in the rust-lang repository
  2. A runtime of your choosing, such as Tokio, async_std, smol, etc.

Some people don’t want to pull in more dependencies than they need to, but these are as essential as the chrono or log crates. The only difference is that these focus on async instead.

We’ll use tokio for the purpose of this tutorial. You should get to know at least one runtime and focus on that first. You can check out other runtimes later. You’ll likely notice that they have a lot in common in terms of functionality, even if the API or implementations may differ.

To provide the best experience when working with async in Rust, you should enable some features. The dependencies section of your Cargo.toml should look like this:

[dependencies]
futures = { version = "0.3.*" }
tokio = { version = "0.2.*", features = ["full"] }

Your main.rs should look as follows.

use futures::prelude::*;
use tokio::prelude::*;

fn main() {
    todo!();
}

Runtimes

‌Contrary to what you might be used to with other languages, Rust doesn’t have a built-in runtime. We won’t discuss the pros and cons of that here, but you’ll need to make a choice and pull that in as a dependency.

Some libraries require you to use a specific runtime because they rely on runtime internals to provide a solid API for you to use or wrap their own API around an existing runtime. One example is the actix_web web framework, which wraps its own API around tokio .

Most of the time, you can choose any runtime you want. But no matter which runtime you choose, there are three basic operations you should figure out how to do before you start coding:

  1. Starting the runtime
  2. Spawning a future onto the runtime
  3. Spawning blocking or CPU-intensive tasks

You can complete most tasks if you know these basic operations. Let’s walk through all three using Tokio as an example.

1. Starting the runtime

You can explicitly instantiate the runtime and spawn a future onto it. The future you spawn will be the main entry point for your program, so think of it like an asynchronous main function.

async fn app() {
    todo!()
}

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    let future = app();
    rt.block_on(future);
}

You could also use the shorter version, which basically does the same thing.

#[tokio::main]
async fn main() {

}

2. Spawning a future onto the runtime

This comes in handy when you want to run futures concurrently (i.e., tasks that are progressing simultaneously).

use tokio::task;

async fn our_async_program() {
    todo!();
}

async fn app() {
    let concurrent_future = task::spawn(our_async_program());
    todo!()
}

3. Spawning blocking or CPU-intensive tasks

This is a common problem when writing async code in general. If you want to take advantage of having a runtime that runs your code concurrently, you should avoid blocking or running CPU-intensive code in Futures themselves. Most of the code you write in async Rust will actually be executed in a Future, which is important to be aware of.

Most runtimes provide a way to offload this work to a different thread, which helps you avoid blocking the thread that is actually driving your futures to completion. In tokio , you can do this via task::spawn_blocking .

Using our example, we can do the following.

use tokio::task;

fn fib_cpu_intensive(n: u32) -> u32 {
    match n {
        0 => 0,
        1 => 1,
        n => fib_cpu_intensive(n - 1) + fib_cpu_intensive(n - 2),
    }
}

async fn app() {
    let threadpool_future = task::spawn_blocking(|| fib_cpu_intensive(30));
    todo!()
}

Each runtime has a slightly different API to accomplish these tasks, but they all support them. If you know what to look for, you’ll have an easier time getting started.

An async project starter template

At this point, we can work with async in Rust almost as easily as we write normal synchronous code, but let’s venture a little further and cover some things that might come in handy later on.

I’ll provide a template you can use to start applications where you know you’ll need to write async code. I like to instantiate the runtime explicitly, which is what we’ll do in the template below.

use futures::prelude::*;
use tokio::prelude::*;
use tokio::task;
use log::*;

// Just a generic Result type to ease error handling for us. Errors in multithreaded
// async contexts need some extra restrictions
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn app() -> Result<()> {
    // I treat this as the `main` function of the async part of our program. 
    todo!()
}

fn main() {
    env_logger::init();
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    match rt.block_on(app()) {
        Ok(_) => info!("Done"),
        Err(e) => error!("An error occurred: {}", e),
    };
}

I’ve pulled in a few more crates, including:

tokio
log
env_logger

Our Cargo.toml now looks like this:

[dependencies]
futures = { version = "0.3.*"}
tokio = { version = "0.2.*", features = ["full"] }
log = "0.4.*"
env_logger = "0.7.*"

Remember that env_logger relies on the RUST_LOG environment variable to determine the log level.

Most of my async application projects start with a main.rs and a Cargo.toml like the ones presented above. I can add better error handling or logging if necessary as the project evolves. A popular crate for handling errors in applications is anyhow. async-log is useful for improving logging in async contexts.

Throughout this tutorial, we’ll use this basic template for all our code. However, you might notice that I’ve adapted the log output slightly to better suit what I wanted to show. If you want to follow along, you should change your logging initialization in main.rs to the following.

use std::io::Write;

let start = std::time::Instant::now();
env_logger::Builder::from_default_env()
    .format(move |buf, rec| {
        let t = start.elapsed().as_secs_f32();
        writeln!(buf, "{:.03} [{}] - {}", t, rec.level(), rec.args())
    })
    .init();

Async functions in Rust

Async functions in Rust differ somewhat from what you’re used to. When you learned Rust, you probably noticed how precise it is about the types of a function’s arguments and the type the function returns.

Async functions differ in one important way: all your return types are “wrapped” into a Future .

You might read the documentation about Futures in Rust and think your async function needs to look like this:

async fn our_async_program() -> impl Future<Output = Result<String>> {
    let res = future::ok("Hello world".to_string()).await?;
    Ok(res)
}

This is wrong! If you’re doing this, you’re overthinking it. An async function already wraps the return type, so you can write functions the way you’re used to.

This is what you actually want:

async fn our_async_program() -> Result<String> {
    let res = future::ok("Hello world".to_string()).await?;
    Ok(res)
}

future::ok is one of the convenience methods we get from the futures crate. It wraps a value in a future that is immediately ready.

This might seem a bit strange since Rust is usually extremely rigorous when it comes to declaring the correct types, but it’s actually a huge ergonomic boost because it automatically wraps the return types from our async functions.

You’ll often see examples using async blocks, such as async { ... } . These are similar to async functions in that they return a special kind of future that wraps whatever we return from the block. One drawback with these blocks is that you’ll have to jump through some hoops to return errors from them via ? . The return types can be difficult to reason about, which can cause some unneeded confusion when you’re starting out writing async Rust.

My suggestion is to use async functions if you can, especially if you intend to return anything from the future — at least until you’re comfortable with the different return types and how async in Rust works.
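For completeness, here is one hedged sketch of those hoops: to use `?` inside an async block, the compiler needs the block’s error type spelled out somewhere, for example with a turbofish on the final `Ok` (the boxed error type here matches the alias from our template):

```rust
async fn app() {
    let fut = async {
        // `?` converts the `ParseIntError` into the boxed error type.
        let n: u32 = "7".parse()?;
        // Without an annotation somewhere, the compiler can't infer
        // the error type of the block; the turbofish pins it down.
        Ok::<u32, Box<dyn std::error::Error + Send + Sync>>(n)
    };
    let res = fut.await;
    assert_eq!(res.unwrap(), 7);
}
```

An async function with a plain `-> Result<u32>` signature avoids this annotation entirely, which is exactly why it’s the easier starting point.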

Making a web request

Futures in Rust are lazy. By default, they won’t do anything before they’re polled the first time. The future gets polled when you await it.

For example, if you call a function that returns a future at the start of your program but don’t await it until the end, the actual request won’t be made until you reach the point where you await it.

Let’s put what we’ve learned so far into practice.

Reqwest is a popular client library for creating web requests. We’ll use it together with the Slowwly endpoint, which lets us define a delay for the server response. That gives us a little more determinism in our concurrency, making it easier to reason about.

Let’s add reqwest to our Cargo.toml by adding reqwest = "0.10.*" to the [dependencies] section.

Create a few requests and see what happens.

fn slowwly(delay_ms: u32) -> reqwest::Url {
    let url = format!(
        "http://slowwly.robertomurray.co.uk/delay/{}/url/http://www.google.co.uk",
        delay_ms,
    );
    reqwest::Url::parse(&url).unwrap()
}

async fn app() -> Result<()> {
    info!("Starting program!");
    let _resp1 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 1");
    let _resp2 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 2");
    Ok(())
}

Running this gives us the following output.

1.264 [INFO] - Got response 1
2.467 [INFO] - Got response 2
2.468 [INFO] - Done

The time is in seconds, with millisecond precision. At 1.264, we got the first response from our endpoint (remember, we asked for a delay of one second on the first request). Roughly one second later, at 2.467, we got the second response. The whole program took 2.468 seconds to run.

So, our program is working, but this is not really concurrent, is it? Honestly, it’s not much better than a complicated synchronous program.

Let’s actually take advantage of our async runtime and run the requests concurrently.

async fn request(n: usize) -> Result<()> {
    reqwest::get(slowwly(1000)).await?;
    info!("Got response {}", n);
    Ok(())
}

async fn app() -> Result<()> {
    let resp1 = task::spawn(request(1));
    let resp2 = task::spawn(request(2));

    let _ = resp1.await??;
    let _ = resp2.await??;

    Ok(())
}

We refactored the request out to a separate function for two reasons:

  1. We want our logging to happen after we get the result of each request, so we need to wrap both the request and the logging in a task that we spawn onto the runtime and await
  2. The alternative would be to use an async { } block, but although it’s possible to specify a return type for our Result<()> , it’s pretty awkward, so we’ll avoid that for the purpose of this tutorial

If we run our code, we should get this:

1.247 [INFO] - Got response 2
1.256 [INFO] - Got response 1
1.257 [INFO] - Done

That looks better. Our second request finishes at 1.247 and our first at 1.256. The whole program takes 1.257 seconds, which is less than half the time it took in the first example.

Using spawn enables us to run our requests concurrently. Since Tokio defaults to a multithreaded runtime, tasks spawned this way can also run in parallel on different cores.

CPU-intensive tasks

Now that we got our program to run concurrently, we can combine some CPU-intensive tasks with some I/O-bound tasks and create a more complex scenario.

Let’s expand our example slightly by making 10 requests and doing some analysis of each response as it arrives. We’re super excited to see the ratio of ones versus zeros in the bytes we get from each response, so we’ll return a count of ones and zeros and report the ratio at the end.

use futures::future::join_all;

// Now we want to both fetch some data and do some CPU intensive analysis on it
async fn get_and_analyze(n: usize) -> Result<(u64, u64)> {
    let response: reqwest::Response = reqwest::get(slowwly(1000)).await?;
    info!("Dataset {}", n);

    // Get the body of the response
    let txt = response.text().await?;

    // We send our analysis work to a thread where there is no runtime running
    // so we don't block the runtime by analyzing the data
    let res = task::spawn_blocking(move || analyze(&txt)).await?;
    info!("Processed {}", n);
    Ok(res)
}

// Counting the number of ones and zeros in the bytes we get.
fn analyze(txt: &str) -> (u64, u64) {
    let txt = txt.as_bytes();
    // Let's spend as much time as we can and count them in two passes
    let ones = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_ones() as u64);
    let zeros = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_zeros() as u64);
    (ones, zeros)
}

async fn app() -> Result<()> {

    // This is new. We can collect futures in a collection. Nice to know!
    let mut futures = vec![];

    for i in 1..=10 {
        let fut = task::spawn(get_and_analyze(i));
        futures.push(fut);
    }

    let results = join_all(futures).await;

    let mut total_ones = 0;
    let mut total_zeros = 0;

    // Returning errors using `?` in iterators can be a bit difficult. Using a
    // simple for loop to inspect and work with our results can often be more
    // ergonomic
    for result in results {
        // `task::spawn` returns a `JoinHandle`; awaiting it gives a `Result` we need to unwrap first
        let ones_res: Result<(u64, u64)> = result?;
        let (ones, zeros) = ones_res?;
        total_ones += ones;
        total_zeros += zeros;
    }

    info!("Ratio of ones/zeros: {:.02}", total_ones as f64 / total_zeros as f64);
    Ok(())
}

A few things to note:

  • The futures crate has many convenient tools, including join_all , which treats a collection of futures as a single future and drives them all to completion, and the FuturesUnordered API
  • We can collect futures in a normal Vec
  • There can be a lot of error unwrapping when writing async code. This is normal, but it can be a bit confusing at times. A tool such as rust-analyzer can help you keep track of what errors are returned, and an error handling crate such as anyhow can also help here

Let’s take a look at what our program outputs.

1.270 [INFO] - Dataset 7
1.275 [INFO] - Dataset 3
1.285 [INFO] - Dataset 2
1.285 [INFO] - Dataset 4
1.291 [INFO] - Dataset 9
1.297 [INFO] - Dataset 1
1.301 [INFO] - Dataset 5
1.308 [INFO] - Dataset 6
1.312 [INFO] - Dataset 8
1.322 [INFO] - Dataset 10
1.374 [INFO] - Processed 7
1.377 [INFO] - Processed 3
1.378 [INFO] - Processed 4
1.378 [INFO] - Processed 2
1.380 [INFO] - Processed 9
1.384 [INFO] - Processed 1
1.385 [INFO] - Processed 5
1.391 [INFO] - Processed 8
1.391 [INFO] - Processed 6
1.397 [INFO] - Processed 10
1.397 [INFO] - Ratio of ones/zeros: 0.95
1.397 [INFO] - Done

Since we fire off all our requests immediately and each one takes a second to return, all our responses come back almost simultaneously. Each response is then sent for analysis on a thread pool.

We can see that datasets are not necessarily finished processing in the order in which they come in since they are processed on separate CPU cores.

To spawn or not to spawn?

Since we should spawn futures onto the runtime and spawn CPU-bound and blocking tasks onto another thread pool, what can we write directly in our async functions?

Normally, you can write most of your code without worrying too much about that, but blocking and CPU-intensive tasks should make you stop and consider whether you should refactor that part of your code so it can be spawned on the thread pool designed for handling these.

If you encounter a situation where you might need one of the following modules, you should check whether your runtime has an async alternative for the task you want to perform.

std::sync
std::thread
std::fs
std::net

If your runtime doesn’t have an equivalent, you can use spawn_blocking and do the operation in a thread pool like you would do with CPU-intensive tasks and await the result.

In general, functions that call into your OS and might result in the OS parking the calling thread are especially harmful to concurrency, since you’ll park the executor as well.

Using thread::sleep is a prime example of a function you should avoid in an async context. It’s tempting to use sleep to delay an operation, but it’s not a good idea because the OS will park the calling thread for the whole duration, stalling every other task running on that executor thread as well. Most runtimes provide a way to delay or sleep a task without blocking the executor.

For CPU-bound tasks, the lines are much blurrier. In general, I would encourage you to not be paranoid about this. Just remember it’s easier to refactor back if you decide to reduce the number of calls to spawn_blocking than the other way around.

When in doubt, spawn.

By now you should be prepared to write async Rust, and I hope you’ll find it easier to get started on your next async project.

