//arrow-fx-stm/arrow.fx.stm/TQueue

TQueue

common data class TQueue<A>

A TQueue is a transactional unbounded queue which can be written to and read from concurrently.

The implementation uses two TVar’s containing lists. One for read and one for write access. Due to the semantics of STM this means a write to the queue will never invalidate or block a read and vice versa, making highly concurrent use possible.



In practice, if the read variable is empty, the two must swap contents but this operation is infrequent and thus can be ignored.


Creating a TQueue

Creating an empty queue can be done by using either STM.newTQueue or TQueue.new depending on whether or not you are in a transaction or not.

Writing to the TQueue

Writing to the end of the queue is done by using STM.write:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  atomically {
    tq.write(2)
    // or alternatively
    tq += 4
  }
  //sampleEnd
  println("Items in queue ${atomically { tq.flush() }}")
}

It is also possible to write to the front of the queue, but since that accesses the read variable it can lead to worse overall performance:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  atomically {
    tq.write(1)
    tq.writeFront(2)
  }
  //sampleEnd
  println("Items in queue ${atomically { tq.flush() }}")
}

Reading items from a TQueue

There are several different ways to read from a TQueue, the most common one being STM.read:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.write(2)
    tq.read()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

Should the queue be empty calling STM.read will cause the transaction to retry and thus wait for items to be added to the queue. This can be avoided using STM.tryRead instead:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.tryRead()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

STM.read also removes the read item from the queue. Alternatively STM.peek will leave the queue unchanged on a read:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.write(2)

    tq.peek()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

As with STM.read will retry should the queue be empty. The alternative STM.tryPeek is there to avoid that:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.tryPeek()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

It is also possible to read the entire list in one go using STM.flush:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.write(2)
    tq.write(4)

    tq.flush()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

Checking a queues size

Checking if a queue is empty can be done by using either STM.isEmpty or STM.isNotEmpty:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.isEmpty()
  }
  //sampleEnd
  println("Result $result")
}

Retrieving the actual size of a list can be done using STM.size:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.size()
  }
  //sampleEnd
  println("Result $result")
}


All three of these methods have to access both the write and read end of a [TQueue](index.html) and thus can increase contention. Use them sparingly!


Removing elements from a TQueue

It is also possible to remove elements from a TQueue using STM.removeAll:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  atomically {
    tq.write(0)
    tq.removeAll { it != 0 }
  }
  //sampleEnd
  println("Items in queue ${atomically { tq.flush() }}")
}


This method also access both ends of the queue and thus should be used infrequently to avoid contention.


Types

Name Summary
Companion common object Companion

Do you like Arrow?

Arrow Org
<