04: Orchestrator State Machine

Drives the provider generator as a state machine, routing each ProcessedChunk and resolving stop signals, timeouts, and stream completion into a StreamResult.

File: src/utils/discord/stream/stateMachine.ts:119-456

StreamOrchestrator.executeStream() is the central coordinator of the Discord-side pipeline. It owns the for await loop over the stage 02 generator and makes the per-chunk decisions that determine the shape of the final StreamResult. Three concerns are woven through the loop:

  1. Stop / interrupt resolution: before processing each chunk, the stop registry is checked. A user stop (/stop) flushes the pending buffer and returns { status: "stopped_by_user" }. A follow-up interrupt discards the buffer and returns { status: "follow_up_interrupt" } so the chat pipeline can restart for the new message.

  2. Chunk routing: after the stop checks, ProcessedChunk.type determines the path:

    • "text" → stage 05 (StreamBufferFlusher.processTextChunk)
    • "function_call" → flush the pending buffer → return { status: "function_call", data: functionCall }
    • "error" → flush the pending buffer → display an error embed if not suppressed → return { status: "error", data: error }
    • "done" → record terminalDoneMetadata (finish reason) and continue the loop (the generator is exhausted on the next await)

  3. Inactivity timeout: a rolling setTimeout is reset on every chunk (resetInactivityTimer). If no chunk arrives for config.inactivityTimeoutMs, state.timedOut is set to true. The loop detects this flag on the next iteration (or once the generator is exhausted) and returns { status: "timeout" }.
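The three concerns above can be sketched as a single loop. This is a minimal illustration, not the code in stateMachine.ts: the Chunk and LoopResult shapes are trimmed-down assumptions, checkStop is a hypothetical stand-in for the stop registry lookup, and the string buffer stands in for the stage 05 flusher. Whether the real loop flushes before returning "timeout" mid-stream is not stated in this document; the sketch does not.

```typescript
// Hypothetical, trimmed-down shapes; the real ProcessedChunk and StreamResult carry more fields.
type Chunk =
  | { type: "text"; text: string }
  | { type: "function_call"; functionCall: { name: string } }
  | { type: "error"; error: Error }
  | { type: "done"; finishReason?: string };

type LoopStatus =
  | "completed" | "function_call" | "error"
  | "timeout" | "stopped_by_user" | "follow_up_interrupt";

interface LoopResult {
  status: LoopStatus;
  data?: unknown;
  accumulatedText: string;
}

type StopKind = "user" | "follow_up" | undefined;

async function runLoop(
  chunks: AsyncIterable<Chunk>,
  checkStop: () => StopKind, // assumed stand-in for the stop registry lookup
  state: { timedOut: boolean }, // flag flipped by the inactivity timer
): Promise<LoopResult> {
  let sent = ""; // text already flushed to the channel
  let pending = ""; // buffered text not yet flushed
  const flush = (): void => {
    sent += pending;
    pending = "";
  };

  for await (const chunk of chunks) {
    // 1. Stop / interrupt resolution runs before any routing.
    const stop = checkStop();
    if (stop === "user") {
      flush(); // a user stop keeps what was buffered
      return { status: "stopped_by_user", accumulatedText: sent };
    }
    if (stop === "follow_up") {
      pending = ""; // a follow-up interrupt discards the buffer
      return { status: "follow_up_interrupt", accumulatedText: sent };
    }

    // 3. A timer that fired while waiting is noticed on this iteration.
    if (state.timedOut) {
      return { status: "timeout", accumulatedText: sent };
    }

    // 2. Chunk routing.
    switch (chunk.type) {
      case "text":
        pending += chunk.text; // stage 05 would decide when to flush
        break;
      case "function_call":
        flush();
        return { status: "function_call", data: chunk.functionCall, accumulatedText: sent };
      case "error":
        flush();
        return { status: "error", data: chunk.error, accumulatedText: sent };
      case "done":
        break; // keep looping; the generator ends on the next await
    }
  }

  flush(); // final flush after the generator is exhausted
  return { status: state.timedOut ? "timeout" : "completed", accumulatedText: sent };
}

// Usage: text followed by a tool request.
async function* demoChunks(): AsyncGenerator<Chunk> {
  yield { type: "text", text: "Looking that up. " };
  yield { type: "function_call", functionCall: { name: "search" } };
}
const demoResult = runLoop(demoChunks(), () => undefined, { timedOut: false });
```

Checking the stop registry before routing means a chunk that arrives after a stop request is never sent, at the cost of one registry lookup per chunk.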

After the generator exhausts normally, completeStreamAfterProviderEnd() runs the final flush path (stage 05 flushFinalBuffer), checks for timeout, and assembles the completed StreamResult.

The outer streamToDiscord() method (the public entry point) calls executeStream() and then does one additional check: if the result is "completed" but wasEmptyStreamResponse() is true (no text and no function call were emitted), it returns { status: "empty_response" } instead, so the tool-loop pipeline’s retry logic can handle it.
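The remapping step can be sketched as a small pure function. The looksEmpty helper below is a hypothetical stand-in for wasEmptyStreamResponse(); the real helper lives in thoughtLog.ts and may inspect more fields than the two assumed here.

```typescript
type Status = "completed" | "function_call" | "error" | "empty_response";

interface Result {
  status: Status;
  data?: unknown;
  accumulatedText?: string;
}

// Hypothetical emptiness check: no text was sent and no function call was returned.
function looksEmpty(result: Result): boolean {
  const noText = (result.accumulatedText ?? "").length === 0;
  const noFunctionCall = result.data === undefined;
  return noText && noFunctionCall;
}

// Only a "completed" result is ever downgraded; every other status passes through untouched.
function remapEmptyCompletion(result: Result): Result {
  if (result.status === "completed" && looksEmpty(result)) {
    return { ...result, status: "empty_response" };
  }
  return result;
}
```

Keeping the check outside executeStream() lets the loop report what happened ("completed") while the entry point decides how callers should interpret it.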

  • provider: StreamProvider — the stage 02/03 adapter (generator + processChunk).
  • config: StreamConfig — timing and buffer size configuration.
  • context: StreamContext — full Discord and application state (channel, tomoriState, etc.).

StreamMetrics and StreamState objects are created fresh at the start of executeStream().

StreamResult — defined at src/types/provider/interfaces.ts:88:

interface StreamResult {
  status:
    | "completed" // generator exhausted; text was sent
    | "function_call" // provider requested a tool; tool-loop handles it
    | "error" // provider or Discord error
    | "timeout" // inactivity timer expired
    | "stopped_by_user" // user /stop command
    | "empty_response" // completed but no text or function call
    | "follow_up_interrupt"; // new user message arrived during generation
  data?: unknown | Error;
  accumulatedText?: string; // all text sent to Discord (for STM write)
  detailsContent?: string; // <details> block body (for STM write)
  stopReason?: StreamStopReason;
  thoughtLog?: ThoughtLogPayload;
  naiContinuationPrefill?: string; // NAI-specific trailing fragment for retry
}

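A consumer of this union can make missing cases a compile-time error with an exhaustive switch. The sketch below is an assumption about how such a consumer might look; the NextStep values are hypothetical labels, not the tool-loop pipeline's actual behavior.

```typescript
type StreamStatus =
  | "completed" | "function_call" | "error" | "timeout"
  | "stopped_by_user" | "empty_response" | "follow_up_interrupt";

// Hypothetical coarse-grained outcomes, for illustration only.
type NextStep = "finish" | "run_tool" | "retry" | "restart" | "abort";

function nextStepFor(status: StreamStatus): NextStep {
  switch (status) {
    case "completed":
    case "stopped_by_user":
      return "finish";
    case "function_call":
      return "run_tool";
    case "empty_response":
      return "retry";
    case "follow_up_interrupt":
      return "restart";
    case "error":
    case "timeout":
      return "abort";
    default: {
      // If a new status is added to the union, this assignment stops compiling.
      const unhandled: never = status;
      throw new Error(`Unhandled stream status: ${String(unhandled)}`);
    }
  }
}
```

With this pattern, adding a status to the union without updating the switch fails type checking rather than falling through at runtime.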
  • Inactivity timer: a setTimeout is armed on entry and cleared in finally. The handle is a NodeJS.Timeout, and it is always cleared before the method returns.
  • Stop-request mutation: clearStopRequest(channelId) is called on exit paths that consumed a stop. The stop registry is a shared module-level map in stopRequests.ts.
  • Error embed: when chunk.type === "error" and !context.suppressUserErrors, the orchestrator calls StreamErrorUi.handleProviderError(), which sends a Discord embed to the channel. This is the sole embed send path for ProviderError types: the downstream response sink (emitStreamResult in responseEmitter.ts) deliberately skips the generic fallback embed when result.data is a ProviderError, to avoid double-sending.
  • Timeout embed: when the inactivity timer fires and user errors are not suppressed, the orchestrator sends a timeout embed via sendStandardEmbed().
  • Progress callback: the orchestrator calls context.onStreamProgress?.() on each chunk to reset the rolling timeout in the stage 01 caller (streamOnce in the tool-loop pipeline).
  • currentTurnModelParts accumulation: stage 05 (processTextChunk) pushes text parts into context.currentTurnModelParts as a side effect; the orchestrator does not do this directly.
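The timer and progress-callback side effects can be sketched together. This is a simplified illustration under assumptions: createInactivityTimer and consume are hypothetical names, and chunks are plain strings rather than ProcessedChunk values.

```typescript
// Rolling timer: each reset() cancels the previous timeout and arms a new one.
function createInactivityTimer(ms: number) {
  let handle: ReturnType<typeof setTimeout> | undefined;
  const state = { timedOut: false };
  const clear = (): void => {
    if (handle !== undefined) clearTimeout(handle);
    handle = undefined;
  };
  const reset = (): void => {
    clear();
    handle = setTimeout(() => {
      state.timedOut = true;
    }, ms);
  };
  return { state, reset, clear };
}

async function consume(
  chunks: AsyncIterable<string>,
  inactivityTimeoutMs: number,
  onChunk: (text: string) => void,
  onProgress?: () => void, // heartbeat for the caller's own rolling timeout
): Promise<"completed" | "timeout"> {
  const timer = createInactivityTimer(inactivityTimeoutMs);
  timer.reset(); // armed on entry
  try {
    for await (const chunk of chunks) {
      if (timer.state.timedOut) return "timeout"; // fired while we were waiting
      timer.reset(); // a chunk arrived, so the window starts over
      onProgress?.();
      onChunk(chunk);
    }
    return timer.state.timedOut ? "timeout" : "completed";
  } finally {
    timer.clear(); // runs on every exit path, including early returns and throws
  }
}
```

Clearing in finally is what keeps a pending timeout from outliving the stream; without it, a timer armed just before an early return would still fire later and keep the event loop busy.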

After this stage:

  • Exactly one StreamResult is returned; the method never throws to its caller (streamToDiscord catches all errors and converts them to { status: "error" }).
  • The inactivity timer has been cleared unconditionally (via finally).
  • If status === "function_call", result.data is a FunctionCall and result.accumulatedText contains all text sent to Discord before the tool call.
  • If status === "completed", all buffered text has been flushed (including final <think> and <details> block captures).
  • clearStopRequest has been called for any stop that was consumed during this stream.

| Surface | Plugin-relevance |
| --- | --- |
| StreamOrchestrator.streamToDiscord() public method | The universal Discord streaming entry point (src/types/stream/interfaces.ts:313). All providers delegate here. Internal: the orchestrator is not designed to be replaced; new providers plug in via the StreamProvider adapter contract. |
| Stop registry (requestStop, hasStopRequest, clearStopRequest) | src/utils/discord/stream/stopRequests.ts. The stop registry is a shared per-channel state map. A plugin that wants to interrupt streaming (e.g., a moderation system) would call StreamOrchestrator.requestStop(channelId, requesterId). → plugin plan candidate |
| context.onStreamProgress callback | Set by the tool-loop pipeline (streamOnce) before calling streamToDiscord. The orchestrator calls it on each chunk. Internal: the callback is an operational heartbeat, not a plugin seam. |
| StreamResult status union | Consumed by the tool-loop pipeline’s outer switch. Adding a new status requires changes in both the orchestrator and the tool-loop consumer. Internal until the tool-loop plugin contract is defined. |
| context.suppressUserErrors | When true, error and timeout embeds are not sent to Discord (used during retries in runGenerationTurn). Internal: set by the chat pipeline’s key-rotation loop. |
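
A per-channel stop registry of the kind described for stopRequests.ts could look roughly like the sketch below. Only the three function names and the channelId / requesterId parameters come from this document; the StopRequest shape (including requestedAt) is an assumption, and the sketch does not model how user stops are told apart from follow-up interrupts.

```typescript
// Assumed record shape; the real registry may store different fields.
interface StopRequest {
  requesterId: string;
  requestedAt: number;
}

// Module-level map keyed by channel ID, shared by every stream in the process.
const stopRequests = new Map<string, StopRequest>();

function requestStop(channelId: string, requesterId: string): void {
  stopRequests.set(channelId, { requesterId, requestedAt: Date.now() });
}

function hasStopRequest(channelId: string): boolean {
  return stopRequests.has(channelId);
}

// Called on exit paths that consumed a stop, so a stale request
// cannot cancel the next stream in the same channel.
function clearStopRequest(channelId: string): void {
  stopRequests.delete(channelId);
}

// Usage: a hypothetical moderation plugin interrupts generation in one channel.
requestStop("channel-123", "moderator-456");
```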

| Source | Key / Env var | Default | Purpose |
| --- | --- | --- | --- |
| StreamConfig / env var | INACTIVITY_TIMEOUT_MS | 120 000 ms | Time with no chunk before stream is considered stalled |
| TomoriState.config | send_message_limit | 0 (no limit) | Maximum Discord messages per stream; enforced in stage 07 |

  • Stop signal registry: src/utils/discord/stream/stopRequests.ts
  • Error embed rendering: src/utils/discord/stream/errorUi.ts
  • Empty-response detection: wasEmptyStreamResponse in src/utils/discord/stream/thoughtLog.ts
  • Stage 05 (text path from this stage): → 05-buffer-management.md
  • Tool-loop consumer of StreamResult: → tool-loop pipeline — Stage 01
  • StreamResult type: src/types/provider/interfaces.ts:88