High-level concurrency
Coroutines are one of the most interesting features of Kotlin. However, the "coroutines standard library" sometimes falls short, especially when dealing with many suspended computations. Arrow provides those few additional functions that have proven useful in Kotlin code and other programming communities.
Arrow Fx makes it easier to follow the Structured Concurrency rules, even when the logic grows more complex.
High-level concurrency is part of the arrow-fx-coroutines library.
Independently, in parallel
We often have independent computations that we want to perform in parallel.
For example, if we need to fetch a value from the database and download a file
from another service, there's no reason why we shouldn't do them concurrently.
We can use parZip (PARallel ZIP) to combine the execution of both computations.
import arrow.fx.coroutines.parZip

suspend fun getUser(id: UserId): User =
  parZip(
    { getUserName(id) },
    { getAvatar(id) }
  ) { name, avatar -> User(name, avatar) }
The code above showcases how parZip is used: we pass a sequence of arguments,
one per computation to perform, followed by one final block (usually written in
trailing form) that specifies what to do with the results of those computations.
In this case, the two arguments obtain the name and avatar, and the block puts
them together in the User type.
parZip is valuable not only for its high-level view of concurrency.
Its implementation also takes care of the complex task of propagating exceptions
and canceling the remaining computations whenever one of the tasks fails.
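The cancellation behavior described above can be observed with a small sketch. The names slowTask, failingTask, runBoth, and DownloadFailed are hypothetical and exist only for this illustration; the sketch assumes arrow-fx-coroutines and kotlinx.coroutines are on the classpath.

```kotlin
import arrow.fx.coroutines.parZip
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical error type used only in this sketch.
class DownloadFailed(message: String) : Exception(message)

// Hypothetical task: sleeps for a long time and records whether it was canceled.
suspend fun slowTask(log: MutableList<String>): String =
  try {
    delay(5_000)
    "slow result"
  } catch (e: CancellationException) {
    log += "slowTask canceled"
    throw e // cancellation must always be rethrown
  }

// Hypothetical task: fails shortly after starting.
suspend fun failingTask(): String {
  delay(100)
  throw DownloadFailed("boom")
}

// Runs both tasks in parallel; the failure of one cancels the other
// and is rethrown to the caller of parZip.
suspend fun runBoth(log: MutableList<String>): String =
  try {
    parZip(
      { slowTask(log) },
      { failingTask() }
    ) { a, b -> "$a / $b" }
  } catch (e: DownloadFailed) {
    "failed: ${e.message}"
  }

fun main() = runBlocking {
  val log = mutableListOf<String>()
  println(runBoth(log))
  println(log)
}
```

Here runBoth returns well before the five-second delay elapses, because the failure of failingTask cancels slowTask instead of waiting for it.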
In the code above, we had a fixed sequence of computations to perform in parallel.
In other cases, those computations depend on some form of collection; for example,
we want to obtain the names of all of a user's friends. Arrow provides parMap
(PARallel MAP) for that use case.
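import arrow.fx.coroutines.parMap
// Assumes getUserName returns a String, so the result is a list of names.
suspend fun getFriendNames(id: UserId): List<String> =
  getFriendIds(id).parMap { getUserName(it) }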
One potential problem with parMap is that it may create too much concurrency
if the collection has a large number of elements. To address this problem,
Arrow provides a version of parMap with an additional concurrency parameter that
limits how many computations run in parallel.
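As a minimal sketch of the bounded version, the following code counts how many tasks run at the same time. The function squareAll and the counters are hypothetical helpers for this illustration, and the use of AtomicInteger assumes the JVM target.

```kotlin
import arrow.fx.coroutines.parMap
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: squares every id, running at most `limit` tasks at a time.
// Returns the results together with the highest number of tasks seen running at once.
suspend fun squareAll(ids: List<Int>, limit: Int): Pair<List<Int>, Int> {
  val running = AtomicInteger(0)
  val peak = AtomicInteger(0)
  val results = ids.parMap(concurrency = limit) { id ->
    val now = running.incrementAndGet()
    peak.updateAndGet { maxOf(it, now) }
    delay(50) // stands in for a network or database call
    running.decrementAndGet()
    id * id
  }
  return results to peak.get()
}

fun main() = runBlocking {
  val (results, peak) = squareAll(listOf(1, 2, 3, 4, 5, 6), limit = 2)
  println(results) // results keep the order of the input list
  println("peak concurrency: $peak") // never above the limit
}
```

The observed peak should never exceed the limit passed as concurrency, while the results keep the order of the input collection.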
Await-all scopes
Although parZip gives the most high-level view of the code, clearly specifying
which tasks are independent of each other, it has the drawback of requiring a particular
style of writing your computations. Arrow provides another tool based on async,
where the code is written using the usual async/.await() idioms.
suspend fun getUser(id: UserId): User = awaitAll {
  val name = async { getUserName(id) }
  val avatar = async { getAvatar(id) }
  User(name.await(), avatar.await())
}
As the name suggests, within this awaitAll block, every time you call .await(),
all of the async computations that were registered until that point are
awaited. If any of those throws an exception, the whole block is canceled, as
per the rules of structured concurrency. In general, writing a sequence of independent
async computations within awaitAll is equivalent to giving those computations
as arguments to parZip.
Flows
The parMap function is also provided for Flow.
If the concurrency factor is greater than 1, the mapping function is applied to several elements of the flow concurrently.
When this factor is one, calling parMap is identical to calling map on the flow.
Additional performance can be gained if we don't impose the same ordering on
the mapping output as the one in the source flow. Just call parMapUnordered
in that case. As with parMap, the concurrency factor defines how many
computations should be executed concurrently at most.
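The difference between the ordered and unordered variants can be sketched as follows. The function slowDouble and the two wrappers are hypothetical; the opt-in annotation is an assumption that may or may not be required, depending on the Arrow and kotlinx.coroutines versions in use.

```kotlin
import arrow.fx.coroutines.parMap
import arrow.fx.coroutines.parMapUnordered
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical transformation: larger inputs take longer to process.
suspend fun slowDouble(n: Int): Int {
  delay(n * 100L)
  return n * 2
}

// Output follows the order of the source flow, even though 1 finishes first.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun ordered(): List<Int> =
  flowOf(3, 2, 1).parMap(concurrency = 3) { slowDouble(it) }.toList()

// Output is emitted as each computation finishes, so the order may differ.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun unordered(): List<Int> =
  flowOf(3, 2, 1).parMapUnordered(concurrency = 3) { slowDouble(it) }.toList()

fun main() = runBlocking {
  println(ordered())   // keeps source order: [6, 4, 2]
  println(unordered()) // same elements, emitted in completion order
}
```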
Racing
The parX operators describe the cases in which we are interested in the result
of every computation we perform. But imagine the scenario in which we want to
download a file, but we try two servers simultaneously for resilience purposes.
Once we get the file from one server, we're not really interested in the
rest. This is an example of racing two computations.
Arrow provides functions that perform racing over 2 or 3 computations, with the option of customizing the coroutine context.
import arrow.core.merge
import arrow.fx.coroutines.raceN

suspend fun file(server1: String, server2: String) =
  raceN(
    { downloadFrom(server1) },
    { downloadFrom(server2) }
  ).merge()
The example above shows a typical pattern combined with raceN.
The result of raceN is Either<A, B>, with each type
corresponding to one branch in raceN. Since we have two computations that
return the same type here and don't care which one "wins," we call merge to
conflate both into a single value.
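When the two branches return different types, the Either result tells us which branch won. The following sketch races a computation against a timer; lookup and lookupOrTimeout are hypothetical names introduced only for this illustration.

```kotlin
import arrow.fx.coroutines.raceN
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical lookup that takes `millis` milliseconds to answer.
suspend fun lookup(millis: Long): String {
  delay(millis)
  return "cached value"
}

// Races the lookup against a timer. Left means the lookup finished first,
// Right means the timer did. The losing branch is canceled.
suspend fun lookupOrTimeout(lookupMillis: Long, timeoutMillis: Long): String =
  raceN(
    { lookup(lookupMillis) },
    { delay(timeoutMillis) }
  ).fold(
    { value -> "got: $value" },
    { "timed out" }
  )

fun main() = runBlocking {
  println(lookupOrTimeout(lookupMillis = 10, timeoutMillis = 2_000))
  println(lookupOrTimeout(lookupMillis = 2_000, timeoutMillis = 10))
}
```

Using fold instead of merge keeps the two outcomes distinct, which is useful when the branches have different meanings.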
Integration with typed errors
Arrow's typed errors integrate with the Arrow Fx Coroutines operators while supporting the patterns of structured concurrency. The subtleties lie in the ordering of the DSLs and how that ordering affects error handling and the cancellation of structured concurrency scopes. To follow the examples, you need to understand how cancellation works in Structured Concurrency. The helper below prints a message when its delay is canceled, which makes cancellation visible in the examples that follow.
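import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay

suspend fun logCancellation(): Unit = try {
  println("Sleeping for 500 milliseconds ...")
  delay(500)
} catch (e: CancellationException) {
  println("Sleep was cancelled early!")
  throw e // rethrow so that cancellation keeps propagating
}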
When we nest the Raise DSL inside the lambdas of the Arrow Fx Coroutines operators, the errors remain inside those lambdas. Thus, they do not affect any of the regular behavior.
For example, if we compute Either values inside parZip, a typed error raised in one computation does not affect the other computations.
import arrow.core.raise.either
import arrow.fx.coroutines.parZip

suspend fun example() {
  val triple = parZip(
    { either<String, Unit> { logCancellation() } },
    { either<String, Unit> { delay(100); raise("Error") } },
    { either<String, Unit> { logCancellation() } }
  ) { a, b, c -> Triple(a, b, c) }
  println(triple)
}
Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
(Either.Right(kotlin.Unit), Either.Left(Error), Either.Right(kotlin.Unit))
Using typed errors with KotlinX Flow is prone to leaking the raise DSL scope and should be done carefully.
More information can be found in the typed errors documentation.
Cancellation on Raise
In contrast, when we nest Arrow Fx Coroutines operators inside the Raise DSL, the errors are observed by Structured Concurrency.
Typed errors follow the same rules as Structured Concurrency and behave the same as CancellationException, since they short-circuit the computation.
As shown above, parZip allows running independent tasks in parallel. If any of the tasks fails, the other tasks are canceled.
The same semantics are also guaranteed when composing parZip with typed errors.
The example below shows three tasks running in parallel; the second task raises an error after 100 milliseconds.
suspend fun example() {
  val res = either {
    parZip(
      { logCancellation() },
      { delay(100); raise("Error") },
      { logCancellation() }
    ) { a, b, c -> Triple(a, b, c) }
  }
  println(res)
}
In the output, we can see that tasks 1 and 3 started, but 2 raised an error that triggered the cancellation of the other two tasks.
After tasks 1 and 3 are canceled, we see that the result of raise is returned and prints the error message.
Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
Sleep was cancelled early!
Sleep was cancelled early!
Either.Left(Error)
Similarly, we can apply the same pattern to parMap when working with collections, where we want all tasks to be canceled if any of them fails.
import arrow.core.raise.Raise
import arrow.core.raise.ensure

suspend fun Raise<String>.failOnEven(i: Int): Unit {
  ensure(i % 2 != 0) { delay(100); "Error" }
  logCancellation()
}

suspend fun example() {
  val res = either {
    listOf(1, 2, 3, 4).parMap { failOnEven(it) }
  }
  println(res)
}
The example transforms, or maps, every element of an Iterable [1, 2, 3, 4] in parallel using parMap and failOnEven.
Since failOnEven raises an error when the Int is even, it fails for inputs 2 and 4, and the other two coroutines are canceled.
Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
Sleep was cancelled early!
Sleep was cancelled early!
Either.Left(Error)
Accumulating typed errors in parallel
Arrow Fx Coroutines also provides a way to accumulate errors in parallel.
If we want to run tasks in parallel but accumulate all errors instead of short-circuiting, we can use parMapOrAccumulate.
It works the same as parMap from our previous example, but instead of canceling the other coroutines when one fails, it accumulates the errors.
So no matter how many coroutines fail, all of them will run to completion.
import arrow.fx.coroutines.parMapOrAccumulate

suspend fun example() {
  val res = listOf(1, 2, 3, 4)
    .parMapOrAccumulate { failOnEven(it) }
  println(res)
}
Sleeping for 500 milliseconds ...
Sleeping for 500 milliseconds ...
Either.Left(NonEmptyList(Error, Error))
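The example above only shows the failing case. As a rough sketch of both outcomes, the following code returns the mapped values when every element passes and the collected errors otherwise. The function validateAll and its error messages are hypothetical and exist only for this illustration.

```kotlin
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.raise.ensure
import arrow.fx.coroutines.parMapOrAccumulate
import kotlinx.coroutines.runBlocking

// Hypothetical validation: odd numbers are multiplied by ten, even numbers are rejected.
// All elements are processed in parallel and every error is collected.
suspend fun validateAll(inputs: List<Int>): Either<NonEmptyList<String>, List<Int>> =
  inputs.parMapOrAccumulate { n ->
    ensure(n % 2 != 0) { "$n is even" }
    n * 10
  }

fun main() = runBlocking {
  println(validateAll(listOf(1, 3)))       // all elements pass: a Right with the mapped values
  println(validateAll(listOf(1, 2, 3, 4))) // a Left that collects one error per even number
}
```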