KState-Saga
Coroutine-friendly saga pattern implementation for distributed transactions.
Overview
KState-Saga is a lightweight, coroutine-friendly library for implementing the saga pattern in Kotlin applications. Steps and their compensating actions are suspend functions, so distributed transactions run without blocking threads, which suits microservices and other distributed systems.
Key Features
Coroutine Support: All operations are suspend functions for non-blocking async execution
Type-Safe DSL: Fluent, type-safe API for defining saga workflows
Compensation Logic: Automatic rollback with LIFO compensation execution
Stateful Sagas: Typed state management with "smart compensation"
Event Monitoring: Comprehensive lifecycle events for observability
Zero Coupling: No external dependencies beyond Kotlin coroutines
Saga Types
KState-Saga provides two saga implementations:
| Feature | SagaExecutor | StatefulSagaExecutor |
|---|---|---|
| State Management | Context object only | Typed shared state + context |
| Compensation Access | Step result only | Step result + saga state |
| Smart Compensation | Manual via context | Built-in via state property |
| Best For | Simple workflows | Complex multi-step transactions |
Installation
Gradle (Kotlin DSL)
dependencies {
    implementation("ca.acendas:kstate-saga:1.6.1")
}
Gradle (Groovy)
dependencies {
    implementation 'ca.acendas:kstate-saga:1.6.1'
}
Maven
<dependency>
    <groupId>ca.acendas</groupId>
    <artifactId>kstate-saga</artifactId>
    <version>1.6.1</version>
</dependency>
Quick Start
import ca.acendas.kstate.saga.*
import kotlinx.coroutines.*
// Define your context and result types
data class OrderContext(val items: List<String>, val amount: Double)
data class OrderResult(val orderId: String)
// Create a saga with async operations
val orderSaga = sagaExecutor<OrderContext, OrderResult> {
    step("reserve-inventory") { context ->
        // Async inventory reservation
        inventoryService.reserve(context.items)
    }
    compensate { result ->
        // Async compensation
        inventoryService.release(result.reservationId)
    }
    step("charge-payment") { context ->
        // Async payment processing
        paymentService.charge(context.amount)
    }
    compensate { result ->
        // Async refund
        paymentService.refund(result.transactionId)
    }
    idempotent(true)
}
// Execute the saga within a coroutine
runBlocking {
    val result = orderSaga.execute(OrderContext(
        items = listOf("item1", "item2"),
        amount = 99.99
    ))
    when (result) {
        is SagaResult.Completed -> println("Order completed: ${result.value}")
        is SagaResult.Aborted -> println("Order aborted: ${result.error}")
        is SagaResult.CompensationFailure -> println("Critical failure")
    }
}
Core Components
SagaExecutor
Orchestrates saga execution with suspend functions:
interface SagaExecutor<C, R> {
    suspend fun execute(context: C): SagaResult<R>
    fun addMonitor(monitor: SagaMonitor)
    fun removeMonitor(monitor: SagaMonitor)
}
SagaStep
Defines a saga step with async forward and compensation logic:
data class SagaStep<C, R>(
    val name: String,
    val forward: suspend (C) -> R,
    val compensation: (suspend (R) -> Unit)? = null,
    val idempotent: Boolean = false
)
SagaResult
Sealed type representing saga outcomes:
sealed class SagaResult<out T> {
    data class Completed<T>(val value: T) : SagaResult<T>()
    data class Aborted(val error: SagaExecutionError) : SagaResult<Nothing>()
    data class CompensationFailure(
        val originalError: SagaExecutionError,
        val compensationErrors: List<SagaExecutionError>
    ) : SagaResult<Nothing>()
}
Advanced Features
Monitoring and Observability
Add monitors to track saga execution:
val saga = sagaExecutor<Context, Result> {
    // ... steps ...
    monitor { event ->
        when (event) {
            is SagaEvent.StepStarted -> logger.info("Starting ${event.stepName}")
            is SagaEvent.StepCompleted -> logger.info("Completed ${event.stepName}")
            is SagaEvent.StepFailed -> logger.error("Failed ${event.stepName}", event.error)
            else -> logger.debug("Event: $event")
        }
    }
}
Idempotent Steps
Mark steps as idempotent for safe retries:
step("process-payment") { context ->
    paymentService.processIdempotent(context.amount, context.idempotencyKey)
}
idempotent(true)
Stateful Sagas - Smart Compensation
StatefulSagaExecutor provides typed state management that enables "smart compensation" - where compensation logic can make decisions based on the execution progress of later steps.
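One way to picture shared typed state is a small holder that every step and compensation reads from and writes to. The sketch below is not taken from the library's source: `SagaStateHolder` is a hypothetical name, and backing it with `AtomicReference` is an assumption. It only mirrors the `state` and `updateState` names that the examples in this section use.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical holder for a saga's shared state; not part of KState-Saga.
class SagaStateHolder<S>(initial: S) {
    private val ref = AtomicReference(initial)

    // Latest state, including every update applied so far.
    val state: S get() = ref.get()

    // Applies a pure transform atomically and returns the new state.
    fun updateState(transform: (S) -> S): S = ref.updateAndGet { transform(it) }
}

data class PaymentState(
    val paymentAmount: Long = 0L,
    val escrowCaptured: Boolean = false
)

fun demoStateHolder(): PaymentState {
    val holder = SagaStateHolder(PaymentState())
    holder.updateState { it.copy(paymentAmount = 500L) }   // e.g. written by step 1
    holder.updateState { it.copy(escrowCaptured = true) }  // e.g. written by step 3
    return holder.state
}
```

Because the state is an immutable data class replaced through `copy`, a compensation that reads `state` sees a consistent snapshot rather than a half-applied update.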
Outcome-Based Control Flow
Steps use explicit outcomes that immediately exit the step lambda:
completes with result - Step succeeded, continue to next step
fails with reason - Step failed, trigger compensation (supports String or Enum)
skip with reason - Skip remaining steps, return last result (no compensation)
Important: These DSL constructs automatically exit the step - no return statement needed:
first call PaymentSteps.AUTHORIZE with { cart ->
    val result = authorizePayment(cart)
    if (!result.approved) {
        fails with FailureReason.CARD_DECLINED // Immediately exits & triggers compensation
        // Code here is UNREACHABLE - no return needed!
    }
    // Only runs if approved
    completes with result
}
The Problem with Stateless Sagas
In traditional sagas, compensation logic only knows about its own step's result:
// Problem: Step 1's compensation doesn't know if Step 3 captured the escrow
first call "take-payment" with { cart ->
    val payment = takePayment(cart.amount)
    PaymentResult(payment.id)
} otherwise { result ->
    // How do we know if escrow was captured by a later step?
    // We don't! We have to guess or use external state.
}
The Solution: Stateful Sagas with Outcome-Based Flow
StatefulSagaExecutor solves this by providing shared, typed state with explicit outcomes:
// Define your state
data class PaymentState(
    val paymentAmount: Long = 0L,
    val escrowCaptured: Boolean = false,
    val ticketsPrinted: Boolean = false
)
// Define type-safe enums for step names and reasons
enum class PaymentSteps { TAKE_PAYMENT, PRINT_TICKETS, CAPTURE_ESCROW, SUBMIT }
enum class FailureReason { PAYMENT_DECLINED, PRINTER_JAM, BACKEND_ERROR }
enum class SkipReason { ALREADY_PROCESSED, DUPLICATE_REQUEST }
// Create a stateful saga with outcome-based control flow
val saga = sagaExecutor<Cart, Transaction, PaymentState>(PaymentState()) {
    first call PaymentSteps.TAKE_PAYMENT with { cart ->
        // Check for duplicate - skip without compensation
        if (cart.alreadyProcessed) skip with SkipReason.ALREADY_PROCESSED
        val payment = cashManager.takePayment(cart.totalCents)
        // Explicit failure with enum reason
        if (payment == null) fails with FailureReason.PAYMENT_DECLINED
        updateState { it.copy(paymentAmount = payment.amount) }
        completes with createTransaction(cart, payment)
    } otherwise {
        // Smart compensation: `state` sees ALL updates including from later steps
        when {
            state.escrowCaptured -> {
                // Step 3 captured funds - must dispense physical refund
                cashManager.dispenseCashRefund(state.paymentAmount)
            }
            state.paymentAmount > 0 -> {
                // Funds in escrow but not captured - just return them
                cashManager.returnEscrow()
            }
            else -> {
                // No payment received - just disable acceptors
                cashManager.disableAcceptors()
            }
        }
    }
    then call PaymentSteps.PRINT_TICKETS with { cart ->
        val printResult = printer.printTickets(cart.items)
        if (!printResult.success) fails with FailureReason.PRINTER_JAM
        updateState { it.copy(ticketsPrinted = true) }
        completes with result!! // Pass through previous result
    } otherwise {
        if (state.ticketsPrinted) {
            ticketService.voidTickets(result!!.id)
        }
    }
    then call PaymentSteps.CAPTURE_ESCROW with { cart ->
        cashManager.captureEscrow()
        updateState { it.copy(escrowCaptured = true) }
        completes with result!!
    } otherwise {
        // This runs before Step 1's compensation
        cashManager.prepareForRefund()
    }
    then call PaymentSteps.SUBMIT with { cart ->
        val submitResult = gateway.submitTransaction(result!!)
        if (!submitResult.success) fails with FailureReason.BACKEND_ERROR
        completes with result!!
    }
    watching events { event ->
        when (event) {
            is SagaEvent.StepSkipped -> logger.info("Skipped: ${event.stepName} - ${event.reason}")
            else -> logger.info("Saga: $event")
        }
    }
}
How Smart Compensation Works
When a saga fails, compensations run in reverse order (LIFO):
Step 1: Take Payment ✓ (completed)
Step 2: Print Tickets ✓ (completed)
Step 3: Capture Escrow ✓ (completed)
Step 4: Submit Backend ✗ (FAILED)
Compensation order:
1. Step 3 compensation (escrowCaptured = true)
2. Step 2 compensation (escrowCaptured = true, ticketsPrinted = true)
3. Step 1 compensation (escrowCaptured = true) → dispenses refund!
Step 1's compensation sees state.escrowCaptured = true because Step 3 updated the state before the failure occurred.
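The ordering described above can be reproduced with a few lines of plain Kotlin. This is a simplified, blocking sketch for illustration only, not the library's implementation: `Step`, `runSaga`, and `kioskSteps` are hypothetical names, steps are ordinary functions rather than suspend functions, and compensations return a description of what they would do instead of calling hardware.

```kotlin
data class PaymentState(
    val paymentAmount: Long = 0L,
    val escrowCaptured: Boolean = false,
    val ticketsPrinted: Boolean = false
)

// Hypothetical, simplified step: blocking functions instead of suspend ones.
class Step(
    val name: String,
    val forward: (PaymentState) -> PaymentState,
    val compensate: (PaymentState) -> String
)

// Runs steps in order. On failure, compensates the completed steps newest-first
// and returns the actions taken (empty if every step succeeded).
fun runSaga(steps: List<Step>): List<String> {
    var state = PaymentState()
    val completed = mutableListOf<Step>()
    for (step in steps) {
        try {
            state = step.forward(state)
            completed += step
        } catch (e: Exception) {
            // Every compensation sees the state as it was when the failure happened.
            return completed.asReversed().map { it.compensate(state) }
        }
    }
    return emptyList()
}

// Builds the four kiosk steps; the step named by failAt throws instead of running.
fun kioskSteps(failAt: String?): List<Step> {
    fun step(
        name: String,
        update: (PaymentState) -> PaymentState,
        undo: (PaymentState) -> String
    ) = Step(name, { s -> if (name == failAt) error("$name failed") else update(s) }, undo)

    return listOf(
        step("take-payment", { it.copy(paymentAmount = 500L) }) { s ->
            when {
                s.escrowCaptured -> "dispense cash refund of ${s.paymentAmount}"
                s.paymentAmount > 0 -> "return escrow"
                else -> "disable acceptors"
            }
        },
        step("print-tickets", { it.copy(ticketsPrinted = true) }) { "void tickets" },
        step("capture-escrow", { it.copy(escrowCaptured = true) }) { "prepare for refund" },
        step("submit", { it }) { "nothing to undo" }
    )
}
```

With `kioskSteps("submit")`, `runSaga` returns "prepare for refund", "void tickets", "dispense cash refund of 500", matching the order listed above. With `kioskSteps("capture-escrow")` it returns "void tickets", "return escrow", because the escrow flag was never set and the first step's compensation chooses the cheaper path.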
StatefulStepScope API
Forward actions receive a StatefulStepScope with outcome builders that immediately exit:
first call PaymentSteps.VALIDATE with { context ->
    // Read current state
    val current = state
    // Update state atomically
    updateState { it.copy(paymentAmount = 100L) }
    // Access previous step's result (null for first step)
    val previousResult = result
    // Explicit failure - IMMEDIATELY exits and triggers compensation
    if (!context.valid) fails with FailureReason.INVALID_ORDER
    // Code after fails/skip/completes is UNREACHABLE
    // Explicit skip - IMMEDIATELY exits, returns last result, no compensation
    if (context.alreadyProcessed) skip with SkipReason.DUPLICATE
    // Explicit success - IMMEDIATELY exits and continues to next step
    completes with MyResult("success")
}
Note: Unlike traditional Kotlin lambdas, you don't need return@with after fails with, skip with, or completes with. These DSL constructs use internal signals to immediately exit the step.
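How an expression such as `fails with reason` can leave a lambda without `return` is easiest to see in a small model. The sketch below shows one plausible mechanism, an infix function whose return type is `Nothing` and which throws a control-flow signal. It is an assumption about the general technique, not the library's actual internals, and `Outcome`, `OutcomeSignal`, `StepScope`, and `runStep` are hypothetical names.

```kotlin
// Hypothetical outcome model; the library's real types may differ.
sealed class Outcome<out R> {
    data class Completed<R>(val value: R) : Outcome<R>()
    data class Failed(val reason: Any) : Outcome<Nothing>()
    data class Skipped(val reason: Any) : Outcome<Nothing>()
}

// Control-flow signal that carries the outcome out of the step lambda.
// Throwable subclasses cannot be generic, hence Outcome<Any?>.
class OutcomeSignal(val outcome: Outcome<Any?>) : RuntimeException()

class StepScope<R> {
    val completes = Completes()
    val fails = Fails()
    val skip = Skip()

    // Each `with` returns Nothing, so the compiler treats later code as unreachable.
    inner class Completes {
        infix fun with(value: R): Nothing = throw OutcomeSignal(Outcome.Completed(value))
    }
    class Fails {
        infix fun with(reason: Any): Nothing = throw OutcomeSignal(Outcome.Failed(reason))
    }
    class Skip {
        infix fun with(reason: Any): Nothing = throw OutcomeSignal(Outcome.Skipped(reason))
    }
}

// Runs the step body and converts the thrown signal back into a value.
@Suppress("UNCHECKED_CAST")
fun <R> runStep(block: StepScope<R>.() -> Unit): Outcome<R> =
    try {
        StepScope<R>().block()
        error("Step ended without an explicit outcome")
    } catch (signal: OutcomeSignal) {
        signal.outcome as Outcome<R>
    }

fun authorize(approved: Boolean): Outcome<String> = runStep<String> {
    if (!approved) fails with "CARD_DECLINED"
    completes with "auth-123"
}
```

One consequence of this design, if a library does use it, is that a broad `catch (e: Exception)` inside a step body would intercept the signal, so step code should catch only the specific exceptions it expects.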
Outcome Types
| Outcome | Effect | Compensation |
|---|---|---|
| completes with result | Continue to next step | N/A |
| fails with reason | Abort saga | Triggered (LIFO order) |
| skip with reason | Return last result | Not triggered |
CompensationScope API
Compensation actions receive a CompensationScope with:
otherwise {
    // Result from THIS step's forward action
    val stepResult = result
    // Current state (includes updates from ALL completed steps)
    val currentState = state
    // Make smart decisions
    when {
        state.escrowCaptured -> refund()
        state.paymentAmount > 0 -> returnEscrow()
        else -> cleanup()
    }
}
StatefulSagaResult
Stateful saga execution returns StatefulSagaResult<R, S>:
when (val result = saga.execute(cart)) {
    is StatefulSagaResult.Completed -> {
        // All steps succeeded
        val transaction = result.value
        val finalState = result.finalState // State after all steps
        println("Success: $transaction, state: $finalState")
    }
    is StatefulSagaResult.Aborted -> {
        // Step failed, compensations succeeded
        val error = result.error
        val stateAtFailure = result.finalState
        println("Failed at ${error.stepName}: ${error.message}")
    }
    is StatefulSagaResult.CompensationFailure -> {
        // Step failed AND compensation failed - critical!
        val originalError = result.originalError
        val compensationErrors = result.compensationErrors
        val stateAtFailure = result.finalState
        alertOps("Manual intervention required!")
    }
}
Real-World Example: Payment Transaction
Here's a complete example modeling a payment kiosk transaction:
data class PaymentState(
    val transactionSeqNum: String? = null,
    val paymentAmount: Long = 0L,
    val escrowCaptured: Boolean = false,
    val ticketsPrinted: Boolean = false
)
data class Cart(val items: List<Item>, val totalCents: Long)
data class Transaction(val id: String, val cart: Cart)
suspend fun processPayment(cart: Cart): Result<Transaction> {
    val saga = sagaExecutor<Cart, Transaction, PaymentState>(PaymentState()) {
        // Step 1: Authorize card payment
        first call "Authorize Payment" with { cart ->
            val result = cardReader.purchase(cart.totalCents)
            updateState { it.copy(transactionSeqNum = result.seqNum) }
            Transaction(result.transactionId, cart)
        } otherwise {
            state.transactionSeqNum?.let { seqNum ->
                cardReader.reversePurchase(seqNum)
            }
            cardReader.disable()
        }
        // Step 2: Print tickets
        then call "Print Tickets" with { cart ->
            cart.items.forEach { item ->
                printer.printTicket(item)
            }
            updateState { it.copy(ticketsPrinted = true) }
            result!!
        } otherwise {
            if (state.ticketsPrinted) {
                ticketService.voidTickets(result!!.id)
            }
        }
        // Step 3: Capture payment
        then call "Capture Payment" with { cart ->
            cardReader.acceptPurchase(result!!.id, cart.totalCents)
            updateState { it.copy(escrowCaptured = true) }
            result!!
        } otherwise {
            if (state.escrowCaptured) {
                cardReader.reversePurchase(state.transactionSeqNum!!)
            }
        }
        // Step 4: Submit to backend
        then call "Submit Backend" with { cart ->
            gateway.submitTransaction(result!!)
            result!!
        } otherwise {
            gateway.notifyTransactionFailure(result!!.id, "Saga rollback")
        }
        watching events { event ->
            logger.debug("Saga event: $event")
        }
    }
    return when (val sagaResult = saga.execute(cart)) {
        is StatefulSagaResult.Completed -> Result.success(sagaResult.value)
        is StatefulSagaResult.Aborted -> Result.failure(
            Exception("Transaction failed at ${sagaResult.error.stepName}")
        )
        is StatefulSagaResult.CompensationFailure -> {
            alertOps("Compensation failed - manual intervention required")
            Result.failure(Exception("Critical failure: ${sagaResult.originalError.message}"))
        }
    }
}
When to Use StatefulSaga vs Regular Saga
| Use Case | Recommended |
|---|---|
| Simple 2-3 step workflows | SagaExecutor |
| No compensation dependencies | SagaExecutor |
| Compensation depends on later steps | StatefulSagaExecutor |
| Payment/financial transactions | StatefulSagaExecutor |
| Hardware state tracking | StatefulSagaExecutor |
| Complex rollback logic | StatefulSagaExecutor |
Coroutine Integration
KState-Saga is designed to work seamlessly with Kotlin coroutines:
// Use different dispatchers for different steps
suspend fun executeOrder(orderContext: OrderContext) = withContext(Dispatchers.IO) {
    val saga = sagaExecutor<OrderContext, OrderResult> {
        step("io-operation") { context ->
            withContext(Dispatchers.IO) {
                // I/O bound operation
                database.saveOrder(context)
            }
        }
        step("cpu-operation") { context ->
            withContext(Dispatchers.Default) {
                // CPU bound operation
                processOrderData(context)
            }
        }
    }
    // The context is passed in by the caller instead of being read from an undefined variable
    saga.execute(orderContext)
}
Migration from KState
If you're migrating from the original KState library with saga support:
Add the kstate-saga dependency
Call saga.execute from a coroutine, for example inside runBlocking or launch
Your existing saga definitions will work with minimal changes
// Before (KState with saga)
val result = saga.execute(context)
// After (KState-Saga)
runBlocking {
    val result = saga.execute(context)
}
Documentation
Version
Current version: v1.6.1
License
MIT License - see LICENSE