Arrow aims to enhance the user experience when using Project Reactor. Although Arrow provides its own datatypes capable of handling effects, such as IO, the style of programming it encourages lets users generify behavior over any existing abstraction.
One of these abstractions is Project Reactor, a library that, like RxJava, offers reactive streams.
val flux = Flux.just(7, 4, 11, 3)
  .map { it + 1 }
  .filter { it % 2 == 0 }
  .scan { acc, value -> acc + value }
  .collectList()
  .subscribeOn(Schedulers.parallel())
  .block()
//[8, 20, 24]
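To see where [8, 20, 24] comes from, the same three steps can be traced on an ordinary list. This is only a sketch using the Kotlin standard library, not Reactor: it assumes Kotlin 1.4 or later for runningReduce, which mirrors what scan does when no initial value is supplied, and tracePipeline is a hypothetical helper name.

```kotlin
// Hypothetical helper: replays the map / filter / scan steps on a plain list.
fun tracePipeline(input: List<Int>): List<Int> {
    val mapped = input.map { it + 1 }               // 7, 4, 11, 3 -> 8, 5, 12, 4
    val filtered = mapped.filter { it % 2 == 0 }    // keeps 8, 12, 4
    // Running sum that starts from the first element, like scan without a seed.
    return filtered.runningReduce { acc, value -> acc + value }
}

fun main() {
    println(tracePipeline(listOf(7, 4, 11, 3))) // [8, 20, 24]
}
```

The Flux version does the same work, but on the scheduler chosen with subscribeOn, and only materializes the list when block() is called.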
The largest quality-of-life improvement when using Flux streams in Arrow is the introduction of the Monad Comprehension. This library construct lets you express asynchronous Flux sequences as synchronous-looking code using binding/bind.
To wrap any existing Flux or Mono in its Arrow wrapper counterpart, you can use the extension function k().
import arrow.fx.reactor.*
import reactor.core.publisher.*
val flux = Flux.just(1, 2, 3, 4, 5).k()
flux
// FluxK(flux=FluxArray)
val mono = Mono.just(1).k()
mono
// MonoK(mono=MonoJust)
You can return to their regular forms using the function value().
flux.value()
// FluxArray
mono.value()
// MonoJust
The library provides instances of MonadError and MonadDefer.
Async allows you to generify over datatypes that can run asynchronous code. You can use it with FluxK or MonoK.
fun <F> getSongUrlAsync(MS: MonadDefer<F>) =
  MS { getSongUrl() }
val songFlux: FluxKOf<Url> = getSongUrlAsync(FluxK.monadDefer())
val songMono: MonoKOf<Url> = getSongUrlAsync(MonoK.monadDefer())
MonadThrow can be used to start a Monad Comprehension using the method fx.monadThrow, with all its benefits.
Let’s take an example and convert it to a comprehension. We’ll create a flux that loads a song from a remote location and then reports the current playback percentage every 100 milliseconds until it reaches 100%:
getSongUrlAsync()
  .map { songUrl -> MediaPlayer.load(songUrl) }
  .flatMap { musicPlayer ->
    val totalTime = musicPlayer.getTotaltime()
    Flux.interval(Duration.ofMillis(100))
      .flatMap {
        Flux.create { musicPlayer.getCurrentTime() }
          .map { tick -> (tick / totalTime * 100).toInt() }
      }
      .takeUntil { percent -> percent >= 100 }
  }
When rewritten using fx.monadThrow, it becomes:
import arrow.fx.reactor.*
import arrow.typeclasses.*
import arrow.fx.reactor.extensions.fluxk.monadThrow.monadThrow
import java.time.Duration
import reactor.core.publisher.DirectProcessor
import reactor.core.publisher.Flux
FluxK.monadThrow().fx.monadThrow {
  val (songUrl) = getSongUrlAsync()
  val musicPlayer = MediaPlayer.load(songUrl)
  val totalTime = musicPlayer.getTotaltime()
  // Emitting on `end` stops the interval once playback has finished.
  val end = DirectProcessor.create<Unit>()
  // Wrap the Flux with k() so that it can be bound inside the comprehension.
  Flux.interval(Duration.ofMillis(100)).takeUntilOther(end).k().bind()
  val (tick) = musicPlayer.getCurrentTime()
  val percent = (tick / totalTime * 100).toInt()
  if (percent >= 100) {
    end.onNext(Unit)
  }
  percent
}
Note that any unexpected exception, like ArithmeticException when totalTime is 0, is automatically caught and wrapped inside the flux.
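From the subscriber's point of view, an exception wrapped inside the flux arrives as an error signal rather than being thrown at the call site. The sketch below shows that effect with plain Reactor operators instead of an Arrow comprehension; percentOrFallback and the fallback value -1 are hypothetical choices made only for this illustration.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: computes a play percentage inside a Flux.
// Integer division by zero throws ArithmeticException inside map; Reactor
// turns it into an error signal, which onErrorReturn replaces with -1.
fun percentOrFallback(tick: Int, totalTime: Int): Int? =
    Flux.just(tick)
        .map { it * 100 / totalTime }
        .onErrorReturn(ArithmeticException::class.java, -1)
        .blockFirst()

fun main() {
    println(percentOrFallback(50, 200)) // 25
    println(percentOrFallback(50, 0))   // -1
}
```

Without a recovery operator such as onErrorReturn, the same error would reach the error callback passed to subscribe.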
Flux streams created with comprehensions like fx.monadThrow behave the same way regular flux streams do, including cancellation by disposing the subscription.
val disposable =
  songFlux.value()
    .subscribe({ println("Song $it") }, { System.err.println("Error $it") })
disposable.dispose()
While MonadDefer usually guarantees stack safety, this does not apply to the Reactor wrapper types.
This is a limitation on Reactor’s side. See the corresponding GitHub issue.
To overcome this limitation and run code in a stack-safe way, you can use fx.stackSafe, which is provided for every instance of Monad when arrow-free is included.
import arrow.Kind
import arrow.fx.reactor.MonoK
import arrow.fx.reactor.ForMonoK
import arrow.fx.reactor.fix
import arrow.fx.reactor.extensions.monok.monad.monad
import arrow.free.stackSafe
fun main() {
  //sampleStart
  // This will not result in a stack overflow
  val result = MonoK.monad().fx.stackSafe {
    (1..50000).fold(just(0)) { acc: Kind<ForMonoK, Int>, x: Int ->
      just(acc.bind() + 1)
    }.bind()
  }.run(MonoK.monad())
  //sampleEnd
  println(result.fix().mono.block()!!)
}
import arrow.fx.IO
// This will result in a stack overflow
IO {
  MonoK.monad().fx.monad {
    (1..50000).fold(just(0)) { acc: Kind<ForMonoK, Int>, x: Int ->
      just(acc.bind() + 1)
    }.bind()
  }.fix().mono.block()
}.attempt().unsafeRunSync()
// Left(java.lang.StackOverflowError)
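The contrast between the two snippets comes down to how a long chain of steps is evaluated: steps that run as nested calls each consume a stack frame, whereas a stack-safe interpreter walks the chain in a loop. The sketch below illustrates that general idea with a hand-written trampoline in plain Kotlin. It is not Arrow's implementation of fx.stackSafe; Trampoline, Done, More, runTrampoline, and countUp are hypothetical names introduced only for this illustration.

```kotlin
// Hypothetical illustration only; none of these names come from Arrow or Reactor.
sealed class Trampoline<out A>
class Done<out A>(val value: A) : Trampoline<A>()
class More<out A>(val next: () -> Trampoline<A>) : Trampoline<A>()

// Runs the steps one after another; tailrec compiles this into a loop,
// so the call stack does not grow with the number of steps.
tailrec fun <A> runTrampoline(step: Trampoline<A>): A = when (step) {
    is Done -> step.value
    is More -> runTrampoline(step.next())
}

// Each step returns a description of the next step instead of calling it directly.
fun countUp(remaining: Int, acc: Int = 0): Trampoline<Int> =
    if (remaining == 0) Done(acc) else More { countUp(remaining - 1, acc + 1) }

fun main() {
    println(runTrampoline(countUp(50000))) // 50000
}
```

The same 50000 increments that overflow the stack when expressed as nested binds complete here because only one step is on the stack at any time.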