CircuitBreaker

common class CircuitBreaker

A CircuitBreaker is used to protect resources or services from being overloaded. When a service is overloaded, interacting with it more will only worsen its overloaded state. Especially when combined with retry mechanisms such as Schedule, simply using a back-off retry policy might not be sufficient during peak traffic.

To allow such overloaded resources to recover, CircuitBreaker can help you protect the service by failing fast. Thus CircuitBreaker helps us achieve stability and prevent cascading failures in distributed systems.

CircuitBreaker has three states, modeled by CircuitBreaker.State:

  1. [Closed](-state/-closed/index.html): This is its normal state, where requests are being made. The state in which [CircuitBreaker](index.html) starts.
    • When an exception occurs it increments the failure counter
    • A successful request will reset the failure counter to zero
    • When the failure counter reaches the maxFailures threshold, the breaker is tripped into the [Open](-state/-open/index.html) state
  2. [Open](-state/-open/index.html): The [CircuitBreaker](index.html) will short-circuit/fail-fast all requests
    • All requests short-circuit/fail-fast with `ExecutionRejected`
    • If a request is made after the configured resetTimeout passes, the [CircuitBreaker](index.html) is tripped into the [HalfOpen](-state/-half-open/index.html) state, allowing one request to go through as a test.
  3. [HalfOpen](-state/-half-open/index.html): The [CircuitBreaker](index.html) is in this state while it's allowing a request to go through as a `test request`
    • All other requests made while `test request` is still running will short-circuit/fail-fast.
    • If the `test request` succeeds then the [CircuitBreaker](index.html) is tripped back into [Closed](-state/-closed/index.html), with the resetTimeout and the failures count also reset to initial values.
    • If the `test request` fails, then the [CircuitBreaker](index.html) is tripped back into [Open](-state/-open/index.html), the resetTimeout is multiplied by the exponentialBackoffFactor, up to the configured maxResetTimeout.
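
The transitions listed above can be sketched as a small pure state machine. This is a simplified, hypothetical model for illustration only, not Arrow's implementation: the names `BreakerState`, `BreakerConfig`, `onClosedResult`, `onOpenRequest` and `onTestResult` are assumptions, time is passed in as plain milliseconds, and concurrency is ignored.

```kotlin
// Simplified model of the documented transitions. All names are illustrative.
sealed class BreakerState {
  data class Closed(val failures: Int) : BreakerState()
  data class Open(val startedAtMs: Long, val resetTimeoutMs: Long) : BreakerState()
  data class HalfOpen(val resetTimeoutMs: Long) : BreakerState()
}

data class BreakerConfig(
  val maxFailures: Int,
  val resetTimeoutMs: Long,
  val exponentialBackoffFactor: Double,
  val maxResetTimeoutMs: Long,
)

// Closed: a success resets the counter; a failure increments it and trips the
// breaker once the counter reaches maxFailures.
fun onClosedResult(state: BreakerState.Closed, success: Boolean, nowMs: Long, config: BreakerConfig): BreakerState =
  when {
    success -> BreakerState.Closed(failures = 0)
    state.failures + 1 >= config.maxFailures -> BreakerState.Open(nowMs, config.resetTimeoutMs)
    else -> BreakerState.Closed(state.failures + 1)
  }

// Open: stay Open (reject) until resetTimeout has passed, then allow one test request.
fun onOpenRequest(state: BreakerState.Open, nowMs: Long): BreakerState =
  if (nowMs - state.startedAtMs >= state.resetTimeoutMs) BreakerState.HalfOpen(state.resetTimeoutMs)
  else state

// HalfOpen: a successful test request closes the breaker; a failed one reopens it
// with the timeout multiplied by the back-off factor, capped at maxResetTimeout.
fun onTestResult(state: BreakerState.HalfOpen, success: Boolean, nowMs: Long, config: BreakerConfig): BreakerState =
  if (success) {
    BreakerState.Closed(failures = 0)
  } else {
    val nextTimeout = (state.resetTimeoutMs * config.exponentialBackoffFactor)
      .toLong()
      .coerceAtMost(config.maxResetTimeoutMs)
    BreakerState.Open(nowMs, nextTimeout)
  }
```

Keeping the transitions as pure functions of the current state and a timestamp makes each rule easy to check in isolation; a real breaker additionally has to update its state atomically across concurrent callers.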

Let’s say we want to create a CircuitBreaker that tolerates at most two consecutive failed calls to a remote service: once two requests have failed with an exception, the circuit breaker starts short-circuiting, failing fast.

import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay

@ExperimentalTime
suspend fun main(): Unit {
//sampleStart
  val circuitBreaker = CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = Duration.seconds(2),
    exponentialBackoffFactor = 1.2,
    maxResetTimeout = Duration.seconds(60),
  )
  circuitBreaker.protectOrThrow { "I am in Closed: ${circuitBreaker.state()}" }.also(::println)

  println("Service getting overloaded . . .")

  Either.catch { circuitBreaker.protectOrThrow { throw RuntimeException("Service overloaded") } }.also(::println)
  Either.catch { circuitBreaker.protectOrThrow { throw RuntimeException("Service overloaded") } }.also(::println)
  circuitBreaker.protectEither { }.also { println("I am Open and short-circuit with ${it}. ${circuitBreaker.state()}") }

  println("Service recovering . . .").also { delay(2000) }

  circuitBreaker.protectOrThrow { "I am running test-request in HalfOpen: ${circuitBreaker.state()}" }.also(::println)
  println("I am back to normal state closed ${circuitBreaker.state()}")
//sampleEnd
}
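
When the breaker is Open, callers often want a fallback value rather than an exception. A minimal sketch, assuming only the `protectEither` signature listed on this page; the extension function `protectOrFallback` is a hypothetical helper and not part of Arrow:

```kotlin
import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker

// Hypothetical helper (not part of Arrow): run `fa` through the breaker and return
// `fallback` when the call is rejected because the breaker is short-circuiting.
// Exceptions thrown by `fa` itself are still rethrown by protectEither.
suspend fun <A> CircuitBreaker.protectOrFallback(fallback: A, fa: suspend () -> A): A =
  when (val result = protectEither(fa)) {
    is Either.Left -> fallback       // ExecutionRejected: fail fast with the fallback
    is Either.Right -> result.value  // the protected call ran and succeeded
  }
```

For instance, `circuitBreaker.protectOrFallback("cached") { "fresh" }` would yield "fresh" while the breaker is Closed and "cached" while it rejects requests.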

A common pattern for building fault-tolerant, resilient systems is to compose a CircuitBreaker with a backing-off retry Schedule, which guarantees that neither the resource nor the client interacting with it is overloaded. Below you can see how a simple retry results in Either.Left<CircuitBreaker.ExecutionRejected>, but when we combine it with a back-off schedule, it always calls the CircuitBreaker at times when it could have entered the HalfOpen state. A Schedule alone is not sufficient to make your system resilient because you also have to take parallel calls to your functions into account. In contrast, a CircuitBreaker can track failures of every function call, or even of different functions, to the same resource or service.

import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import arrow.fx.coroutines.Schedule
import arrow.fx.coroutines.retry
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay

@ExperimentalTime
suspend fun main(): Unit {
  suspend fun apiCall(): Unit {
    println("apiCall . . .")
    throw RuntimeException("Overloaded service")
  }

  //sampleStart
  val circuitBreaker = CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = seconds(2),
    exponentialBackoffFactor = 2.0, // enable exponentialBackoffFactor
    maxResetTimeout = seconds(60), // limit exponential back-off time
  )

  suspend fun <A> resilient(schedule: Schedule<Throwable, *>, f: suspend () -> A): A =
    schedule.retry { circuitBreaker.protectOrThrow(f) }

  Either.catch {
    resilient(Schedule.recurs(5), ::apiCall)
  }.let { println("recurs(5) apiCall twice and 4x short-circuit result from CircuitBreaker: $it") }

  delay(2000)
  println("CircuitBreaker ready to half-open")

  // Retry once and when the CircuitBreaker opens after 2 failures then retry with exponential back-off with same time as CircuitBreaker's resetTimeout
  val fiveTimesWithBackOff = Schedule.recurs<Throwable>(1) andThen
    Schedule.exponential(seconds(2)) and Schedule.recurs(5)

  Either.catch {
    resilient(fiveTimesWithBackOff, ::apiCall)
  }.let { println("exponential(seconds(2)) and recurs(5) always retries with actual apiCall: $it") }
  //sampleEnd
}
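
To see what exponentialBackoffFactor and maxResetTimeout mean for the configuration above, the reset timeouts after successive failed test requests can be worked out by hand. The helper `resetTimeouts` below is hypothetical and simply applies the rule stated for the HalfOpen state (multiply by the factor, cap at the maximum); it does not call Arrow.

```kotlin
// Hypothetical helper: lists the reset timeout (in seconds) in effect initially and
// after each consecutive failed test request.
fun resetTimeouts(initialSeconds: Double, factor: Double, maxSeconds: Double, failedTests: Int): List<Double> =
  generateSequence(initialSeconds) { minOf(it * factor, maxSeconds) }
    .take(failedTests + 1)
    .toList()

fun main() {
  // Values from the example above: resetTimeout = 2s, factor = 2.0, maxResetTimeout = 60s
  println(resetTimeouts(2.0, 2.0, 60.0, 6))
  // prints [2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
}
```

With a factor of 2.0 the cap is reached after five failed test requests, after which the breaker keeps waiting 60 seconds between attempts.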

Types

| Name | Summary |
| --- | --- |
| Companion | common `object Companion` |
| ExecutionRejected | common `class ExecutionRejected(reason: String, state: CircuitBreaker.State) : Throwable` |
| State | common `sealed class State`: The initial state when initializing a CircuitBreaker is Closed. |

Functions

| Name | Summary |
| --- | --- |
| awaitClose | common `suspend fun awaitClose()`: Waits for this CircuitBreaker to be in the CircuitBreaker.State.Closed state. |
| doOnClosed | common `fun doOnClosed(callback: suspend () -> Unit): CircuitBreaker`: Returns a new circuit breaker that wraps the state of the source and that will fire the given callback upon the circuit breaker transitioning to the CircuitBreaker.State.Closed state. |
| doOnHalfOpen | common `fun doOnHalfOpen(callback: suspend () -> Unit): CircuitBreaker`: Returns a new circuit breaker that wraps the state of the source and that will fire the given callback upon the circuit breaker transitioning to the CircuitBreaker.State.HalfOpen state. |
| doOnOpen | common `fun doOnOpen(callback: suspend () -> Unit): CircuitBreaker`: Returns a new circuit breaker that wraps the state of the source and that will fire the given callback upon the circuit breaker transitioning to the CircuitBreaker.State.Open state. |
| doOnRejectedTask | common `fun doOnRejectedTask(callback: suspend () -> Unit): CircuitBreaker`: Returns a new circuit breaker that wraps the state of the source and that will execute the given callback whenever a task is rejected. |
| protectEither | common `suspend fun <A> protectEither(fa: suspend () -> A): Either<CircuitBreaker.ExecutionRejected, A>`: Returns a new task that upon execution will execute the given task, but with the protection of this circuit breaker. If an exception other than ExecutionRejected occurs in fa, it will be rethrown. |
| protectOrThrow | common `suspend tailrec fun <A> protectOrThrow(fa: suspend () -> A): A`: Returns a new task that upon execution will execute the given task, but with the protection of this circuit breaker. If an exception occurs in fa, it will be rethrown. |
| state | common `suspend fun state(): CircuitBreaker.State`: Returns the current CircuitBreaker.State, meant for debugging purposes. |
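
Because each of the doOn* functions listed above returns a new CircuitBreaker, they can be chained. A minimal sketch, assuming the same `CircuitBreaker.of` parameters used in the examples on this page; the factory `observedBreaker` and its `log` parameter are hypothetical names introduced for illustration:

```kotlin
import arrow.fx.coroutines.CircuitBreaker
import kotlin.time.Duration
import kotlin.time.ExperimentalTime

// Hypothetical factory (not part of Arrow): builds a breaker that reports every
// state transition and every rejected task through the supplied `log` function.
@ExperimentalTime
suspend fun observedBreaker(log: (String) -> Unit): CircuitBreaker =
  CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = Duration.seconds(2),
    exponentialBackoffFactor = 2.0,
    maxResetTimeout = Duration.seconds(60),
  )
    .doOnOpen { log("open") }           // fired on the transition to Open
    .doOnHalfOpen { log("half-open") }  // fired on the transition to HalfOpen
    .doOnClosed { log("closed") }       // fired on the transition to Closed
    .doOnRejectedTask { log("rejected") } // fired when a task is rejected
```

Passing `::println` as `log` gives a quick trace of the breaker's behaviour while debugging, alongside `state()`.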
