//arrow-fx-stm/arrow.fx.stm/STM

STM

common interface STM

Consistent and safe concurrent state updates

Software transactional memory, or STM, is an abstraction for concurrent state modification. With STM one can write code that concurrently accesses state and that can easily be composed without exposing details of how it ensures safety guarantees. Programs running within an STM transaction will neither deadlock nor have race-conditions.



The api of [STM](index.html) is based on the haskell package [stm](https://hackage.haskell.org/package/stm) and the implementation is based on the GHC implementation for fine-grained locks.


The base building blocks of STM are TVar’s and the primitives retry, orElse and catch.

STM Datastructures

There are several datastructures built on top of TVar’s already provided out of the box:

  • [TQueue](../-t-queue/index.html): A transactional mutable queue
  • [TMVar](../-t-m-var/index.html): A mutable transactional variable that may be empty
  • [TSet](../-t-set/index.html), [TMap](../-t-map/index.html): Transactional Set and Map
  • [TArray](../-t-array/index.html): Array of [TVar](../-t-var/index.html)'s
  • [TSemaphore](../-t-semaphore/index.html): Transactional semaphore
  • [TVar](../-t-var/index.html): A transactional mutable variable

All of these structures (excluding TVar) are built upon TVar’s and the STM primitives and implementing other datastructures with STM can be done by composing the existing structures.

Reading and writing to concurrent state:

In order to modify transactional datastructures we have to be inside the STM context. This is achieved either by defining our functions with STM as the receiver or using stm to create lambda functions with STM as the receiver.

Running a transaction is then done using atomically:

import arrow.fx.stm.atomically
import arrow.fx.stm.TVar
import arrow.fx.stm.STM

//sampleStart
fun STM.transfer(from: TVar<Int>, to: TVar<Int>, amount: Int): Unit {
  withdraw(from, amount)
  deposit(to, amount)
}

fun STM.deposit(acc: TVar<Int>, amount: Int): Unit {
  val current = acc.read()
  acc.write(current + amount)
  // or the shorthand acc.modify { it + amount }
}

fun STM.withdraw(acc: TVar<Int>, amount: Int): Unit {
  val current = acc.read()
  if (current - amount >= 0) acc.write(current + amount)
  else throw IllegalStateException("Not enough money in the account!")
}
//sampleEnd

suspend fun main() {
  val acc1 = TVar.new(500)
  val acc2 = TVar.new(300)
  println("Balance account 1: ${acc1.unsafeRead()}")
  println("Balance account 2: ${acc2.unsafeRead()}")
  println("Performing transaction")
  atomically { transfer(acc1, acc2, 50) }
  println("Balance account 1: ${acc1.unsafeRead()}")
  println("Balance account 2: ${acc2.unsafeRead()}")
}

This example shows a banking service moving money from one account to the other with STM. Should the first account not have enough money we throw an exception. This code is guaranteed to never deadlock and to never produce an invalid state by committing after the read state has changed concurrently.



Note: A transaction that sees an invalid state (a [TVar](../-t-var/index.html) that was read has been changed concurrently) will restart and try again. This usually means we rerun the function entirely, therefore it is recommended to keep transactions small and to never use code that has side-effects inside. However no kotlin interface can actually keep you from doing side effects inside STM. Using side-effects such as access to resources, logging or network access comes with severe disadvantages:


  • Transactions may be aborted at any time so accessing resources may never trigger finalizers
  • Transactions may rerun an arbitrary amount of times before finishing and thus all effects will rerun.

Retrying manually

It is sometimes beneficial to manually abort the current transaction if, for example, an invalid state has been read. E.g. a TQueue had no elements to read. The aborted transaction will automatically restart once any previously accessed variable has changed.

This is achieved by the primitive retry:

import arrow.fx.stm.atomically
import arrow.fx.stm.TVar
import arrow.fx.stm.STM
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.async
import kotlinx.coroutines.delay

//sampleStart
fun STM.transfer(from: TVar<Int>, to: TVar<Int>, amount: Int): Unit {
  withdraw(from, amount)
  deposit(to, amount)
}

fun STM.deposit(acc: TVar<Int>, amount: Int): Unit {
  val current = acc.read()
  acc.write(current + amount)
  // or the shorthand acc.modify { it + amount }
}

fun STM.withdraw(acc: TVar<Int>, amount: Int): Unit {
  val current = acc.read()
  if (current - amount >= 0) acc.write(current + amount)
  else retry() // we now retry if there is not enough money in the account
  // this can also be achieved by using `check(current - amount >= 0); acc.write(it + amount)`
}
//sampleEnd

fun main(): Unit = runBlocking {
  val acc1 = TVar.new(0)
  val acc2 = TVar.new(300)
  println("Balance account 1: ${acc1.unsafeRead()}")
  println("Balance account 2: ${acc2.unsafeRead()}")
  async {
    println("Sending money - Searching")
    delay(2000)
    println("Sending money - Found some")
    atomically { acc1.write(100_000_000) }
  }
  println("Performing transaction")
  atomically {
    println("Trying to transfer")
    transfer(acc1, acc2, 50)
  }
  println("Balance account 1: ${acc1.unsafeRead()}")
  println("Balance account 2: ${acc2.unsafeRead()}")
}

Here in this (silly) example we changed withdraw to use retry and thus wait until enough money is in the account, which after a few seconds just happens to be the case.

retry can be used to implement a lot of complex transactions and many datastructures like TMVar or TQueue use to to great effect.

Branching with orElse

orElse is another important primitive which allows a user to detect if a branch called retry and then use a fallback instead. If the fallback retries as well the whole transaction retries.

import kotlinx.coroutines.runBlocking
import arrow.fx.stm.atomically
import arrow.fx.stm.TVar
import arrow.fx.stm.STM
import arrow.fx.stm.stm

//sampleStart
fun STM.transaction(v: TVar<Int>): Int? =
  stm {
    val result = v.read()
    check(result in 0..10)
    result
  } orElse { null }
//sampleEnd

fun main(): Unit = runBlocking {
  val v = TVar.new(100)
  println("Value is ${v.unsafeRead()}")
  atomically { transaction(v) }
    .also { println("Transaction returned $it") }
  println("Set value to 5")
  println("Value is ${v.unsafeRead()}")
  atomically { v.write(5) }
  atomically { transaction(v) }
    .also { println("Transaction returned $it") }
}

This example uses stm which is a helper just like the stdlib function suspend to ease use of an infix function like orElse. In this transaction, when the value inside the variable is not in the correct range, the transaction retries (due to check calling retry). If it is in the correct range it simply returns the value. orElse here intercepts a call to retry and executes the alternative which simply returns null.

Exceptions

Throwing inside STM will let the exception bubble up to either a catch handler or to atomically which will rethrow it.



Note: Using `try {...} catch (e: Exception) {...}` is not encouraged because any state change inside `try` will not be undone when an exception occurs! The recommended way of catching exceptions is to use [catch](catch.html) which properly rolls back the transaction!


Further reading:

  • [Composable memory transactions, by Tim Harris, Simon Marlow, Simon Peyton Jones, and Maurice Herlihy, in ACM Conference on Principles and Practice of Parallel Programming 2005.](https://www.microsoft.com/en-us/research/publication/composable-memory-transactions/)

Functions

Name Summary
acquire common open fun TSemaphore.acquire()
Acquire 1 permit from a TSemaphore.
common open fun TSemaphore.acquire(n: Int)
Acquire n permit from a TSemaphore.
available common open fun TSemaphore.available(): Int
Returns the currently available number of permits in a TSemaphore.
catch common abstract fun <A> catch(f: STM.() -> A, onError: STM.(Throwable) -> A): A
Run f and handle any exception thrown with onError.
flush common open fun <A> TQueue<A>.flush(): List<A>
Drains all entries of a TQueue into a single list.
fold common open fun <A, B> TArray<A>.fold(init: B, f: (B, A) -> B): B
Fold a TArray to a single value.
get common open operator fun <A> TArray<A>.get(i: Int): A
Read a variable from the TArray.
common open operator fun <K, V> TMap<K, V>.get(k: K): V?
Alias of STM.lookupkotlin:ank:playground import arrow.fx.stm.TMap import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tmap = TMap.new<Int, String>() val result = atomically { tmap[1] = "Hello" tmap[2] = "World"<br> tmap[2] } //sampleEnd println("Result $result") }<br>If the key is not present [STM.get](get.html) will not retry, instead it returns `null`.<br>
insert common open fun <A> TSet<A>.insert(a: A)
Adds an element to the set.
common open fun <K, V> TMap<K, V>.insert(k: K, v: V)
Add a key value pair to the mapkotlin:ank:playground import arrow.fx.stm.TMap import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tmap = TMap.new<Int, String>() atomically { tmap.insert(10, "Hello") } //sampleEnd }
isEmpty common open fun <A> TMVar<A>.isEmpty(): Boolean
Check if a TMVar is empty.
common open fun <A> TQueue<A>.isEmpty(): Boolean
Check if a TQueue is empty.
isNotEmpty common open fun <A> TMVar<A>.isNotEmpty(): Boolean
Check if a TMVar is not empty.
common open fun <A> TQueue<A>.isNotEmpty(): Boolean
Check if a TQueue is not empty.
lookup common open fun <K, V> TMap<K, V>.lookup(k: K): V?
Lookup a value at the specific key kkotlin:ank:playground import arrow.fx.stm.TMap import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tmap = TMap.new<Int, String>() val result = atomically { tmap[1] = "Hello" tmap[2] = "World"<br> tmap.lookup(1) } //sampleEnd println("Result $result") }<br>If the key is not present [STM.lookup](lookup.html) will not retry, instead it returns `null`.<br>
member common open fun <K, V> TMap<K, V>.member(k: K): Boolean
Check if a key k is in the mapkotlin:ank:playground import arrow.fx.stm.TMap import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tmap = TMap.new<Int, String>() atomically { tmap[1] = "Hello"<br> tmap.remove(1) } //sampleEnd }This function never retries.
common open fun <A> TSet<A>.member(a: A): Boolean
Check if an element is already in the setkotlin:ank:playground import arrow.fx.stm.TSet import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tset = TSet.new<String>() val result = atomically { tset.insert("Hello") tset.member("Hello") } //sampleEnd println("Result $result") }
modify common open fun <A> TVar<A>.modify(f: (A) -> A)
Modify the value of a TVarkotlin:ank:playground import arrow.fx.stm.TVar import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tvar = TVar.new(10) val result = atomically { tvar.modify { it * 2 } } //sampleEnd println(result) }modify(f) = write(f(read()))
newTVar common open fun <A> newTVar(a: A): TVar<A>
Create a new TVar inside a transaction, because TVar.new is not possible inside STM transactions.
orElse common abstract infix fun <A> STM.() -> A.orElse(other: STM.() -> A): A
Run the given transaction and fallback to the other one if the first one calls retry.
peek common open fun <A> TQueue<A>.peek(): A
Read the front element of a TQueue without removing it.
plusAssign common open operator fun <K, V> TMap<K, V>.plusAssign(kv: Pair<K, V>)
Add a key value pair to the mapkotlin:ank:playground import arrow.fx.stm.TMap import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tmap = TMap.new<Int, String>() atomically { tmap += (1 to "Hello") } //sampleEnd }
common open operator fun <A> TQueue<A>.plusAssign(a: A)
Append an element to the TQueue.
common open operator fun <A> TSet<A>.plusAssign(a: A)
Adds an element to the set.
put common open fun <A> TMVar<A>.put(a: A)
Put a value into an empty TMVar.
read common open fun <A> TMVar<A>.read(): A
Read a value from a TMVar without removing it.
common open fun <A> TQueue<A>.read(): A
Remove the front element from the TQueue or retry if the TQueue is empty.
common abstract fun <A> TVar<A>.read(): A
Read the value from a TVar.
release common open fun TSemaphore.release()
Release a permit back to the TSemaphore.
common open fun TSemaphore.release(n: Int)
Release n permits back to the TSemaphore.
remove common open fun <K, V> TMap<K, V>.remove(k: K)
Remove a key value pair from a mapkotlin:ank:playground import arrow.fx.stm.TMap import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tmap = TMap.new<Int, String>() atomically { tmap[1] = "Hello" tmap.remove(1) } //sampleEnd }
common open fun <A> TSet<A>.remove(a: A)
Remove an element from the set.
removeAll common open fun <A> TQueue<A>.removeAll(pred: (A) -> Boolean)
Filter a TQueue, removing all elements for which pred returns false.
retry common abstract fun retry(): Nothing
Abort and retry the current transaction.
set common open operator fun <A> TArray<A>.set(i: Int, a: A)
Set a variable in the TArray.
common open operator fun <K, V> TMap<K, V>.set(k: K, v: V)
Alias for STM.insertkotlin:ank:playground import arrow.fx.stm.TMap import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tmap = TMap.new<Int, String>() atomically { tmap[1] = "Hello" } //sampleEnd }
size common open fun <A> TQueue<A>.size(): Int
Return the current number of elements in a TQueuekotlin:ank:playground import arrow.fx.stm.TQueue import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tq = TQueue.new<Int>() val result = atomically { tq.size() } //sampleEnd println("Result $result") }This function never retries.
swap common open fun <A> TMVar<A>.swap(a: A): A
Swap the content of a TMVar or retry if it is empty.
common open fun <A> TVar<A>.swap(a: A): A
Swap the content of the TVarkotlin:ank:playground import arrow.fx.stm.TVar import arrow.fx.stm.atomically<br>suspend fun main() { //sampleStart val tvar = TVar.new(10) val result = atomically { tvar.swap(20) } //sampleEnd println("Result $result") println("New value ${tvar.unsafeRead()}") }
take common open fun <A> TMVar<A>.take(): A
Read the value from a TMVar and empty it.
transform common open fun <A> TArray<A>.transform(f: (A) -> A)
Modify each element in a TArray by applying f.
tryAcquire common open fun TSemaphore.tryAcquire(): Boolean
open fun TSemaphore.tryAcquire(n: Int): Boolean
Like TSemaphore.acquire except that it returns whether or not acquisition was successful.
tryPeek common open fun <A> TQueue<A>.tryPeek(): A?
Same as TQueue.peek except it returns null if the TQueue is empty.
tryPut common open fun <A> TMVar<A>.tryPut(a: A): Boolean
Same as TMVar.put except that it returns true or false if was successful or it retried.
tryRead common open fun <A> TMVar<A>.tryRead(): A?
Same as TMVar.read except that it returns null if the TMVar is empty and thus never retries.
common open fun <A> TQueue<A>.tryRead(): A?
Same as TQueue.read except it returns null if the TQueue is empty.
tryTake common open fun <A> TMVar<A>.tryTake(): A?
Same as TMVar.take except it returns null if the TMVar is empty and thus never retries.
update common open fun <K, V> TMap<K, V>.update(k: K, fn: (V) -> V)
Update a value at a key if it exists.
write common open fun <A> TQueue<A>.write(a: A)
Append an element to the TQueue.
common abstract fun <A> TVar<A>.write(a: A)
Set the value of a TVar.
writeFront common open fun <A> TQueue<A>.writeFront(a: A)
Prepend an element to the TQueue.

Extensions

Name Summary
check common fun STM.check(b: Boolean)
Retry if b is false otherwise does nothing.
newEmptyTMVar common fun <A> STM.newEmptyTMVar(): TMVar<A>
newTArray common fun <A> STM.newTArray(size: Int, f: (Int) -> A): TArray<A>
fun <A> STM.newTArray(size: Int, a: A): TArray<A>
fun <A> STM.newTArray(vararg arr: A): TArray<A>
fun <A> STM.newTArray(xs: Iterable<A>): TArray<A>
newTMap common fun <K, V> STM.newTMap(fn: (K) -> Int): TMap<K, V>
fun <K, V> STM.newTMap(): TMap<K, V>
newTMVar common fun <A> STM.newTMVar(a: A): TMVar<A>
newTQueue common fun <A> STM.newTQueue(): TQueue<A>
newTSem common fun STM.newTSem(initial: Int): TSemaphore
newTSet common fun <A> STM.newTSet(fn: (A) -> Int): TSet<A>
fun <A> STM.newTSet(): TSet<A>

Do you like Arrow?

Arrow Org
<