Understanding future and async/await in Rust

Future and async/await are the mechanism in Rust to simplify writing concurrent programs. But it seems to be hard to understand. In this article, I will introduce my intuitive understanding of Future and async/await in Rust.

Ideas

In our daily work, our tasks may contain not only the action part but also the waiting part. Sometimes, we need to wait for the response from someone else or our other task to finish before the task can continue. We can "just wait", do nothing while waiting and wasting this time.

To improve efficiency, we may have multiple tasks in progress at the same time. If the current task reaches the waiting part, we may switch to another task, and go back later. In this way, we make use of the waiting time of one task by doing other tasks.

We can even build a team to do these tasks. Each team member may be doing one task. When a team member reach the waiting part of their current task, they can switch to any task that can be continued but is not being executed by anyone.

The concept of asynchronous and concurrent are based on this idea. If some tasks are in progress at the same time, we say these tasks are executed concurrently. We call the task that can be executed concurrently with others an asynchronous task. In Rust, we use future to represent an asynchronous task, and we call such a way to wait for an asynchronous task await.

Future

A future has a type that implements the trait Future. This trait has an associated type Output as the task output type. As we explained earlier, a future represents an asynchronous task, which can be an IO operation. We call the IO functions that return a future asynchronous IO, while the traditional IO functions that only return after the IO operations are done without allowing you to do other things synchronous IO. There are some libraries that provide asynchronous IO functions, which will be discussed later.

async/await

Now we want to build a more complex asynchronous task that includes waiting for some other tasks. Manually implementing the Future trait to achieve this is not an easy job. Luckily Rust provides a convienient way to write a function that returns a future by using async and await.

When you mark the function with async, it means this function is an asynchronous task and it returns a future. The function's actual return type will be a anonymous type that implements Future trait with the Output as the declared return type. For example, in the following code.

1
2
3
4
5
6
7
async fn an_async_fn() -> String {
// function implementation
}

fn calling_async_fn() {
let value = an_async_fn();
}

The value has a type that implements Future<Output=String>.

The major benefit of marking function async is that you can use await in that function to wait for the result of another task. You use await like accessing a field, which has a type of the future's output. For example, in the following code.

1
2
3
4
5
6
7
8
9
10
11
12
13
fn an_IO_operation() -> impl Future<Output = String> { /* ... */ }

async fn task1() -> usize {
let value1: String = an_IO_operation().await;

// Note that in async function, even its actual return type is a future, in the function you just return the declared output type value.
value1.len()
}

async fn task2() {
let value2 = task1().await;
// other operations
}

In task1, we use await to wait for the IO result, and obtain it as a String, then we use its length as the task result.

In task2, we use await on task1(), because an async function returns a future.

Beside using async on the function, you can also create a async block by adding async before the block { }. You can use await inside the async block, and the type of the whole block will be a future whose output is the type of the last expression of the block. For example, in the following code.

1
2
3
4
5
6
fn async_block_example() {
let async_block = async {
// some `await`s
32i32
}
}

The async_block will has a type that implements Future<Output=i32>.

Runtime

Now we have async function that returns a future, how should we actually run it? You may think we can just call the async function in main, like this.

1
2
3
4
5
6
7
async fn io() {
println!("IO started")
}

fn main() {
io();
}

Unfortunately this won't work. If you run the program nothing will be printed. It is because that the async function just returns the task, it does not run the task. To run the task, you need to assign it to a runtime. A runtime is like a person or a team (thread-pool) that you can assign the task to. They will execute the task concurrently.

Tokio is probably the most popular runtime in Rust. It also provides basic asynchornous IO operations. By adding tokio to the dependencies with full feature (tokio = { version = "1.17", features = ["full"] }), we can add async on the main function with #[tokio::main] macro. Then, we can use await inside it.

1
2
3
4
5
6
7
async fn io() {
println!("IO started")
}
#[tokio::main]
async fn main() {
io().await;
}

If you run this program, the IO started will be printed. The macro tokio::main will generate the main function that does the following things.

  1. Initialize the runtime.
  2. Assign the main function you write to the runtime.
  3. Exit the program when the task is done.

You can also assign a task by using tokio::spawn. For example, in the following code, we spawn a lot of tasks that wait for 2 seconds and print a message.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#[tokio::main]
async fn main() {
let mut futures = Vec::with_capacity(1000);
for i in 1..=1000 {
// Because in the async block the value `i` is used,
// the future need to either borrow `i` or take its ownership.
// The keyword `move` here is to indicate to take its ownership.
let f = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("Hello from the task: {}", i)
});
futures.push(f);
}
for future in futures {
future.await.ok();
}
}

Note that we still use the tokio::main macro for two reasons: the macro generates the runtime initialization code which is required for spawning, and the program won't wait for the spawned tasks to finish but only the async main with tokio::main macro. We need wait for each future at the end of the main function so that program exits only when all tasks are done.

Running the program, you should see all the message roughly 2 seconds later. All the messages being printed nearly at the same time proves that the runtime executes these tasks concurrently and all the waitings for sleep happen at the same time.

Concurrent

Suppose we have two input operations and an output operation. We use sleep to simulate a long-run IO.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Dependency: anyhow = "1.0"
async fn input1() -> anyhow::Result<String> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok("input1".to_string())
}

async fn input2() -> anyhow::Result<String> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok("input2".to_string())
}

async fn output(_: &str) -> anyhow::Result<()> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
}

We now want to write a function that adds the results of two inputs then outputs it.

1
2
3
async fn task() -> anyhow::Result<()> {
output(&(input1().await? + &input2().await?)).await
}

This does solve the problem. However, the task takes about 3 seconds. It does input1 first, waits for it to fnish, then does input2, waits for it to finish, then adds the two results, waits for output to finish.

A more optimal solution is to wait for input1 and input2 at the same time. After they are all finished, add their results and execute output.

A possible solution is to assign input1 as a individual task to the runtime by tokio::spawn, then wait for input2.

1
2
3
4
5
6
7
async fn task() -> anyhow::Result<()> {
let input1 = tokio::spawn(input1());
let input2 = input2().await?;
// The `tokio::spawn` has an output that wraps the output of the spawned future into a `Result`.
// So we need two `?`, one to get the future's output, one to get the string.
output(&(input1.await?? + &input2)).await
}

This time, the future of task only takes 2 seconds to finish.

It seems to be a good solution, but it has an issue that may not be easy to find. Let's suppose input2 fails under a certain situation. We simulate it by changing input2 into

1
2
3
async fn input2() -> anyhow::Result<String> {
Err(anyhow::anyhow!("Oh no"))
}

Then, let's suppose we have a large task executing request.

1
2
3
4
5
6
7
8
#[tokio::main]
async fn main() {
loop {
if let Err(e) = task().await {
// In real-world situation we may want to respond the error.
}
}
}

Run the program, and monitor the memory usage. You will notice that the memory usage keep growing. You may think it's because we do task for a lot of times, but we await each time, so at any time there is only one task being executed.

Well, there is only one task at a time, but remember we spawn the input1! Every time task is executed, an input1 is assigned to the runtime for one seconds, while task itself finished immediately because input2 fails. And these input1 tasks accumulate and take the memory.

It may not be easy to debug because it looks like a memory leak, but it isn't. If you spawns a future of a subtask, you should make sure that the spawned subtask finished before the future finished, even if there is an error.

Luckily, we can achieve concurrent without spawning at all! The futures crate provides many useful tools to work with futures, including a macro to wait for multiple futures at the same time. Add the futures = 0.3" in your dependency, then we implement task as the following.

1
2
3
4
async fn task() -> anyhow::Result<()> {
let (input1, input2) = futures::try_join!(input1(), input2())?;
output(&(input1 + &input2)).await
}

Here we use futures::try_join! macro to wait for input1 and input2 at the same time. This macro expects each future argument has a Result<T, E> where the error types are the same. The macro has the type Result<(T1, T2, ...), E>. If all futures' outputs are Ok, the macro gives an Ok of all future's Ok values as a tuple. Otherwise if any future is Err, the macro gives you the error while other futures are cancelled. We will talk about cancellation later.

Using this way, if input1 and input2 both succeed and take 1 second, the task future takes only 2 seconds. If one of them fails, keeps executing task won't cause increasingly memory usage.

Some people may ask, why not also wait for output at the same time? Because the argument to output depends on the result of the two reads. It can't start until this result is obtained. But input1 and input2 do not need each other's results, so can be done at the same time. Also, waiting multiple futures at the same time doesn't always increase efficiency. For example, if input1 and input2 are doing network IO to the same target, then in the case of a certain bandwidth, concurrency may not prompt efficiency. Whether to adopt concurrency requires a specific analysis of specific issues.

Running synchronous IO

For some IO operations it is difficult to find the asynchronous version. If you directly call the synchronous IO function, the executor cannot switch to other task while waiting. Most runtime, including tokio, has a specific "team" (thread-pool) to run the synchronous IO. In tokio we can assign the synchronous IO to that team by tokio::task::spawn_blocking.

1
2
3
4
async {
// ...
tokio::task::spawn_blocking(|| synchronous()).await
}

The task executor will hand over the synchronous IO to the specific team, then we wait for their response. And the executor themselves can switch to other task while waiting.

It should be noted that even using spawn_blocking, there is a limit on the number of concurrencies on the synchronous IO. Asynchronous IO should be used whenever possible.

Cancellation

Sometimes we want to cancel an ongoing task. For example, when a subtask of an operation fails, we want to cancel the other subtasks. Canceling an asynchronous task is actually easy. We just don't switch back to the task, but drop it instead.

This may lead to some unexpected (but definitely defined) behaviors. For example, when you are sending a stream of data through TCP connection, it may be cancelled when part of the data is sent. There is a good article which explains this potential issue. You should be careful when cancelling is possible, for example, when using futures::try_join. If you want to achieve concurrency without cancelling, you can use futures::join.

Summarize

This article briefly introduces the future and async/await in Rust. Because I'm too lazy I don't want this article to be too long, so I didn't go into all the details. I hope this article dispels your fear of coroutines.

Explanations used in this article

  • Concurrent: if several tasks are being progressed at the same time, these tasks are executed concurrently.
  • Asynchronous task: A task that can be executed concurrently with others.
  • Future: represents an asynchronous task.
  • async: keyword to mark a function or a block that creates a future. You can use await inside.
  • await: wait for a future, while allowing the executor to switch to other task.
  • cancellation: a task can be cancelled by not being progressed anymore.issues.