FlowBasedSagaExecutor

class FlowBasedSagaExecutor<C, R>(delegate: SagaExecutor<C, R>) : SagaExecutor<C, R>

Flow-based saga executor that emits events as a reactive stream.

This executor wraps a standard SagaExecutor and provides Flow-based event streaming for reactive consumption of saga lifecycle events.

Usage Example

val executor = sagaExecutor<OrderContext, OrderResult> {
    step("reserve") { context -> /* ... */ }
    step("charge") { context -> /* ... */ }
}.asFlow()

// Collect events reactively
launch {
    executor.events.collect { event ->
        println("Event: $event")
    }
}

// Execute saga
val result = executor.execute(context)
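
The wrapping behaviour described above can be pictured as a plain decorator. The following is a minimal sketch under stated assumptions, not the library's implementation: MiniExecutor, EventingExecutor, and wrapAndRun are hypothetical stand-ins, events are modelled as strings, and the replay buffer exists only so the demo can read events after the run. It depends on kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified stand-in for SagaExecutor<C, R>.
interface MiniExecutor<C, R> {
    suspend fun execute(context: C): R
}

// Decorator: forwards to the delegate and publishes lifecycle events.
class EventingExecutor<C, R>(
    private val delegate: MiniExecutor<C, R>
) : MiniExecutor<C, R> {
    // replay = 8 is an assumption made so this demo can read events after the run.
    private val mutableEvents = MutableSharedFlow<String>(replay = 8)
    val events: SharedFlow<String> = mutableEvents.asSharedFlow()

    override suspend fun execute(context: C): R {
        mutableEvents.emit("started")
        val result = delegate.execute(context)
        mutableEvents.emit("completed")
        return result
    }
}

fun wrapAndRun(): Pair<Int, List<String>> = runBlocking {
    val wrapped = EventingExecutor(object : MiniExecutor<String, Int> {
        override suspend fun execute(context: String): Int = context.length
    })
    val result = wrapped.execute("reserve")
    // The replay cache lets a late collector still see both events.
    val seen = wrapped.events.take(2).toList()
    result to seen
}
```

Because the wrapper implements the same interface as its delegate, callers that only need execute can use either one interchangeably.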

Parameters

C

The type of context passed to saga steps

R

The type of result produced by saga steps

Constructors

constructor(delegate: SagaExecutor<C, R>)

Properties

val events: SharedFlow<SagaEvent>

Reactive stream of saga events.
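
Because events is a SharedFlow, collecting it does not finish on its own: a collector runs until it is cancelled or a terminating operator stops it. The sketch below shows one way to bound collection. DemoEvent and collectUntilFinished are hypothetical stand-ins, and the flow is assumed to have no replay (the source does not state its replay configuration), so the collector is started before anything is emitted. It depends on kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the library's SagaEvent type.
sealed interface DemoEvent {
    data class StepStarted(val step: String) : DemoEvent
    data class StepCompleted(val step: String) : DemoEvent
    object SagaFinished : DemoEvent
}

fun collectUntilFinished(): List<DemoEvent> = runBlocking {
    // Assumption: no replay, so late subscribers would miss earlier events.
    val mutable = MutableSharedFlow<DemoEvent>()
    val events: SharedFlow<DemoEvent> = mutable.asSharedFlow()

    // Start collecting first; takeWhile ends collection at the finish marker.
    val collected = async {
        events.takeWhile { it !is DemoEvent.SagaFinished }.toList()
    }

    // Wait until the collector has actually subscribed before emitting.
    mutable.subscriptionCount.first { it > 0 }

    mutable.emit(DemoEvent.StepStarted("reserve"))
    mutable.emit(DemoEvent.StepCompleted("reserve"))
    mutable.emit(DemoEvent.SagaFinished)

    collected.await()
}
```

The same ordering concern is why the usage example launches its collector before calling execute.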

Functions

open override fun addInterceptor(interceptor: SagaInterceptor<*, *, Unit>)

Add an interceptor to observe and potentially veto saga step execution.

open override fun addMonitor(monitor: SagaMonitor)

Add a monitor to observe saga events.

Extension function to convert a SagaExecutor to a FlowBasedSagaExecutor; the usage example above calls it as asFlow().

infix fun SagaExecutor<*, *>.audit(id: String): AuditCommand

Entry point for all audit DSL forms.

open suspend override fun execute(context: C): SagaResult<R>

Execute the saga with the given context.

open suspend override fun execute(context: C, coroutineContext: CoroutineContext, timeout: Duration?): SagaResult<R>

Execute the saga with the given context, coroutine context, and optional timeout.

open suspend fun execute(context: C, runId: RunId, coroutineContext: CoroutineContext, timeout: Duration?): SagaResult<R>

Execute the saga with an explicit runId for journal-aware runs.

fun <C, R> SagaExecutor<C, R>.executeToFlow(context: C, coroutineContext: CoroutineContext = EmptyCoroutineContext, timeout: Duration? = null): Flow<SagaEvent>

Execute saga and return a Flow that emits all events followed by a terminal result event.
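
The "events followed by a terminal result event" shape can be illustrated with a finite flow whose last element carries the result. This is a sketch of how a consumer might treat such a stream, not the behaviour of executeToFlow itself: SketchEvent, runToFlow, and summarize are hypothetical names, and the assumption that the flow completes after the terminal event is not confirmed by the source. It depends on kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for saga events and the terminal result event.
sealed interface SketchEvent {
    data class Step(val name: String) : SketchEvent
    data class Result(val value: String) : SketchEvent
}

// Hypothetical stand-in for executeToFlow: step events first, result last.
fun runToFlow(steps: List<String>): Flow<SketchEvent> = flow {
    for (name in steps) emit(SketchEvent.Step(name))
    emit(SketchEvent.Result("done"))
}

fun summarize(steps: List<String>): String = runBlocking {
    // The flow is finite here, so toList() returns once the result is emitted.
    val events = runToFlow(steps).toList()
    val result = events.last() as SketchEvent.Result
    "${events.size - 1} step events, result=${result.value}"
}
```

A finite flow like this suits one-shot runs where the caller wants events and outcome in a single stream, while the events property suits long-lived observers.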

open override fun removeInterceptor(interceptor: SagaInterceptor<*, *, Unit>)

Remove a previously registered interceptor.

open override fun removeMonitor(monitor: SagaMonitor)

Remove a monitor.

suspend fun <C, R, P> SagaExecutor<C, R>.resume(runId: RunId, context: C, journal: SagaJournal<P>): ResumeOutcome<R, Unit>

Resumes a stateless saga from its journal.