04: Orchestrator State Machine
Drives the provider generator as a state machine, routing each ProcessedChunk and resolving stop signals, timeouts, and stream completion into a StreamResult.
File: src/utils/discord/stream/stateMachine.ts:119-456
Mission
Section titled “Mission”StreamOrchestrator.executeStream() is the central coordinator of the Discord-side pipeline.
It owns the for await loop over the stage 02 generator and makes the per-chunk decisions that
determine the shape of the final StreamResult. Three concerns are woven through the loop:
-
Stop / interrupt resolution — before processing each chunk, the stop registry is checked. A user stop (
/stop) flushes the pending buffer and returns{ status: "stopped_by_user" }. A follow-up interrupt discards the buffer and returns{ status: "follow_up_interrupt" }so the chat pipeline can restart for the new message. -
Chunk routing — after stop checks, the
ProcessedChunk.typedetermines the path:"text"→ stage 05 (StreamBufferFlusher.processTextChunk)"function_call"→ flush the pending buffer → return{ status: "function_call", data: functionCall }"error"→ flush the pending buffer → display an error embed if not suppressed → return{ status: "error", data: error }"done"→ recordterminalDoneMetadata(finish reason); continue the loop (generator will exhaust on the nextawait)
-
Inactivity timeout — a rolling
setTimeoutresets on every chunk (resetInactivityTimer). If no chunk arrives forconfig.inactivityTimeoutMs,state.timedOut = true. The loop detects this flag on the next iteration (or at generator exhaust) and returns{ status: "timeout" }.
After the generator exhausts normally, completeStreamAfterProviderEnd() runs the final flush
path (stage 05 flushFinalBuffer), checks for timeout, and assembles the completed StreamResult.
The outer streamToDiscord() method (the public entry point) calls executeStream() and then
does one additional check: if the result is "completed" but wasEmptyStreamResponse() is true
(no text and no function call were emitted), it returns { status: "empty_response" } instead,
so the tool-loop pipeline’s retry logic can handle it.
provider: StreamProvider— the stage 02/03 adapter (generator +processChunk).config: StreamConfig— timing and buffer size configuration.context: StreamContext— full Discord and application state (channel, tomoriState, etc.).
StreamMetrics and StreamState objects are created fresh at the start of executeStream().
Output
Section titled “Output”StreamResult — defined at src/types/provider/interfaces.ts:88:
interface StreamResult { status: | "completed" // generator exhausted; text was sent | "function_call" // provider requested a tool; tool-loop handles it | "error" // provider or Discord error | "timeout" // inactivity timer expired | "stopped_by_user" // user /stop command | "empty_response" // completed but no text or function call | "follow_up_interrupt"; // new user message arrived during generation data?: unknown | Error; accumulatedText?: string; // all text sent to Discord (for STM write) detailsContent?: string; // <details> block body (for STM write) stopReason?: StreamStopReason; thoughtLog?: ThoughtLogPayload; naiContinuationPrefill?: string; // NAI-specific trailing fragment for retry}Side effects
Section titled “Side effects”- Inactivity timer — a
setTimeoutis set on entry and cleared infinally. The timer runs againstNodeJS.Timeout— it is always cleared before the method returns. - Stop-request mutation —
clearStopRequest(channelId)is called on exit paths that consumed a stop. The stop registry is a shared module-level map instopRequests.ts. - Error embed — when
chunk.type === "error"and!context.suppressUserErrors, callsStreamErrorUi.handleProviderError()which sends a Discord embed to the channel. This is the sole embed send path forProviderErrortypes — the downstream response sink (emitStreamResultinresponseEmitter.ts) deliberately skips the generic fallback embed whenresult.datais aProviderError, to avoid double-sending. - Timeout embed — when the inactivity timer fires and user errors are not suppressed, sends
a timeout embed via
sendStandardEmbed(). - Progress callback — calls
context.onStreamProgress?.()on each chunk to reset the rolling timeout in the stage 01 caller (streamOncein the tool-loop pipeline). currentTurnModelPartsaccumulation — stage 05 (processTextChunk) pushes text parts intocontext.currentTurnModelPartsas a side effect; the orchestrator does not do this directly.
Invariants
Section titled “Invariants”After this stage:
- Exactly one
StreamResultis returned; the method never throws to its caller (streamToDiscordcatches all errors and converts them to{ status: "error" }). - The inactivity timer has been cleared unconditionally (via
finally). - If
status === "function_call",result.datais aFunctionCallandresult.accumulatedTextcontains all text sent to Discord before the tool call. - If
status === "completed", all buffered text has been flushed (including final<think>and<details>block captures). clearStopRequesthas been called for any stop that was consumed during this stream.
Extension points
Section titled “Extension points”| Surface | Plugin-relevance |
|---|---|
StreamOrchestrator.streamToDiscord() public method | The universal Discord streaming entry point — src/types/stream/interfaces.ts:313. All providers delegate here. Internal — the orchestrator is not designed to be replaced; new providers plug in via the StreamProvider adapter contract. |
Stop registry (requestStop, hasStopRequest, clearStopRequest) | src/utils/discord/stream/stopRequests.ts. The stop registry is a shared per-channel state map. A plugin that wants to interrupt streaming (e.g., a moderation system) would call StreamOrchestrator.requestStop(channelId, requesterId). → plugin plan candidate |
context.onStreamProgress callback | Set by the tool-loop pipeline (streamOnce) before calling streamToDiscord. The orchestrator calls it on each chunk. Internal — the callback is an operational heartbeat, not a plugin seam. |
StreamResult status union | Consumed by the tool-loop pipeline’s outer switch. Adding a new status requires changes in both the orchestrator and the tool-loop consumer. Internal until the tool-loop plugin contract is defined. |
context.suppressUserErrors | When true, error and timeout embeds are not sent to Discord (used during retries in runGenerationTurn). Internal — set by the chat pipeline’s key-rotation loop. |
Configuration
Section titled “Configuration”| Source | Key / Env var | Default | Purpose |
|---|---|---|---|
StreamConfig / env var | INACTIVITY_TIMEOUT_MS | 120 000 ms | Time with no chunk before stream is considered stalled |
TomoriState.config | send_message_limit | 0 (no limit) | Maximum Discord messages per stream; enforced in stage 07 |
Related docs
Section titled “Related docs”- Stop signal registry:
src/utils/discord/stream/stopRequests.ts - Error embed rendering:
src/utils/discord/stream/errorUi.ts - Empty-response detection:
wasEmptyStreamResponseinsrc/utils/discord/stream/thoughtLog.ts - Stage 05 (text path from this stage): →
05-buffer-management.md - Tool-loop consumer of
StreamResult: → tool-loop pipeline — Stage 01 StreamResulttype:src/types/provider/interfaces.ts:88