Louis CAD
Louis CAD doing software

Coroutines racing! Why, and how.

Published on Nov 8, 2021

Over the years, I found myself needing the following:

Run multiple related coroutines (i.e. asynchronous operations), and when one completes, cancel/stop the other ones.

As usual, and especially since we're in Kotlin, I have been looking for the easiest way to do it correctly and efficiently.

Within kotlinx.coroutines, the first-party library, I didn't find the high-level API I was looking for, though the building blocks to make it were definitely there.

Before we look at the solution I've been using, let's see some use cases for coroutines racing.

Why race in the first place

The most obvious kind of use case I found is allowing something to be triggered in multiple ways. For example, we could have a manual and an automatic way of enabling something. Or we could have a local and a remote way of turning off something else. Or, more simply, we could have two buttons that do the same thing. Maybe you can think of one or two other plausible scenarios fitting this kind of use case.

Another kind of use case is having multiple terminal operations. For example, during an onboarding step in an app, or in an app feature, we could let the user watch a quick video introduction while still being able to skip it midway: the step ends either when the video finishes or when the user skips it.

There are of course many other reasons we would want to race coroutines.

Before moving on to the how, I want to emphasize the desired behavior: when one of the racing coroutines completes, we want all other race contenders to be cancelled.

Our example use case

For the rest of this article, and for the sake of simplicity, we'll settle on the triggering use case.

A triggering operation can be abstracted as a suspending function with the following signature:

suspend fun awaitTrigger()

As you can guess, we would expect that function to return/resume when it's time to trigger the thing.

Now, for the multiple trigger means, we could imagine that we actually have two other suspending functions: awaitAutomaticTrigger(), and awaitManualTrigger().
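To make these two functions concrete, here is a minimal sketch of what they could look like, assuming kotlinx.coroutines. Both bodies are assumptions for illustration: the automatic trigger simply waits for a fixed delay, and the manual one waits for a hypothetical `manualTriggerSignal` that a hypothetical `onManualTriggerClicked()` UI callback completes.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.delay

// Hypothetical stand-in: the automatic trigger fires after a fixed delay (an assumption).
suspend fun awaitAutomaticTrigger() {
    delay(5_000)
}

// Hypothetical signal that UI code completes when the user asks for the trigger.
val manualTriggerSignal = CompletableDeferred<Unit>()

// Hypothetical stand-in: resumes once the signal above has been completed.
suspend fun awaitManualTrigger() {
    manualTriggerSignal.await()
}

// Hypothetical UI callback, e.g. called from a click listener.
fun onManualTriggerClicked() {
    manualTriggerSignal.complete(Unit)
}
```

Since a `CompletableDeferred` can only be completed once, this stand-in is a one-shot trigger.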

How can we race these two functions?

A basic approach

If you want to race two coroutines, here's one way you could come up with:

suspend fun awaitTrigger() {
    coroutineScope {
        val automaticTrigger = launch {
            awaitAutomaticTrigger()
            // Aaargh, can't access `manualTrigger` here!
        }
        val manualTrigger = launch {
            awaitManualTrigger()
            automaticTrigger.cancel()
        }
        // `coroutineScope { … }` suspends until all children are complete or cancelled,
        // so there is no need to call `join()` on either job.
    }
}

As you can see, this most straightforward approach, which only works for two racers, already has a shortcoming (see the "Aaargh" comment). One workaround is declaring manualTrigger as a lateinit var before the first launch. It works, but it's not so neat, and our IDE now underlines it 😔.
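Here is roughly what that lateinit workaround could look like, assuming kotlinx.coroutines. The two trigger functions are hypothetical stand-ins based on `delay()`, only there so that the sketch compiles on its own.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical trigger stand-ins: the automatic one fires first in this sketch.
suspend fun awaitAutomaticTrigger() = delay(50)
suspend fun awaitManualTrigger() = delay(5_000)

suspend fun awaitTrigger() {
    coroutineScope {
        // Declared up front so that the first racer can reference the second one.
        lateinit var manualTrigger: Job
        val automaticTrigger = launch {
            awaitAutomaticTrigger()
            manualTrigger.cancel()
        }
        manualTrigger = launch {
            awaitManualTrigger()
            automaticTrigger.cancel()
        }
    }
}
```

Beyond the IDE warning, this relies on `manualTrigger` being assigned before the first racer finishes: on a multithreaded dispatcher, if `awaitAutomaticTrigger()` returned without suspending, reading the lateinit variable could throw an `UninitializedPropertyAccessException`.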

What would be an ideal API?

Well, we can never really know what the absolute best API could be, but let's at least imagine something easier to use than the previous snippet that would serve the same purpose.

That API would need to surface the intent of racing multiple suspend calls, with the cancelling behavior for the non-winners.

So, it'd need the verb "race" in its name, and it'd need to take multiple suspending functions as parameters. How could it look for our triggering use case?

suspend fun awaitTrigger() = raceOf({
    awaitAutomaticTrigger()
}, {
    awaitManualTrigger()
})

So, we went from 11 lines of code, including 8 significant, to 5 lines of code, including 3 significant. Quite cool?

If you analyze the two snippets, you'll also see that for the first approach, unless we rework it, the number of lines of code, and the potential mistakes one can make along the way, grow quadratically as we add more racers, since each racer has to cancel every other one. For this raceOf API, it grows linearly, at a rate of only 2 lines of code per racer, a single one of them significant.

That API, while very simple, has one shortcoming: the number of racing coroutines is fixed at compile time, as with any vararg parameter, which is what raceOf takes. It's possible to use the spread operator after converting a list of suspend functions to a typed array, but it's not optimal, and it doesn't allow late racers (i.e. racers added after the race has started).

To support a dynamic amount of coroutines, and late racers, we can imagine another API. A race { … } function would bring a RacingScope where you could call launchRacer { … } inside to, well, launch racers.

Here's how it'd look for the triggering use case:

suspend fun awaitTrigger() = race {
    launchRacer { awaitAutomaticTrigger() }
    launchRacer { awaitManualTrigger() }
}

Interestingly, it's even less code than our previous approach, and each additional racer can take as little as 1 line of code in total.
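To give an idea of what could sit behind such an API, here is a minimal sketch of `race { … }` and a `RacingScope` interface, assuming kotlinx.coroutines. It is not the Splitties implementation mentioned later in this article: `RacingScope`, `launchRacer` and `race` are written from scratch here as hypothetical definitions matching the usage above. All racers are grouped under one dedicated `Job`, so cancelling the losers is a single call once the first value arrives.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

// Hypothetical scope type: a CoroutineScope that can also launch racers.
interface RacingScope<T> : CoroutineScope {
    fun launchRacer(block: suspend CoroutineScope.() -> T)
}

// Hypothetical race function: returns the value of the first racer to complete,
// then cancels all the others. Suspends forever if no racer is ever launched.
suspend fun <T> race(builder: suspend RacingScope<T>.() -> Unit): T = coroutineScope {
    val winner = CompletableDeferred<T>()
    // Dedicated child job grouping every racer, so losers can be cancelled in one call.
    val racersJob = Job(parent = coroutineContext[Job])
    val racingScope = object : RacingScope<T> {
        override val coroutineContext: CoroutineContext =
            this@coroutineScope.coroutineContext + racersJob

        override fun launchRacer(block: suspend CoroutineScope.() -> T) {
            // complete() only succeeds for the first racer; later calls return false.
            launch { winner.complete(block()) }
        }
    }
    racingScope.builder()
    val result = winner.await()
    racersJob.cancel() // Cancels the remaining racers; coroutineScope waits for them.
    result
}
```

Because `launchRacer` is also reachable from inside a racer, late racers can join while the race is already running. In this sketch, a racer failing with an exception fails the whole race.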

How could that coroutines racing API be implemented?

The first time I wanted to race coroutines with a neat API was about 3 years ago. I also wanted it to support returning the value of the winning racer/coroutine.

My first idea was to make a suspending extension function on List<Deferred<T>> that I named race, and I implemented it with one CompletableDeferred instance and a loop:

suspend fun <T> List<Deferred<T>>.race(): T = coroutineScope {
    val winningValue = CompletableDeferred<T>()
    this@race.forEach { racer ->
        launch {
            val winningCandidate = racer.await()
            this@race.forEach { if (it != racer) it.cancel() }
            winningValue.complete(winningCandidate)
        }
    }
    winningValue.await()
}
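For comparison, here is what calling that extension could look like, assuming kotlinx.coroutines. The extension is repeated so that the snippet compiles on its own, and `firstAnswer()` is a hypothetical caller. Each racer has to be started with `async { … }`, which in turn needs an enclosing `coroutineScope { … }`.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Same extension as above, repeated here so the sketch is self-contained.
suspend fun <T> List<Deferred<T>>.race(): T = coroutineScope {
    val winningValue = CompletableDeferred<T>()
    this@race.forEach { racer ->
        launch {
            val winningCandidate = racer.await()
            this@race.forEach { if (it != racer) it.cancel() }
            winningValue.complete(winningCandidate)
        }
    }
    winningValue.await()
}

// Hypothetical caller: the async { … } calls need their own coroutineScope { … }.
suspend fun firstAnswer(): String = coroutineScope {
    listOf(
        async { delay(1_000); "slow" },
        async { delay(10); "fast" },
    ).race()
}
```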

I then found it inconvenient to have to create the Deferred instances via calls to async { … } that had to be done inside of another coroutineScope { … } block, so I put that boilerplate inside a function named raceOf:

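import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    val list: List<Deferred<T>> = racers.map { racer -> async { racer() } }
    val winningValue = CompletableDeferred<T>()
    list.forEach { racer ->
        launch {
            val winningCandidate = racer.await()
            // Cancel every racer; cancelling the already completed winner is a no-op.
            list.forEach { it.cancel() }
            winningValue.complete(winningCandidate)
        }
    }
    winningValue.await()
}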

Thanks to the help of folks on Kotlin's Slack and some searching, I then found that I could also use a select expression for that:

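import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.selects.select

/**
 * Races all the [racers] concurrently. Once the winner completes, all other racers are cancelled,
 * then the value of the winner is returned.
 */
suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    select<T> {
        val racersAsyncList = racers.map { async(start = CoroutineStart.UNDISPATCHED, block = it) }
        // Plain forEach here; Splitties' forEachByIndex does the same without allocating an iterator.
        racersAsyncList.forEach { racer: Deferred<T> ->
            racer.onAwait { resultOfWinner: T ->
                racersAsyncList.forEach { deferred: Deferred<T> -> deferred.cancel() }
                return@onAwait resultOfWinner
            }
        }
    }
}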

Both implementations work fine.
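As a quick check of the cancellation behavior, here is a sketch that plugs the select-based raceOf into the triggering use case, assuming kotlinx.coroutines. The trigger functions and the `cleanedUp` list are hypothetical: the manual trigger never fires, and it records an entry when it gets cancelled.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.selects.select

// select-based raceOf from above, repeated so the sketch is self-contained.
suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    select<T> {
        val racersAsyncList = racers.map { async(start = CoroutineStart.UNDISPATCHED, block = it) }
        racersAsyncList.forEach { racer ->
            racer.onAwait { resultOfWinner ->
                racersAsyncList.forEach { it.cancel() }
                resultOfWinner
            }
        }
    }
}

// Hypothetical log of racers that ran their cleanup after losing.
val cleanedUp = mutableListOf<String>()

// Hypothetical trigger stand-ins: the automatic one wins, the manual one never fires.
suspend fun awaitAutomaticTrigger() = delay(10)
suspend fun awaitManualTrigger() {
    try {
        delay(Long.MAX_VALUE)
    } finally {
        cleanedUp += "manual" // Runs when this racer is cancelled.
    }
}

suspend fun awaitTrigger() = raceOf({
    awaitAutomaticTrigger()
}, {
    awaitManualTrigger()
})
```

Since `coroutineScope { … }` waits for every child, including the cancelled losers, `awaitTrigger()` only returns once the losing racer's `finally` block has run.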

It's very possible that one is more performant than the other. I didn't benchmark them because I'm not putting these implementations under high stress (i.e. a high number of racers, or calls at a high frequency), and they have never been the cause of any performance or stability issue. If you have such a demanding use case, feel free to run the benchmark and share your results and testing approach in the comments below, I'd be interested in seeing them! I'd bet the select-based implementation is the more efficient one, but I wouldn't bet much on that mostly intuition-based guess.

Sharing that work widely

Since I thought I might not be the only one with this use case, I put the select-based implementation of raceOf into one of the Splitties libraries. That Kotlin multiplatform library also includes the race function for a dynamic number of racers that I referred to earlier in this article (permalink to its implementation here).

You can find the host library module here, with the entire source code, the documentation, and setup info to add the library to your project.

However, having this in a third-party library limits the accessibility of these coroutines racing facilities which I believe could be useful in many Kotlin projects. That's why I submitted an issue in the kotlinx.coroutines GitHub repository to ask for that use case to be addressed in the first-party library from the Kotlin team. Feel free to add a 👍 reaction to it so the priority gets high enough for it to be considered before we are all dead!

I hope you enjoyed reading and learned something helpful. Have a great Kotlin… well, have a great everything!
