A Fiber is a concurrency primitive for describing parallel operations or multi-tasking. Concurrently started tasks can either be joined or cancelled, and these are the only two operators available on Fiber.

Using Fiber, we can describe parallel operations such as parallelMap relatively easily. Note the operation written below does not support proper cancellation. When the resulting IO is cancelled, it does not propagate this cancellation back to the underlying IO.

import arrow.fx.*
import kotlinx.coroutines.Dispatchers.Default
import arrow.fx.extensions.fx
import arrow.fx.typeclasses.Fiber
import arrow.fx.IO

fun <A, B, C> parallelMap(first: IO<A>,
                     second: IO<B>,
                     f: (A, B) -> C): IO<C> =
  IO.fx {
    val (fiberOne: Fiber<ForIO, A>) = first.fork(Default)
    val (fiberTwo: Fiber<ForIO, B>) = second.fork(Default)
    f(!fiberOne.join(), !fiberTwo.join())

val first = IO<Unit> { Thread.sleep(5000) }.map {
  println("Hi, I am first")

val second = IO<Unit> { Thread.sleep(5000) }.map {
  println("Hi, I am second")
parallelMap(first, second, Int::plus).await()

//Hi, I am second
//Hi, I am first

We could fix this snippet to support proper cancellation by using bracket instead of flatMap, which allows us to register an operation to run on cancellation, error, or completion.


fun <A, B, C> parallelMap2(first: IO<A>,
                          second: IO<B>,
                          f: (A, B) -> C): IO<C> =
      first.fork(Default).bracket(use = { (joinA, _) ->
          second.fork(Default).bracket(use = { (joinB, _) ->
            joinA.flatMap { a ->
     { b -> f(a, b) }
          }, release = { (_, cancelB) -> cancelB })
        }, release = { (_, cancelA) -> cancelA })

Do you like Arrow?

Arrow Org