KState-Saga

Coroutine-friendly saga pattern implementation for distributed transactions.

Overview

KState-Saga is a lightweight, coroutine-friendly library for implementing the saga pattern in Kotlin applications. It provides async/await support for distributed transactions with compensating actions, making it ideal for microservices and distributed systems.

Key Features

  • Coroutine Support: All operations are suspend functions for non-blocking async execution

  • Type-Safe DSL: Fluent, type-safe API for defining saga workflows

  • Compensation Logic: Automatic rollback with LIFO compensation execution

  • Stateful Sagas: Typed state management with "smart compensation"

  • Event Monitoring: Comprehensive lifecycle events for observability

  • Zero Coupling: No external dependencies beyond Kotlin coroutines

Saga Types

KState-Saga provides two saga implementations:

FeatureSagaExecutorStatefulSagaExecutor
State ManagementContext object onlyTyped shared state + context
Compensation AccessStep result onlyStep result + saga state
Smart CompensationManual via contextBuilt-in via state property
Best ForSimple workflowsComplex multi-step transactions

Installation

Gradle (Kotlin DSL)

dependencies {
implementation("ca.acendas:kstate-saga:1.6.1")
}

Gradle (Groovy)

dependencies {
implementation 'ca.acendas:kstate-saga:1.6.1'
}

Maven

<dependency>
<groupId>ca.acendas</groupId>
<artifactId>kstate-saga</artifactId>
<version>1.6.1</version>
</dependency>

Quick Start

import ca.acendas.kstate.saga.*
import kotlinx.coroutines.*

// Define your context and result types
data class OrderContext(val items: List<String>, val amount: Double)
data class OrderResult(val orderId: String)

// Create a saga with async operations
val orderSaga = sagaExecutor<OrderContext, OrderResult> {
step("reserve-inventory") { context ->
// Async inventory reservation
inventoryService.reserve(context.items)
}
compensate { result ->
// Async compensation
inventoryService.release(result.reservationId)
}

step("charge-payment") { context ->
// Async payment processing
paymentService.charge(context.amount)
}
compensate { result ->
// Async refund
paymentService.refund(result.transactionId)
}
idempotent(true)
}

// Execute the saga within a coroutine
runBlocking {
val result = orderSaga.execute(OrderContext(
items = listOf("item1", "item2"),
amount = 99.99
))

when (result) {
is SagaResult.Completed -> println("Order completed: ${result.value}")
is SagaResult.Aborted -> println("Order aborted: ${result.error}")
is SagaResult.CompensationFailure -> println("Critical failure")
}
}

Core Components

SagaExecutor

Orchestrates saga execution with suspend functions:

interface SagaExecutor<C, R> {
suspend fun execute(context: C): SagaResult<R>
fun addMonitor(monitor: SagaMonitor)
fun removeMonitor(monitor: SagaMonitor)
}

SagaStep

Defines a saga step with async forward and compensation logic:

data class SagaStep<C, R>(
val name: String,
val forward: suspend (C) -> R,
val compensation: (suspend (R) -> Unit)? = null,
val idempotent: Boolean = false
)

SagaResult

Sealed type representing saga outcomes:

sealed class SagaResult<out T> {
data class Completed<T>(val value: T) : SagaResult<T>()
data class Aborted(val error: SagaExecutionError) : SagaResult<Nothing>()
data class CompensationFailure(
val originalError: SagaExecutionError,
val compensationErrors: List<SagaExecutionError>
) : SagaResult<Nothing>()
}

Advanced Features

Monitoring and Observability

Add monitors to track saga execution:

val saga = sagaExecutor<Context, Result> {
// ... steps ...

monitor { event ->
when (event) {
is SagaEvent.StepStarted -> logger.info("Starting ${event.stepName}")
is SagaEvent.StepCompleted -> logger.info("Completed ${event.stepName}")
is SagaEvent.StepFailed -> logger.error("Failed ${event.stepName}", event.error)
else -> logger.debug("Event: $event")
}
}
}

Idempotent Steps

Mark steps as idempotent for safe retries:

step("process-payment") { context ->
paymentService.processIdempotent(context.amount, context.idempotencyKey)
}
idempotent(true)

Stateful Sagas - Smart Compensation

StatefulSagaExecutor provides typed state management that enables "smart compensation" - where compensation logic can make decisions based on the execution progress of later steps.

Outcome-Based Control Flow

Steps use explicit outcomes that immediately exit the step lambda:

  • completes with result - Step succeeded, continue to next step

  • fails with reason - Step failed, trigger compensation (supports String or Enum)

  • skip with reason - Skip remaining steps, return last result (no compensation)

Important: These DSL constructs automatically exit the step - no return statement needed:

first call PaymentSteps.AUTHORIZE with { cart ->
val result = authorizePayment(cart)

if (!result.approved) {
fails with FailureReason.CARD_DECLINED // Immediately exits & triggers compensation
// Code here is UNREACHABLE - no return needed!
}

// Only runs if approved
completes with result
}

The Problem with Stateless Sagas

In traditional sagas, compensation logic only knows about its own step's result:

// Problem: Step 1's compensation doesn't know if Step 3 captured the escrow
first call "take-payment" with { cart ->
val payment = takePayment(cart.amount)
PaymentResult(payment.id)
} otherwise { result ->
// How do we know if escrow was captured by a later step?
// We don't! We have to guess or use external state.
}

The Solution: Stateful Sagas with Outcome-Based Flow

StatefulSagaExecutor solves this by providing shared, typed state with explicit outcomes:

// Define your state
data class PaymentState(
val paymentAmount: Long = 0L,
val escrowCaptured: Boolean = false,
val ticketsPrinted: Boolean = false
)

// Define type-safe enums for step names and reasons
enum class PaymentSteps { TAKE_PAYMENT, PRINT_TICKETS, CAPTURE_ESCROW, SUBMIT }
enum class FailureReason { PAYMENT_DECLINED, PRINTER_JAM, BACKEND_ERROR }
enum class SkipReason { ALREADY_PROCESSED, DUPLICATE_REQUEST }

// Create a stateful saga with outcome-based control flow
val saga = sagaExecutor<Cart, Transaction, PaymentState>(PaymentState()) {
first call PaymentSteps.TAKE_PAYMENT with { cart ->
// Check for duplicate - skip without compensation
if (cart.alreadyProcessed) skip with SkipReason.ALREADY_PROCESSED

val payment = cashManager.takePayment(cart.totalCents)

// Explicit failure with enum reason
if (payment == null) fails with FailureReason.PAYMENT_DECLINED

updateState { it.copy(paymentAmount = payment.amount) }
completes with createTransaction(cart, payment)
} otherwise {
// Smart compensation: `state` sees ALL updates including from later steps
when {
state.escrowCaptured -> {
// Step 3 captured funds - must dispense physical refund
cashManager.dispenseCashRefund(state.paymentAmount)
}
state.paymentAmount > 0 -> {
// Funds in escrow but not captured - just return them
cashManager.returnEscrow()
}
else -> {
// No payment received - just disable acceptors
cashManager.disableAcceptors()
}
}
}

then call PaymentSteps.PRINT_TICKETS with { cart ->
val printResult = printer.printTickets(cart.items)
if (!printResult.success) fails with FailureReason.PRINTER_JAM

updateState { it.copy(ticketsPrinted = true) }
completes with result!! // Pass through previous result
} otherwise {
if (state.ticketsPrinted) {
ticketService.voidTickets(result!!.id)
}
}

then call PaymentSteps.CAPTURE_ESCROW with { cart ->
cashManager.captureEscrow()
updateState { it.copy(escrowCaptured = true) }
completes with result!!
} otherwise {
// This runs before Step 1's compensation
cashManager.prepareForRefund()
}

then call PaymentSteps.SUBMIT with { cart ->
val submitResult = gateway.submitTransaction(result!!)
if (!submitResult.success) fails with FailureReason.BACKEND_ERROR

completes with result!!
}

watching events { event ->
when (event) {
is SagaEvent.StepSkipped -> logger.info("Skipped: ${event.stepName} - ${event.reason}")
else -> logger.info("Saga: $event")
}
}
}

How Smart Compensation Works

When a saga fails, compensations run in reverse order (LIFO):

Step 1: Take Payment     ✓ (completed)
Step 2: Print Tickets ✓ (completed)
Step 3: Capture Escrow ✓ (completed)
Step 4: Submit Backend ✗ (FAILED)

Compensation order:
1. Step 3 compensation (escrowCaptured = true)
2. Step 2 compensation (escrowCaptured = true, ticketsPrinted = true)
3. Step 1 compensation (escrowCaptured = true) → dispenses refund!

Step 1's compensation sees state.escrowCaptured = true because Step 3 updated the state before the failure occurred.

StatefulStepScope API

Forward actions receive a StatefulStepScope with outcome builders that immediately exit:

first call PaymentSteps.VALIDATE with { context ->
// Read current state
val current = state

// Update state atomically
updateState { it.copy(paymentAmount = 100L) }

// Access previous step's result (null for first step)
val previousResult = result

// Explicit failure - IMMEDIATELY exits and triggers compensation
if (!context.valid) fails with FailureReason.INVALID_ORDER
// Code after fails/skip/completes is UNREACHABLE

// Explicit skip - IMMEDIATELY exits, returns last result, no compensation
if (context.alreadyProcessed) skip with SkipReason.DUPLICATE

// Explicit success - IMMEDIATELY exits and continues to next step
completes with MyResult("success")
}

Note: Unlike traditional Kotlin lambdas, you don't need return@with after fails with, skip with, or completes with. These DSL constructs use internal signals to immediately exit the step.

Outcome Types

OutcomeEffectCompensation
completes with resultContinue to next stepN/A
fails with reasonAbort sagaTriggered (LIFO order)
skip with reasonReturn last resultNot triggered

CompensationScope API

Compensation actions receive a CompensationScope with:

otherwise {
// Result from THIS step's forward action
val stepResult = result

// Current state (includes updates from ALL completed steps)
val currentState = state

// Make smart decisions
when {
state.escrowCaptured -> refund()
state.paymentAmount > 0 -> returnEscrow()
else -> cleanup()
}
}

StatefulSagaResult

Stateful saga execution returns StatefulSagaResult<R, S>:

when (val result = saga.execute(cart)) {
is StatefulSagaResult.Completed -> {
// All steps succeeded
val transaction = result.value
val finalState = result.finalState // State after all steps
println("Success: $transaction, state: $finalState")
}

is StatefulSagaResult.Aborted -> {
// Step failed, compensations succeeded
val error = result.error
val stateAtFailure = result.finalState
println("Failed at ${error.stepName}: ${error.message}")
}

is StatefulSagaResult.CompensationFailure -> {
// Step failed AND compensation failed - critical!
val originalError = result.originalError
val compensationErrors = result.compensationErrors
val stateAtFailure = result.finalState
alertOps("Manual intervention required!")
}
}

Real-World Example: Payment Transaction

Here's a complete example modeling a payment kiosk transaction:

data class PaymentState(
val transactionSeqNum: String? = null,
val paymentAmount: Long = 0L,
val escrowCaptured: Boolean = false,
val ticketsPrinted: Boolean = false
)

data class Cart(val items: List<Item>, val totalCents: Long)
data class Transaction(val id: String, val cart: Cart)

suspend fun processPayment(cart: Cart): Result<Transaction> {
val saga = sagaExecutor<Cart, Transaction, PaymentState>(PaymentState()) {
// Step 1: Authorize card payment
first call "Authorize Payment" with { cart ->
val result = cardReader.purchase(cart.totalCents)
updateState { it.copy(transactionSeqNum = result.seqNum) }
Transaction(result.transactionId, cart)
} otherwise {
state.transactionSeqNum?.let { seqNum ->
cardReader.reversePurchase(seqNum)
}
cardReader.disable()
}

// Step 2: Print tickets
then call "Print Tickets" with { cart ->
cart.items.forEach { item ->
printer.printTicket(item)
}
updateState { it.copy(ticketsPrinted = true) }
result!!
} otherwise {
if (state.ticketsPrinted) {
ticketService.voidTickets(result!!.id)
}
}

// Step 3: Capture payment
then call "Capture Payment" with { cart ->
cardReader.acceptPurchase(result!!.id, cart.totalCents)
updateState { it.copy(escrowCaptured = true) }
result!!
} otherwise {
if (state.escrowCaptured) {
cardReader.reversePurchase(state.transactionSeqNum!!)
}
}

// Step 4: Submit to backend
then call "Submit Backend" with { cart ->
gateway.submitTransaction(result!!)
result!!
} otherwise {
gateway.notifyTransactionFailure(result!!.id, "Saga rollback")
}

watching events { event ->
logger.debug("Saga event: $event")
}
}

return when (val sagaResult = saga.execute(cart)) {
is StatefulSagaResult.Completed -> Result.success(sagaResult.value)
is StatefulSagaResult.Aborted -> Result.failure(
Exception("Transaction failed at ${sagaResult.error.stepName}")
)
is StatefulSagaResult.CompensationFailure -> {
alertOps("Compensation failed - manual intervention required")
Result.failure(Exception("Critical failure: ${sagaResult.originalError.message}"))
}
}
}

When to Use StatefulSaga vs Regular Saga

Use CaseRecommended
Simple 2-3 step workflowsSagaExecutor
No compensation dependenciesSagaExecutor
Compensation depends on later stepsStatefulSagaExecutor
Payment/financial transactionsStatefulSagaExecutor
Hardware state trackingStatefulSagaExecutor
Complex rollback logicStatefulSagaExecutor

Coroutine Integration

KState-Saga is designed to work seamlessly with Kotlin coroutines:

// Use different dispatchers for different steps
suspend fun executeOrder() = withContext(Dispatchers.IO) {
val saga = sagaExecutor<OrderContext, OrderResult> {
step("io-operation") { context ->
withContext(Dispatchers.IO) {
// I/O bound operation
database.saveOrder(context)
}
}

step("cpu-operation") { context ->
withContext(Dispatchers.Default) {
// CPU bound operation
processOrderData(context)
}
}
}

saga.execute(orderContext)
}

Migration from KState

If you're migrating from the original KState library with saga support:

  1. Add kstate-saga dependency

  2. Update saga calls to use runBlocking or launch within coroutines

  3. Your existing saga definitions will work with minimal changes

// Before (KState with saga)
val result = saga.execute(context)

// After (KState-Saga)
runBlocking {
val result = saga.execute(context)
}

Documentation

Version

Current version: v1.6.1

License

MIT License - see LICENSE

Packages

Link copied to clipboard