Resource
Allocation and release of resources is not easy, especially when we have multiple resources that depend on each other. The Resource DSL adds the ability to install resources and ensure proper finalization even in the face of exceptions and cancellations. Arrow's Resource cooperates with Structured Concurrency and KotlinX Coroutines.
- Graceful Resource Handling by Simon Vergauwen
- Graceful Shutdown with Structured Concurrency by Simon Vergauwen
Resource management is part of the arrow-fx-coroutines library. The separate arrow-autoclose library provides a similar API but without integration with Kotlin's coroutine mechanism.
Understanding the problem
The following program is not safe because it is prone to leak dataSource and userProcessor when an exception or cancellation signal occurs while using the service.
class UserProcessor {
  fun start(): Unit = println("Creating UserProcessor")
  fun shutdown(): Unit = println("Shutting down UserProcessor")
}

class DataSource {
  fun connect(): Unit = println("Connecting dataSource")
  fun close(): Unit = println("Closed dataSource")
}

class Service(val db: DataSource, val userProcessor: UserProcessor) {
  suspend fun processData(): List<String> =
    throw RuntimeException("I'm going to leak resources by not closing them")
}
For example, the following application would leak resources.
suspend fun example() {
  val userProcessor = UserProcessor().also { it.start() }
  val dataSource = DataSource().also { it.connect() }
  val service = Service(dataSource, userProcessor)
  service.processData()
  dataSource.close()
  userProcessor.shutdown()
}
If we were using Kotlin JVM, we might rely on Closeable or AutoCloseable and rewrite our code.
// Assumes UserProcessor and DataSource now implement Closeable or AutoCloseable.
suspend fun example() {
  UserProcessor().use { userProcessor ->
    userProcessor.start()
    DataSource().use { dataSource ->
      dataSource.connect()
      Service(dataSource, userProcessor).processData()
    }
  }
}
However, while we fixed the closing of UserProcessor and DataSource, there are still issues with this code:
- It requires implementing Closeable or AutoCloseable, which is only possible for Kotlin JVM but not available for Multiplatform.
- It requires implementing an interface or wrapping external types with something like class CloseableOf<A>(val type: A): Closeable.
- It requires nesting the different resources in a callback tree, which is not composable.
- It enforces the close method name, so we had to rename UserProcessor#shutdown to close.
- It cannot run suspend functions within fun close(): Unit.
- It gives no exit signal; we don't know if we exited successfully, with an error, or with a cancellation.
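To make the wrapping point concrete, here is a minimal sketch of such an adapter on the JVM using only the standard library. The CloseableOf shape shown here (a value plus a release callback) and the demo function are hypothetical illustrations, not part of Arrow. Note that the release callback is an ordinary function, so it cannot suspend and receives no exit signal.

```kotlin
// Hypothetical stand-in for an external type we cannot modify.
class UserProcessor {
    var running = false
        private set

    fun start() { running = true }
    fun shutdown() { running = false }
}

// Hypothetical adapter: pairs a value with its (non-suspending) release action.
class CloseableOf<A>(val value: A, private val release: (A) -> Unit) : AutoCloseable {
    override fun close() = release(value)
}

// Returns the processor state observed inside the `use` block and after it.
fun demo(): Pair<Boolean, Boolean> {
    val processor = UserProcessor()
    val during = CloseableOf(processor) { it.shutdown() }.use { wrapped ->
        wrapped.value.start()
        wrapped.value.running
    }
    return during to processor.running
}

fun main() {
    println(demo()) // (true, false)
}
```

Every external type needs this kind of wrapping at each call site, which is part of the boilerplate that Resource aims to remove.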
Resource solves these issues. The main idea is that each resource has three stages: 1️⃣ acquiring the resource, 2️⃣ using the resource, and 3️⃣ releasing the resource. With Resource, we bundle steps (1) and (3), and the implementation ensures that everything works correctly, even in the event of exceptions or cancellations.
Correct release of resources when the application is terminating is important in several scenarios.
SuspendApp improves on Resource to gracefully deal with shutdown and termination.
Dealing with resources properly
You can use Arrow's Resource in two ways:
- Using resourceScope and functions with ResourceScope as its receiver.
- Wrapping the entire resource allocation and release as a Resource<A> value, which we later use in a larger block.
Using resourceScope
The ResourceScope DSL allows you to install resources and safely interact with them. In fact, that's the only operation you need to learn about: install takes both the acquisition and release steps as arguments. The result of this function is whatever was acquired, plus the promise of running the finalizer at the end of the block.
The Resource DSL gives you enough flexibility to perform different actions depending on how the execution finished: successful completion, exceptions, or cancellation. The second argument to the finalizer is of type ExitCase and represents the reason why the finalizer is run.
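As an illustration of the ExitCase argument, the following sketch records which case a finalizer receives when the block completes normally versus when it throws. The helper names describe and observedExit are hypothetical, and runBlocking is used only to keep the example self-contained; it assumes arrow-fx-coroutines and kotlinx-coroutines are on the classpath.

```kotlin
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.resourceScope
import kotlinx.coroutines.runBlocking

// Hypothetical helper: turns the exit reason into a short label.
fun describe(exitCase: ExitCase): String = when (exitCase) {
    is ExitCase.Completed -> "completed"
    is ExitCase.Cancelled -> "cancelled"
    is ExitCase.Failure -> "failed"
}

// Runs a scope with one resource and returns the label seen by its finalizer.
fun observedExit(fail: Boolean): String = runBlocking {
    var seen = "finalizer not run"
    try {
        resourceScope<Unit> {
            install({ "connection" }) { _, exitCase -> seen = describe(exitCase) }
            if (fail) throw IllegalStateException("boom")
        }
    } catch (e: IllegalStateException) {
        // The exception is rethrown once the finalizer has run.
    }
    seen
}

fun main() {
    println(observedExit(fail = false)) // completed
    println(observedExit(fail = true))  // failed
}
```

A real finalizer might use this distinction to, for example, commit a transaction on completion and roll it back otherwise.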
The code below shows our example rewritten to use resourceScope. Note that we acquire our UserProcessor and DataSource in parallel, using the parZip operation in Arrow. This means that their start and connect methods can run in parallel.
suspend fun ResourceScope.userProcessor(): UserProcessor =
  install({ UserProcessor().also { it.start() } }) { p, _ -> p.shutdown() }

suspend fun ResourceScope.dataSource(): DataSource =
  install({ DataSource().also { it.connect() } }) { ds, _ -> ds.close() }

suspend fun example(): Unit = resourceScope {
  val service = parZip({ userProcessor() }, { dataSource() }) { userProcessor, ds ->
    Service(ds, userProcessor)
  }
  val data = service.processData()
  println(data)
}
The code above also showcases a very common pattern of resource acquisition: running the constructor, followed by calling some start method using Kotlin's also scope function.
To achieve its behavior, install invokes the acquire and release steps as NonCancellable. If a cancellation signal or an exception is received during acquire, the resource is assumed to not have been acquired and thus will not trigger the release function; any composed resources that are already acquired are guaranteed to release as expected.
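The behavior described above can be observed with a small sketch in which the second acquisition fails. The function failingAcquire and the event labels are hypothetical; the example assumes arrow-fx-coroutines and kotlinx-coroutines are available and uses runBlocking only for self-containment.

```kotlin
import arrow.fx.coroutines.resourceScope
import kotlinx.coroutines.runBlocking

// Records what happens when resource B fails while it is being acquired.
fun failingAcquire(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    try {
        resourceScope<Unit> {
            install({ events += "acquire A"; "A" }) { _, _ -> events += "release A" }
            install<String>({
                events += "acquire B"
                throw IllegalStateException("B failed to start")
            }) { _, _ -> events += "release B" } // never runs: B was not acquired
            events += "use A and B" // never reached
        }
    } catch (e: IllegalStateException) {
        events += "caught: ${e.message}"
    }
    events
}

fun main() {
    println(failingAcquire())
    // [acquire A, acquire B, release A, caught: B failed to start]
}
```

Resource A is released even though the block never reached its body, while the finalizer for B is skipped because its acquisition did not finish.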
Interfacing with Java
If you're running on the JVM, Arrow provides built-in integration with AutoCloseable in the form of the closeable function.
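A minimal sketch of how this might look, assuming the closeable extension on ResourceScope from arrow-fx-coroutines on the JVM (its exact signature and accepted types may differ between Arrow versions). TrackedReader and readAndReport are hypothetical helpers used only to observe that close is called.

```kotlin
import arrow.fx.coroutines.closeable
import arrow.fx.coroutines.resourceScope
import kotlinx.coroutines.runBlocking
import java.io.StringReader

// Hypothetical reader that remembers whether close() was called.
class TrackedReader(text: String) : StringReader(text) {
    var closed = false
        private set

    override fun close() {
        closed = true
        super.close()
    }
}

// Reads the text inside a resourceScope, then reports whether the reader was closed.
fun readAndReport(text: String): Pair<String, Boolean> = runBlocking {
    val reader = TrackedReader(text)
    val content = resourceScope {
        // Assumption: closeable installs the value and calls close() on scope exit.
        val r = closeable { reader }
        r.readText()
    }
    content to reader.closed
}

fun main() {
    println(readAndReport("hello")) // (hello, true)
}
```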
Using Resource
The usage of resource is very similar to install. The main difference is that the result is a value of type Resource<T>, where T is the type of the resource to acquire. But such a value doesn't run the acquisition step, it's simply a recipe describing how that's done; to actually acquire the resource, you need to call .bind() inside a resourceScope.
val userProcessor: Resource<UserProcessor> = resource({
  UserProcessor().also { it.start() }
}) { p, _ -> p.shutdown() }

val dataSource: Resource<DataSource> = resource({
  DataSource().also { it.connect() }
}) { ds, exitCase ->
  println("Releasing $ds with exit: $exitCase")
  withContext(Dispatchers.IO) { ds.close() }
}

val service: Resource<Service> = resource {
  Service(dataSource.bind(), userProcessor.bind())
}

suspend fun example(): Unit = resourceScope {
  val data = service.bind().processData()
  println(data)
}
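To see that a Resource value is only a recipe, the sketch below records the order of events around its definition and its bind call. The function recipeDemo and the event labels are hypothetical; arrow-fx-coroutines and kotlinx-coroutines are assumed to be on the classpath.

```kotlin
import arrow.fx.coroutines.Resource
import arrow.fx.coroutines.resource
import arrow.fx.coroutines.resourceScope
import kotlinx.coroutines.runBlocking

// Shows that defining a Resource does nothing until bind() is called in a scope.
fun recipeDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val connection: Resource<String> = resource({
        events += "acquire"
        "connection"
    }) { _, _ -> events += "release" }

    events += "recipe defined" // nothing has been acquired at this point

    resourceScope<Unit> {
        val c = connection.bind() // acquisition happens here
        events += "using $c"
    } // release happens when the scope ends
    events
}

fun main() {
    println(recipeDemo())
    // [recipe defined, acquire, using connection, release]
}
```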
Why provide two ways to accomplish the same goal?
Although resourceScope provides nicer syntax in general, some usage patterns like acquiring several resources become easier when the steps are saved in an actual class.
The actual magic is that Resource is nothing more than a type alias for a parameter-less function with ResourceScope as its receiver:
typealias Resource<A> = suspend ResourceScope.() -> A
Although the primary usage pattern is to give resource the acquisition and release steps directly, there's another way to define a Resource<T>. For more complex scenarios, Arrow provides a variant of resource that takes a block with ResourceScope as a receiver. That allows calling install as required.
val userProcessor: Resource<UserProcessor> = resource {
  val x: UserProcessor = install(
    { UserProcessor().also { it.start() } },
    { processor, _ -> processor.shutdown() }
  )
  x
}
Integration with typed errors
Resource management cooperates with typed error builders.
It's important to be aware that the order in which we open the scopes affects the behavior. To be more concrete, let's consider the two possible nestings of resourceScope and either.
- When either is in the outermost position and resourceScope is inside of it, a raise or failing bind that crosses the resourceScope results in the release finalizer being called with Cancelled.

  either<String, Int> {
    resourceScope {
      val a = install({ }) { _, ex -> println("Closing A: $ex") }
      raise("Boom!")
    } // Closing A: ExitCase.Cancelled
  } // Either.Left(Boom!)

- With the reverse nesting order of either and resourceScope, resources are released with a normal state since nothing "failed."

  resourceScope {
    either<String, Int> {
      val a = install({ }) { _, ex -> println("Closing A: $ex") }
      raise("Boom!")
    } // Either.Left(Boom!)
  } // Closing A: ExitCase.Completed
We remark that, in both cases, resources are correctly released. If your finalizer works in the same way for every possible ExitCase, then there's no visible difference between both.
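The two nestings can be compared side by side with a sketch that captures the ExitCase instead of printing it. The function names eitherOutside and eitherInside are hypothetical; the example assumes arrow-core, arrow-fx-coroutines, and kotlinx-coroutines are on the classpath.

```kotlin
import arrow.core.Either
import arrow.core.raise.either
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.resourceScope
import kotlinx.coroutines.runBlocking

// either { resourceScope { ... } }: the raise crosses the resourceScope boundary.
fun eitherOutside(): Pair<Either<String, Int>, ExitCase?> = runBlocking {
    var seen: ExitCase? = null
    val result = either<String, Int> {
        resourceScope {
            install({ }) { _, exitCase -> seen = exitCase }
            raise("Boom!")
        }
    }
    result to seen
}

// resourceScope { either { ... } }: the raise is handled before the scope ends.
fun eitherInside(): Pair<Either<String, Int>, ExitCase?> = runBlocking {
    var seen: ExitCase? = null
    val result = resourceScope {
        either<String, Int> {
            install({ }) { _, exitCase -> seen = exitCase }
            raise("Boom!")
        }
    }
    result to seen
}

fun main() {
    println(eitherOutside().second is ExitCase.Cancelled) // true
    println(eitherInside().second == ExitCase.Completed)  // true
}
```

Both functions return Either.Left("Boom!"); only the ExitCase passed to the finalizer differs.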
If you want to know more, this conversation in the Kotlin Slack goes into more detail.