advanced
A Fiber
is a concurrency primitive for describing parallel operations or multi-tasking.
Concurrently started tasks can either be joined or canceled, and these are the only two operators available on Fiber
.
Using Fiber
we can very easily describe parallel operations such as parallelMap
.
Note that the operation written below does not support proper cancellation:
when the resulting IO
is canceled it does not propagate this cancellation back to the underlying IO
.
import arrow.fx.*
import kotlinx.coroutines.Dispatchers.Default
import arrow.fx.extensions.fx
import arrow.fx.typeclasses.Fiber
import arrow.fx.IO
/**
 * Runs [first] and [second] in parallel and combines their results with [f].
 *
 * Each effect is forked onto the `Default` dispatcher, producing a [Fiber];
 * both fibers are then joined, in order, and [f] is applied to the two values.
 *
 * Limitation: cancelling the returned [IO] is not propagated to the forked
 * fibers, so the underlying effects keep running.
 *
 * @param first the effect producing the first argument of [f]
 * @param second the effect producing the second argument of [f]
 * @param f pure function combining both results
 * @return an [IO] yielding `f(a, b)` once both fibers have completed
 */
fun <A, B, C> parallelMap(
    first: IO<A>,
    second: IO<B>,
    f: (A, B) -> C
): IO<C> = IO.fx {
    // Start both effects before waiting on either, so they overlap in time.
    val leftFiber: Fiber<ForIO, A> = !first.fork(Default)
    val rightFiber: Fiber<ForIO, B> = !second.fork(Default)

    // Wait for the results in the same order as the arguments of `f`.
    val left = !leftFiber.join()
    val right = !rightFiber.join()
    f(left, right)
}
// Two sample effects: each one blocks its thread for 5000 ms, then prints a
// message and yields an Int through `map`.
val first = IO<Unit> { Thread.sleep(5000) }.map {
println("Hi, I am first")
1
}
val second = IO<Unit> { Thread.sleep(5000) }.map {
println("Hi, I am second")
2
}
// Combine both results with Int::plus (1 + 2).
// NOTE(review): `await()` presumably runs the IO and suspends until its result
// is available — confirm against the Arrow version in use.
parallelMap(first, second, Int::plus).await()
// Sample output. Both effects sleep for the same duration, so the relative
// order of the two "Hi" lines is presumably not guaranteed — TODO confirm.
//Hi, I am second
//Hi, I am first
//3
We could fix this snippet to support proper cancellation by using bracket
instead of flatMap
,
which allows us to register an operation to run on cancellation, error or completion.
import arrow.fx.extensions.io.monad.flatMap
import arrow.fx.extensions.io.monad.map
/**
 * Cancellation-aware variant of a parallel map: runs [first] and [second] in
 * parallel and combines their results with [f].
 *
 * Each forked fiber is acquired through `bracket`, whose `release` step runs on
 * completion, error or cancellation and cancels that fiber. Cancelling the
 * returned [IO] is therefore propagated to both underlying effects.
 *
 * @param first the effect producing the first argument of [f]
 * @param second the effect producing the second argument of [f]
 * @param f pure function combining both results
 * @return an [IO] yielding `f(a, b)` once both fibers have completed
 */
fun <A, B, C> parallelMap2(
    first: IO<A>,
    second: IO<B>,
    f: (A, B) -> C
): IO<C> =
    first.fork(Default).bracket(
        // Always cancel the first fiber when the bracket is left.
        release = { fiberA -> fiberA.cancel() },
        use = { fiberA ->
            second.fork(Default).bracket(
                // Always cancel the second fiber when the inner bracket is left.
                release = { fiberB -> fiberB.cancel() },
                use = { fiberB ->
                    // Join in argument order, then combine the two values.
                    fiberA.join().flatMap { a ->
                        fiberB.join().map { b -> f(a, b) }
                    }
                }
            )
        }
    )