High-level concurrency

Coroutines are one of the most interesting features of Kotlin. However, the "coroutines standard library" (kotlinx.coroutines) sometimes falls short, especially when coordinating many suspended computations. Arrow provides a few additional functions that have proven useful in Kotlin code and in other programming communities.

info

Arrow Fx makes it easier to follow the Structured Concurrency rules, even when the logic grows more complex.

Where to find it

High-level concurrency is part of the arrow-fx-coroutines library.

Independently, in parallel

We often have independent computations that we want to perform in parallel. For example, if we need to fetch a value from the database and download a file from another service, there's no reason why we shouldn't do them concurrently. We can use parZip (PARallel ZIP) to combine the execution of both computations.

suspend fun getUser(id: UserId): User =
  parZip(
    { getUserName(id) },
    { getAvatar(id) }
  ) { name, avatar -> User(name, avatar) }

The code above showcases how parZip is used: we have a sequence of arguments representing each of the computations to perform, and, at the end, one final block (usually written in trailing form) that specifies what to do with the results of those computations. In this case, the two arguments obtain the name and avatar, and the block puts them together in the User type.

tip

parZip is valuable not only for its high-level view of concurrency. Its implementation also takes care of the complex task of propagating exceptions and canceling the computations that are still running whenever one of the tasks fails.
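To make that cancellation behavior concrete, here is a minimal sketch in which one parZip branch throws a regular exception. The names slowTask, failingTask, runBoth, and slowTaskCancelled are hypothetical, and the flag uses a JVM AtomicBoolean purely for illustration.

```kotlin
import arrow.fx.coroutines.parZip
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical flag recording whether the slow branch was cancelled (JVM only).
val slowTaskCancelled = AtomicBoolean(false)

// Hypothetical long-running branch that notices its own cancellation.
suspend fun slowTask(): String = try {
  delay(1_000)
  "done"
} catch (e: CancellationException) {
  slowTaskCancelled.set(true)
  throw e // rethrow so cancellation keeps propagating
}

// Hypothetical branch that fails after a short delay.
suspend fun failingTask(): String {
  delay(100)
  throw IllegalStateException("boom")
}

// Runs both branches in parallel; reports the failure message if one throws.
suspend fun runBoth(): String = try {
  parZip({ slowTask() }, { failingTask() }) { a, b -> "$a $b" }
} catch (e: IllegalStateException) {
  "failed: ${e.message}"
}

fun main() = runBlocking {
  println(runBoth())
  println("slow task cancelled: ${slowTaskCancelled.get()}")
}
```

The exception from failingTask reaches the caller only after slowTask has been cancelled, so no computation outlives the parZip call.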

In the code above, we had a fixed sequence of computations to perform in parallel. In other cases, those computations depend on some form of collection; for example, we want to obtain the names of all of a user's friends. Arrow provides parMap (PARallel MAP) for that use case.

// Assuming getUserName returns a String; the result is a list of names, not of User values.
suspend fun getFriendNames(id: UserId): List<String> =
  getFriendIds(id).parMap { getUserName(it) }

One potential problem with parMap is that it may create too much concurrency when the collection has many elements. To address this, Arrow provides a version of parMap with an additional concurrency parameter that limits how many computations run in parallel at once.
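As a rough sketch of the bounded variant, the snippet below caps parMap at two simultaneous computations. fetchLength and lengths are hypothetical helpers standing in for real work, such as a remote call.

```kotlin
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a remote call.
suspend fun fetchLength(word: String): Int {
  delay(50)
  return word.length
}

// At most two fetchLength calls are in flight at any moment.
suspend fun lengths(words: List<String>): List<Int> =
  words.parMap(concurrency = 2) { fetchLength(it) }

fun main() = runBlocking {
  println(lengths(listOf("a", "bb", "ccc", "dddd")))
}
```

The resulting list keeps the order of the input collection, regardless of which computation finishes first.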

Await-all scopes

Although parZip gives the most high-level view of the code, clearly specifying which tasks are independent of each other, it has the drawback of requiring a particular style of writing your computations. Arrow provides another tool based on async, where the code is written using the usual async/.await() idioms.

suspend fun getUser(id: UserId): User = awaitAll {
  val name = async { getUserName(id) }
  val avatar = async { getAvatar(id) }
  User(name.await(), avatar.await())
}

As the name suggests, within this awaitAll block, every time you call .await() all of the async computations that were registered until that point are awaited. If any of those throws an exception, the whole block is canceled, as per the rules of structured concurrency. In general, writing a sequence of independent async computations within awaitAll is equivalent to giving those computations as arguments to parZip.

Flows

The parMap function is also provided for Flow. Its concurrency factor sets how many elements are transformed at the same time: with a factor greater than 1, the transformations run concurrently, and with a factor of 1, calling parMap is identical to calling map on the flow.

Additional performance can be gained if we don't impose the same ordering on the mapping output as the one in the source flow. Just call parMapUnordered in that case. As with parMap, the concurrency factor defines how many computations should be executed concurrently at most.
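The difference between the two Flow operators can be sketched as follows. slowSquare, squaresInOrder, and squaresAnyOrder are hypothetical names. These operators are marked experimental in the versions I am aware of, and the exact opt-in annotation may vary between releases, so the sketch opts in to both kotlinx.coroutines markers as an assumption.

```kotlin
import arrow.fx.coroutines.parMap
import arrow.fx.coroutines.parMapUnordered
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical transformation: smaller inputs take longer to finish.
suspend fun slowSquare(n: Int): Int {
  delay(50L * (5 - n))
  return n * n
}

// Results are emitted in the order of the source flow.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun squaresInOrder(): List<Int> =
  flowOf(1, 2, 3, 4).parMap(concurrency = 2) { slowSquare(it) }.toList()

// Results are emitted as soon as they are ready, so the order may differ.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun squaresAnyOrder(): List<Int> =
  flowOf(1, 2, 3, 4).parMapUnordered(concurrency = 2) { slowSquare(it) }.toList()

fun main() = runBlocking {
  println(squaresInOrder())
  println(squaresAnyOrder().sorted())
}
```

Both functions produce the same elements; only the unordered variant is free to emit a fast result before a slow one that came earlier in the source.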

Racing

The parX operators describe the cases in which we are interested in the result of every computation we perform. But imagine the scenario in which we want to download a file, but we try two servers simultaneously for resilience purposes. Once we get the file from one server, we're not really interested in the rest. This is an example of racing two computations.

Arrow provides functions that perform racing over 2 or 3 computations, with the option of customizing the coroutine context.

suspend fun file(server1: String, server2: String) =
  raceN(
    { downloadFrom(server1) },
    { downloadFrom(server2) }
  ).merge()

The example above shows a typical pattern combined with raceN. The result of raceN is Either<A, B>, with each type corresponding to one branch of the race. Since both computations return the same type here and we don't care which one "wins," we use merge to conflate both cases into a single value.
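When the two branches return different types, merge no longer applies, and the Either can be inspected instead. The following sketch assumes two hypothetical sources, fromCache and fromService, and uses fold to handle whichever branch wins.

```kotlin
import arrow.core.Either
import arrow.fx.coroutines.raceN
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical sources: a fast cache returning text, a slow service returning a number.
suspend fun fromCache(): String {
  delay(50)
  return "cached"
}

suspend fun fromService(): Int {
  delay(500)
  return 42
}

suspend fun firstAnswer(): String {
  // Left holds the result of the first branch, Right the result of the second.
  val winner: Either<String, Int> = raceN({ fromCache() }, { fromService() })
  return winner.fold(
    { text -> "cache won with $text" },
    { number -> "service won with $number" }
  )
}

fun main() = runBlocking {
  println(firstAnswer())
}
```

The losing branch is canceled as soon as the winner completes, so in this sketch fromService never gets to return its value.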

Integration with typed errors

Arrow's typed errors can seamlessly integrate with the Arrow Fx Coroutines operators while supporting the patterns of structured concurrency. The subtleties lie in the ordering of the DSLs and how that ordering affects error handling and the cancellation of structured concurrency scopes. So you must understand how cancellation works in Structured Concurrency. The helper below prints a message when the coroutine running it is canceled, and the examples that follow use it to make cancellation visible.

suspend fun logCancellation(): Unit = try {
  println("Sleeping for 500 milliseconds ...")
  delay(500)
} catch (e: CancellationException) {
  println("Sleep was cancelled early!")
  throw e
}

When we nest the Raise DSL inside the lambdas of the Arrow Fx Coroutines operators, the errors remain inside those lambdas. Thus, they do not affect any of the regular behavior. For example, if we compute Either values inside parZip, a typed error raised in one computation does not affect the others.

suspend fun example() {
  val triple = parZip(
    { either<String, Unit> { logCancellation() } },
    { either<String, Unit> { delay(100); raise("Error") } },
    { either<String, Unit> { logCancellation() } }
  ) { a, b, c -> Triple(a, b, c) }
  println(triple)
}

Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
(Either.Right(kotlin.Unit), Either.Left(Error), Either.Right(kotlin.Unit))

danger

Using typed errors with KotlinX Flow is prone to leaking the raise DSL scope and should be used carefully. More information can be found in the typed errors documentation.

Cancellation on Raise

In contrast, when we nest Arrow Fx Coroutines operators inside the Raise DSL, the errors will be observed by Structured Concurrency. Typed errors follow the same rules as Structured Concurrency and behave the same as CancellationException, since they short-circuit the computation.

As shown above, parZip allows running independent tasks in parallel. If any of the tasks fails, the other tasks get canceled. The same semantics are also guaranteed when composing parZip with typed errors.

The example below shows three tasks running in parallel, where the second task raises an error.

suspend fun example() {
  val res = either {
    parZip(
      { logCancellation() },
      { delay(100); raise("Error") },
      { logCancellation() }
    ) { a, b, c -> Triple(a, b, c) }
  }
  println(res)
}

In the output, we can see that tasks 1 and 3 started, but task 2 raised an error that triggered the cancellation of the other two. After tasks 1 and 3 are canceled, the raised error is returned as the result of either and printed.

Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
Sleep was cancelled early!
Sleep was cancelled early!
Either.Left(Error)

Similarly, we can apply the same pattern to parMap when working with collections, where we want all tasks to be canceled if any of them fails.

suspend fun Raise<String>.failOnEven(i: Int): Unit {
  ensure(i % 2 != 0) { delay(100); "Error" }
  logCancellation()
}

suspend fun example() {
  val res = either {
    listOf(1, 2, 3, 4).parMap { failOnEven(it) }
  }
  println(res)
}

The example transforms, or maps, every element of an Iterable [1, 2, 3, 4] in parallel using parMap and failOnEven. Since failOnEven raises an error when the Int is even, it fails for inputs 2 and 4, and the other two coroutines are canceled.

Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
Sleep was cancelled early!
Sleep was cancelled early!
Either.Left(Error)

Accumulating typed errors in parallel

Arrow Fx Coroutines also provides a way to accumulate errors in parallel. If we want to run tasks in parallel but accumulate all errors instead of short-circuiting, we can use parMapOrAccumulate. It works the same as parMap from our previous example, but instead of canceling the other coroutines when one fails, it accumulates the errors. So no matter how many coroutines fail, all of them will run to completion.

suspend fun example() {
  val res = listOf(1, 2, 3, 4)
    .parMapOrAccumulate { failOnEven(it) }
  println(res)
}

Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
Either.Left(NonEmptyList(Error, Error))
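To show how the accumulated result might be consumed, here is a small self-contained sketch. tenfoldOdds and describe are hypothetical names, and the explicit return type is there to help the compiler infer the error type.

```kotlin
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.raise.ensure
import arrow.fx.coroutines.parMapOrAccumulate
import kotlinx.coroutines.runBlocking

// Hypothetical validation: every even input contributes one error message,
// every odd input is multiplied by ten.
suspend fun tenfoldOdds(inputs: List<Int>): Either<NonEmptyList<String>, List<Int>> =
  inputs.parMapOrAccumulate { n ->
    ensure(n % 2 != 0) { "$n is even" }
    n * 10
  }

// Summarizes either the number of accumulated errors or the successful values.
fun describe(result: Either<NonEmptyList<String>, List<Int>>): String =
  result.fold(
    { errors -> "${errors.size} error(s)" },
    { values -> "ok: $values" }
  )

fun main() = runBlocking {
  println(describe(tenfoldOdds(listOf(1, 2, 3, 4))))
  println(describe(tenfoldOdds(listOf(1, 3))))
}
```

Because every element is processed even when some fail, the Left side reports one error per failing input rather than only the first one.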