Understanding Kotlin Coroutines

Coroutines are Kotlin's mechanism for simplifying concurrent programming, but they have a reputation for being hard to understand. In this article, I will share my intuitive understanding of them.

Ideas

In our daily work, a task often contains not only an action part but also a waiting part. Sometimes we need to wait for an event, such as a reply from someone else or the completion of another of our tasks, before the task can continue. We could "just wait": do nothing in the meantime and waste that time.

A better option is to keep multiple tasks in progress at the same time. We maintain a list of the tasks in progress: we work on one task while the others are suspended. When the current task has to wait, we suspend it and resume another suspended task that is ready to continue. In this way, we use the waiting time of one task to do another. We also save time by waiting for multiple events at the same time.

This strategy generalizes to a team. Each team member works on one task at a time, and the team shares the task list. When a team member reaches the waiting part of their current task, they suspend it, then keep checking the shared list for a suspended task that can be resumed, and resume it when they find one.

This is the main idea of coroutines. In programming, the waiting parts are mostly IO (input/output), and the need to keep multiple tasks in progress is called concurrency. A coroutine is like a task that can be suspended. The coroutine library provides dispatchers, which are like teams, to execute coroutines concurrently using a strategy similar to the one described above.
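To make the "team" picture concrete, here is a minimal sketch with a team of one: runBlocking (from kotlinx.coroutines) runs all of its coroutines on the calling thread. The builders launch and delay are explained in the following sections; the function interleavingLog and the task names A and B are made up for this illustration. While task A waits, the single thread picks up task B, and whichever wait ends first is resumed first.

```kotlin
import kotlinx.coroutines.*

// Runs two "tasks" inside runBlocking, which executes all of its coroutines on one thread
// (the caller's), and returns the order in which the tasks' steps happened.
fun interleavingLog(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        launch {
            log += "A: start"
            delay(100) // A has to wait: it suspends and frees the thread for other tasks
            log += "A: end"
        }
        launch {
            log += "B: start"
            delay(50)  // B waits for a shorter time, so it is resumed first
            log += "B: end"
        }
    } // runBlocking returns only after both tasks have finished
    return log
}

fun main() {
    // prints A: start, B: start, B: end, A: end (one per line)
    interleavingLog().forEach(::println)
}
```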

suspend

As mentioned earlier, the basis of a coroutine is code execution that can be suspended. In Kotlin, functions (methods and lambdas included) that can be suspended are marked with the suspend keyword. A suspend function can only be called from another suspend function or from a coroutine, because if one step of a function may suspend, the function itself may suspend.

At the top level, the main function itself can be marked suspend, so suspend functions can be called from main.
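A small sketch of suspend functions calling each other, assuming kotlinx.coroutines is on the classpath for delay (the dependency is added in the next section). The names fetchGreeting and greet are hypothetical.

```kotlin
import kotlinx.coroutines.delay

// A suspend function: its execution may be suspended at delay(), which is a suspension point.
suspend fun fetchGreeting(): String {
    delay(100) // suspends this call (and its caller) without blocking the thread
    return "Hello"
}

// One of its steps (fetchGreeting) may suspend, so this function may suspend too
// and must also be marked suspend.
suspend fun greet(name: String): String = fetchGreeting() + ", " + name

// fun greetBlocking(name: String) = greet(name)
// ^ does not compile: a suspend function can only be called from a coroutine or another suspend function

suspend fun main() { // main itself is marked suspend, so it can call greet directly
    println(greet("Kotlin")) // prints Hello, Kotlin
}
```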

Coroutine

With the suspend mechanism, we can run suspendable code as a concurrent coroutine.

First, add kotlinx.coroutines as a dependency. Then write the following code.

import kotlinx.coroutines.*

suspend fun main() {
    coroutineScope {
        for (i in 1..2000) {
            launch {
                delay(2000)
                println("Hello from coroutine #$i")
            }
        }
    }
}

In this code, coroutineScope creates a coroutine scope, and inside the scope, launch starts a concurrent coroutine. A coroutineScope is like a section containing several subtasks: it does not return until all of its subtasks have completed.

Each launched coroutine pauses for 2 seconds and then prints a message. Because the coroutines run concurrently, all the messages appear at roughly the same time, about 2 seconds after the start, instead of one every 2 seconds. You can run it to verify this.
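If you would rather measure than eyeball the output, the following sketch times the same pattern. The helper timeConcurrentWaits is hypothetical, introduced only for this measurement.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Launches `count` coroutines that each wait `waitMs` milliseconds and returns the elapsed wall-clock time.
// measureTimeMillis is an inline function, so suspend calls are allowed inside its block.
suspend fun timeConcurrentWaits(count: Int, waitMs: Long): Long = measureTimeMillis {
    coroutineScope {
        repeat(count) {
            launch { delay(waitMs) }
        }
    } // coroutineScope returns only after every launched coroutine has completed
}

suspend fun main() {
    val elapsed = timeConcurrentWaits(count = 2000, waitMs = 2000)
    // Roughly 2000 ms if the waits overlap; one after another they would take 4,000,000 ms.
    println("elapsed: $elapsed ms")
}
```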

async and await

Many tasks produce a result. We can run such tasks concurrently and wait for their results later, when we need them. Suppose we have two operations that each read an integer and one that outputs an integer. We simulate slow IO with delay:

import kotlinx.coroutines.*

suspend fun slowInput1(): Long {
    delay(1000)
    return 1
}

suspend fun slowInput2(): Long {
    delay(1000)
    return 2
}

suspend fun slowOutput(_value: Long) {
    delay(1000)
}

We now want to write a function that adds the results of the two inputs and outputs the sum.

suspend fun outputSum() {
    slowOutput(slowInput1() + slowInput2())
}

This does solve the problem. However, the function takes about 3 seconds to run. It calls slowInput1 and waits for its result, then calls slowInput2 and waits for its result, adds the two results, and finally calls slowOutput and waits for it to finish.

A better solution is to wait for slowInput1 and slowInput2 at the same time and, once both have finished, add their results and call slowOutput. We can do this with the following code:

suspend fun outputSum() {
    coroutineScope {
        val input1: Deferred<Long> = async {
            slowInput1()
        }
        val input2 = slowInput2()
        slowOutput(input1.await() + input2)
    }
}

Here, async starts a concurrent coroutine, so slowInput1 runs concurrently with the slowInput2 call that follows it. I annotated the type explicitly: async returns a Deferred<Long>. Deferred<T> represents a concurrently running coroutine whose result has type T. Once slowInput2 has returned, we need the result of the concurrently running slowInput1, so we use await, which means "wait for this coroutine to finish and get its result". Thanks to the concurrency, this implementation takes only about 2 seconds.
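The version above wraps only slowInput1 in async and calls slowInput2 directly. The sketch below is a self-contained variant that starts both inputs with async and times it against the sequential version. The function names outputSumSequential and outputSumConcurrent are hypothetical, and both return the sum (unlike outputSum) so the result can be checked.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun slowInput1(): Long { delay(1000); return 1 }
suspend fun slowInput2(): Long { delay(1000); return 2 }
suspend fun slowOutput(value: Long) { delay(1000) } // simulated output; the value is discarded

// Sequential: input1, then input2, then output, about 3 seconds in total.
suspend fun outputSumSequential(): Long {
    val sum = slowInput1() + slowInput2()
    slowOutput(sum)
    return sum
}

// Concurrent: both inputs start with async and are awaited together, about 2 seconds in total.
suspend fun outputSumConcurrent(): Long = coroutineScope {
    val input1 = async { slowInput1() }
    val input2 = async { slowInput2() }
    val sum = input1.await() + input2.await()
    slowOutput(sum) // needs both inputs, so it cannot start any earlier
    sum
}

suspend fun main() {
    val seq = measureTimeMillis { outputSumSequential() }
    val conc = measureTimeMillis { outputSumConcurrent() }
    println("sequential: $seq ms, concurrent: $conc ms")
}
```

Both concurrent forms take about the same time here; starting every input with async mainly makes the symmetry explicit, which becomes convenient once there are more than two independent inputs.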

Some may ask: why not run slowOutput concurrently as well? Because its argument depends on the results of the first two operations, it cannot start until those results are available. slowInput1 and slowInput2, on the other hand, do not need each other's results, so they can run at the same time. Also note that concurrency does not always improve efficiency. For example, if slowInput1 and slowInput2 both do network IO to the same host over a link with limited bandwidth, running them concurrently may not make things any faster. Whether concurrency helps has to be judged case by case.

Structured concurrency

As mentioned earlier, coroutineScope waits for all of its coroutines to complete. This is because, in most cases, that is what we want: in main, we want the program to exit only after every coroutine it created has finished; elsewhere, a coroutine usually represents a subtask of some step, and the step is complete only when all of its subtasks are. A philosophy of Kotlin's design is to make common scenarios simple and easy to get right, so coroutineScope was made the most convenient way to create coroutines. [1]

Of course, sometimes we want a coroutine that is not limited to a scope. You can do this by creating an object of type CoroutineScope and calling its launch method, which is not detailed here. The point is that we cannot accidentally forget to wait for a child coroutine: a coroutine outlives its scope only when we are sure that is the desired behavior and arrange it explicitly.
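The CoroutineScope-object approach is not covered in detail here, but a minimal sketch may help to see the contrast. Poller is a hypothetical class; its coroutine is launched on a scope the class owns, so no enclosing coroutineScope waits for it, and the class has to cancel it explicitly.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// A hypothetical long-lived component that owns a CoroutineScope object instead of using coroutineScope { }.
class Poller {
    // CoroutineScope(...) creates a Job for the scope because the context passed in has none.
    private val scope = CoroutineScope(Dispatchers.Default)
    val ticks = AtomicInteger(0)

    // scope.launch returns immediately; the coroutine is not a child of the caller
    // and keeps running after start() returns. Nothing waits for it automatically.
    fun start(): Job = scope.launch {
        while (isActive) {
            ticks.incrementAndGet()
            delay(10)
        }
    }

    // Since no enclosing scope will wait for or cancel these coroutines, we must do it ourselves.
    fun close() = scope.cancel()
}

fun main() {
    val poller = Poller()
    poller.start()
    Thread.sleep(100) // the coroutine keeps running in the background meanwhile
    println("ticks so far: ${poller.ticks.get()}")
    poller.close()    // forgetting this would leave the coroutine running
}
```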

Asynchronous IO

As mentioned at the beginning, IO is the "waiting part", so IO operations should be suspend functions. But not all IO can cooperate with this mechanism. Synchronous (blocking) IO cannot suspend the coroutine; the thread executing it can only wait for the result without doing anything else. While it performs synchronous IO, it cannot switch to another task and has to waste the idle time. To solve this, many libraries provide asynchronous IO.
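The difference between blocking and suspending can be sketched with two simulated IO waits on runBlocking's single thread: Thread.sleep stands in for synchronous IO, and delay for asynchronous IO. The helper timeTwoWaits is hypothetical. With Thread.sleep, the two waits run one after another (about 1 second in total); with delay, they overlap (about 0.5 seconds).

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Runs two 500 ms "IO waits" as coroutines inside runBlocking, which uses a single thread.
// blocking = true simulates synchronous IO; blocking = false simulates asynchronous IO.
fun timeTwoWaits(blocking: Boolean): Long = measureTimeMillis {
    runBlocking {
        repeat(2) {
            launch {
                if (blocking) Thread.sleep(500) // holds the only thread: the other task cannot run meanwhile
                else delay(500)                 // suspends: the thread moves on to the other task
            }
        }
    }
}

fun main() {
    println("synchronous (Thread.sleep): ${timeTwoWaits(blocking = true)} ms")
    println("asynchronous (delay):       ${timeTwoWaits(blocking = false)} ms")
}
```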

Asynchronous IO library

On the JVM, many libraries provide asynchronous IO, such as Netty and Vert.x. They represent a pending asynchronous IO operation with a type similar to Deferred<T>: some use the standard library's CompletableFuture, some use RxJava types, and some define their own class, usually named Future<T> or Promise<T>.

To make these libraries work with Kotlin coroutines, some libraries provide extension functions similar to await, so that you can wait for the IO to complete and get its result, just as with Deferred<T>. For example, to use await on the standard library's CompletableFuture, you can use kotlinx-coroutines-jdk8; for RxJava 3, kotlinx-coroutines-rx3; for Vert.x, vertx-lang-kotlin-coroutines. For other libraries, try searching for the library name plus "Kotlin coroutine" to find a library that provides coroutine support.
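A sketch of the CompletableFuture case, assuming kotlinx-coroutines-jdk8 is on the classpath (in kotlinx.coroutines 1.7 and later, that module was merged into kotlinx-coroutines-core, so the same kotlinx.coroutines.future.await import works with the core artifact alone). fetchUserNameAsync is a hypothetical stand-in for a library method that returns a CompletableFuture; it uses CompletableFuture.delayedExecutor, which requires Java 9 or later.

```kotlin
import kotlinx.coroutines.future.await
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a library call that returns a CompletableFuture.
// delayedExecutor schedules the supplier to run after 200 ms, imitating IO latency.
fun fetchUserNameAsync(id: Int): CompletableFuture<String> =
    CompletableFuture.supplyAsync(
        { "user-$id" },
        CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS),
    )

// await() suspends the calling coroutine until the future completes,
// instead of blocking a thread the way CompletableFuture.get() would.
suspend fun loadUserName(id: Int): String = fetchUserNameAsync(id).await()

suspend fun main() {
    println(loadUserName(42)) // prints user-42
}
```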

If you use a framework such as Ktor, it wraps this asynchronous IO and provides the corresponding suspend functions, so you don't have to worry about these details.

IO Dispatcher

For some IO operations, no asynchronous version is available, so calling them synchronously is unavoidable. In that case, a good strategy is to call them like this (inside a coroutine scope):

launch(Dispatchers.IO) { // or async(Dispatchers.IO)
    // doing synchronous IO
}.join() // or await()

This hands the code over to the IO dispatcher for execution. Recall that a dispatcher is "the team that executes the tasks (coroutines)"; the IO dispatcher is a "specialized team" for performing synchronous IO. After handing the synchronous IO to the IO dispatcher, the calling coroutine uses join to wait for it to finish, and join suspends the coroutine rather than blocking its thread.
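A commonly used alternative to launch(Dispatchers.IO) { ... }.join() is withContext(Dispatchers.IO) { ... }: it hands the block to the IO dispatcher in the same way, suspends the caller until the block finishes, and returns the block's result directly. A minimal sketch, where readTextBlockingSafely is a hypothetical name and java.io's File.readText stands in for any blocking call:

```kotlin
import kotlinx.coroutines.*
import java.io.File

// Reads a file with blocking java.io calls, but runs them on Dispatchers.IO.
// withContext suspends the caller (it does not block it) until the block finishes, then returns its result.
suspend fun readTextBlockingSafely(file: File): String =
    withContext(Dispatchers.IO) {
        file.readText() // synchronous IO: blocks an IO-dispatcher thread, not the caller's thread
    }

suspend fun main() {
    val tmp = File.createTempFile("demo", ".txt").apply {
        writeText("hello from a blocking read")
        deleteOnExit()
    }
    println(readTextBlockingSafely(tmp))
}
```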

Note that although running synchronous IO on the IO dispatcher keeps it from blocking other coroutines, the IO dispatcher also limits how many tasks it runs in parallel (by default, 64 threads or the number of CPU cores, whichever is larger). Asynchronous IO should be used whenever possible.

Cancellation

Sometimes we want to cancel an ongoing task. For example, when one subtask of an operation fails, we want to cancel the other subtasks and propagate the exception. However, a coroutine does not stop at the moment it is cancelled. Consider the following code:

import kotlinx.coroutines.*

suspend fun main() {
    coroutineScope {
        launch {
            while (true) {
                println("Running...")
                // Here we use Thread.sleep (instead of delay) to simulate a long CPU (instead of IO) computation.
                Thread.sleep(2000)
            }
        }
        launch {
            throw Exception("Something goes wrong")
        }
    }
}

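The second launch throws an exception, which cancels the scope and therefore the first coroutine, yet the program does not terminate. This is because cancellation is cooperative: cancelling a coroutine does not stop code that is currently running, and the coroutine only notices the cancellation at a suspension point. If a coroutine has been cancelled, a CancellationException is thrown when it resumes, which means it must have suspended first. The first coroutine never suspends (Thread.sleep blocks instead), so it never observes the cancellation, and coroutineScope keeps waiting for it forever.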
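The claim that CancellationException is thrown on resume can be observed directly. In this sketch (observeCancellation is a hypothetical helper), a coroutine is suspended in delay when it is cancelled, and the exception surfaces from delay once the coroutine resumes. Note that the catch block rethrows the exception; swallowing CancellationException would interfere with cancellation.

```kotlin
import kotlinx.coroutines.*

// Records what a cancelled coroutine observes, all on runBlocking's single thread.
fun observeCancellation(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        val job = launch {
            try {
                log += "waiting"
                delay(Long.MAX_VALUE) // suspension point: the coroutine is suspended here when cancelled
                log += "never reached"
            } catch (e: CancellationException) {
                log += "CancellationException on resume"
                throw e // rethrow so the coroutine completes as cancelled
            }
        }
        delay(100)   // give the child time to start and suspend
        job.cancel()
        job.join()
        log += "job cancelled: ${job.isCancelled}"
    }
    return log
}

fun main() {
    observeCancellation().forEach(::println)
}
```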

To let the first coroutine be cancelled properly, we can add a yield() call to its while loop.

import kotlinx.coroutines.*

suspend fun main() {
    coroutineScope {
        launch {
            while (true) {
                println("Running...")
                Thread.sleep(2000)
                yield() // suspension point: a pending cancellation is delivered here
            }
        }
        launch {
            throw Exception("Something goes wrong")
        }
    }
}

yield() suspends the coroutine, but the coroutine can be resumed immediately. The difference it makes is that, when resuming after yield, it throws a CancellationException if the coroutine has been cancelled. If you run this program, it crashes with the exception after about 2 seconds.
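yield() is not the only option. kotlinx.coroutines also provides ensureActive(), a regular (non-suspending) function that throws CancellationException if the coroutine has been cancelled; yield() additionally gives other coroutines on the same dispatcher a chance to run. A sketch, with runUntilCancelled as a hypothetical helper; the loop runs on Dispatchers.Default so that its blocking work does not occupy runBlocking's only thread:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// A blocking loop that, like the first coroutine above, has no suspension point of its own.
// ensureActive() throws CancellationException once this coroutine's Job is cancelled.
fun runUntilCancelled(): Int {
    val iterations = AtomicInteger(0)
    runBlocking {
        val job = launch(Dispatchers.Default) {
            while (true) {
                Thread.sleep(10)             // simulated CPU work: blocks, does not suspend
                iterations.incrementAndGet()
                ensureActive()               // cooperative cancellation check
            }
        }
        delay(100)
        job.cancelAndJoin() // would wait forever without the ensureActive() check
    }
    return iterations.get()
}

fun main() {
    println("stopped after ${runUntilCancelled()} iterations")
}
```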

Other uses of suspend

Another great thing about Kotlin's suspend mechanism is that it has uses beyond coroutines and concurrency. Below are two typical examples: sequence and DeepRecursiveFunction.

sequence

In Kotlin, we can create a Sequence with sequence {} and call yield inside the block to produce each element of the sequence. (This yield belongs to the sequence builder and is unrelated to the coroutine yield() used above.) For example:

fun main() {
    val s = sequence {
        for (i in 1..10) {
            // Simulate a time-consuming operation
            Thread.sleep(2000)
            yield(i)
        }
    }
    for (elem in s) {
        println("Get element $elem from the sequence")
    }
}

Compared with building a list with MutableList or buildList, a major difference of sequence is that it is lazy: the code that produces an element runs only when that element is needed. If you run the program above, instead of printing all elements at once after a long pause, it prints one element every 2 seconds. This shows that the sequence block is not executed all at once, but a little at a time, each time an element is taken from the sequence.

This is made possible by the suspend mechanism. When an element is requested, execution of the block continues until it reaches yield; there it suspends, and the argument of yield becomes the element returned from the sequence. This is another application of the suspend mechanism.
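Laziness also means a sequence block may loop forever, as long as the consumer takes only finitely many elements. A minimal sketch with a hypothetical fibonacci function:

```kotlin
// An infinite, lazily generated sequence: the block runs only as far as needed
// to produce the elements that are actually requested.
fun fibonacci(): Sequence<Long> = sequence {
    var a = 0L
    var b = 1L
    while (true) {
        yield(a)          // suspend here and hand `a` to the consumer
        val next = a + b  // runs only when the consumer asks for another element
        a = b
        b = next
    }
}

fun main() {
    println(fibonacci().take(10).toList()) // prints [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
}
```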

DeepRecursiveFunction[2]

Suppose we have a recursive function. For simplicity, here is a recursive counting function:

fun count(n: Int): Int =
    if (n <= 0) n else { 1 + count(n - 1) }

The count function should return its argument unchanged. However, when n is large, it throws a StackOverflowError because the recursion gets too deep.

In Kotlin, we can use DeepRecursiveFunction to solve this. Rewrite the function as

val count = DeepRecursiveFunction<Int, Int> { n ->
    if (n <= 0) n else { 1 + callRecursive(n - 1) }
}
// Called like an ordinary function, e.g. count(100_000)

Now, even with a large argument, it runs for a while and returns the result without an exception.

It works by treating each function call, including each recursive call, as a task. When a task reaches a recursive call, it is suspended and the task for the recursive call starts executing. When that task finishes, the caller's task resumes. These tasks are stored on the heap instead of the stack, so there is no stack overflow. This is made possible by the suspend mechanism.
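For a less trivial case, here is a sketch that computes the depth of a very deep (degenerate) binary tree. Tree, depthOf, and chain are hypothetical names for this illustration. The @OptIn annotations follow footnote 2 and are only needed on Kotlin versions where DeepRecursiveFunction is still experimental; depth is kept private so the class does not appear in a public signature.

```kotlin
// A binary tree node (hypothetical type for this sketch).
class Tree(val left: Tree?, val right: Tree?)

// Tree depth written as an ordinary-looking recursion. callRecursive suspends the current "task"
// and starts the recursive one; pending calls are kept on the heap, not on the thread's stack.
@OptIn(ExperimentalStdlibApi::class)
private val depth = DeepRecursiveFunction<Tree?, Int> { t ->
    if (t == null) 0 else maxOf(callRecursive(t.left), callRecursive(t.right)) + 1
}

@OptIn(ExperimentalStdlibApi::class)
fun depthOf(tree: Tree?): Int = depth(tree) // invoking the DeepRecursiveFunction runs the computation

// Builds a degenerate tree: a chain of `n` nodes, each the left child of the next.
fun chain(n: Int): Tree = generateSequence(Tree(null, null)) { Tree(it, null) }.take(n).last()

fun main() {
    println(depthOf(chain(100_000))) // prints 100000
}
```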

Summary

This article briefly introduced the idea of coroutines and discussed structured concurrency, cancellation, and other uses of suspend. Because I'm too lazy (I mean, because I don't want this article to be too long), I didn't go into all the details. I hope this article dispels your fear of coroutines.

Metaphors and explanations used in this article

  • Coroutine: A task that is executed concurrently and can be suspended.
  • suspend: Marks a function whose execution can be suspended.
  • Dispatcher: A "team" that executes coroutines.
  • Structured concurrency: Concurrent coroutines are mostly subtasks of an operation. A structured coroutine scope completes only when all coroutines launched in it have completed. If the scope body or any of the launched coroutines throws an exception, the other coroutines are cancelled.
  • Cancellation: Just what it sounds like. But note that cancellation only takes effect when the coroutine suspends.

  1. In other languages, or when using threads directly, you have to explicitly wait for the coroutines/threads you create to finish; otherwise, these subtasks may be cut off as soon as the main function ends.

  2. At the time of writing, this class is still experimental, so the @OptIn(ExperimentalStdlibApi::class) annotation needs to be added to the function definition and to its call sites.