04: Channel Lock
Per-channel mutex wrapper around the per-turn body.
File: src/utils/chat/channelQueue.ts:75-134
Concurrency wrapper, not a data-transform stage. Input and output are structurally the same (
RunnableChatAdmissionflows in; the callback receives aLockedChatTurnderived from it). What this stage does is enforce that exactly one turn-sequence runs per channel at a time, manage the Discord typing indicator, and replay queued messages on release.
Mission
Section titled “Mission”Acquire a channel-scoped mutex, run the per-turn callback under that lock with a
Discord typing keepalive active, and on release: replay the next queued message
and/or trigger any pending stop-response generation. Make recursive re-entries
into tomoriChat() safe by recognizing the skipLock=true flag and reusing the
outer lock instead of deadlocking on it.
RunnableChatAdmission(from stage 02).callback: (LockedChatTurn, startTyping) => Promise<T>— receives the locked turn and a function that starts the typing keepalive.options: { handleStopResponse, processQueuedMessage }— the coordinator’s re-entry callbacks for stop-response and queued messages.
Output
Section titled “Output”Promise<T> — the callback’s return value, pass-through.
The callback receives LockedChatTurn:
{ admission: RunnableChatAdmission; channelId: string; lockedAt: number; queueDepth: number; skipLock: boolean;}Side effects
Section titled “Side effects”Lock acquisition (if skipLock === false):
- Looks up or creates a
ChannelLockEntrykeyed bychannelIdin the in-memorychannelLocksmap. - Forcibly releases the lock if older than
CHANNEL_LOCK_TIMEOUT_MS(default 180s, configurable via env). Logs a warning, aborts the turn abort controller, fires the stream kill callback, and clears the existing queue. - Sets
isLocked = true, recordslockedAt,currentMessageId,userDiscId, persona-job/persona-id/command-triggered flags. - Creates a fresh
AbortController(activeTurnAbortController) for this turn. Its signal is passed to tools viaToolContext.abortSignalso HTTP-level cancellation propagates on/bot kill.
During the callback:
startTyping()(called by the coordinator afterplanChatTurnsproduces ≥ 1 turn) starts the Discord typing keepalive interval (default 8s, configurable via env). Interval auto-stops when the lock is released or a stop request is registered.
Lock release (always runs via finally):
- Clears
isLocked,lockedAt, all active-turn state. - Aborts
activeTurnAbortControllerand clearsactiveStreamKill— ensures no stale kill handles survive across turns. - Stops the typing keepalive.
- Checks
StreamOrchestrator.getAndClearStopContext(channelId). If present, scheduleshandleStopResponse(originalStopMessage, client)viasetImmediate— stop-response generation runs after lock release so the stop response itself can acquire the lock. - Pops the next message from
messageQueue(FIFO). If present, schedulesprocessQueuedMessage(next)viasetImmediate. TheQueuedMessageshape mirrors the cross-cutting fields ofTomoriChatInputthat affect what the bot will say on replay — including reminder context (reminderRecipientID,reminderData) and the streaming-context overrides (disableCrossChannelMessage,disableRecentMessageReplyTool,disableReminderTool). Any new input field that influences generation must also be added toQueuedMessageand threaded throughprocessQueuedMessage, otherwise the queued replay will be a silently-degraded copy of the original call.
skipLock=true path:
- Re-entries from retry/post-turn effects pass
skipLock=true. The stage short-circuits: reuses the outer lock’slockedAtand queue depth, invokes the callback immediately, returns the result. No new typing keepalive is started (the outer keepalive is still active).
Invariants
Section titled “Invariants”After this stage’s finally block runs:
lockEntry.isLocked === falsefor the duration between turn-sequences.- The Discord typing keepalive timer is cleared (
typingKeepaliveTimer === null). - The queued-message replay is scheduled via
setImmediate, not awaited — the current invocation returns before the next message is processed, so the call stack stays shallow even under heavy queue pressure. - A pending stop-response (if any) was scheduled before the queue replay, so the stop response runs first.
/bot kill mechanics
Section titled “/bot kill mechanics”forceKillChannelStream(channelId) is the single entry point for hard-killing
an active turn. It does both:
- Abort the turn controller (
activeTurnAbortController.abort()) — if a tool is executing, thekillPromiseinexecuteToolCall’s race fires immediately, returning{kind: "abort", status: "stopped_by_user"}. The channel lock releases as normal via thefinallyblock ofrunWithChannelLock. - Fire the stream kill callback (
activeStreamKill(...)) — if the LLM is mid-stream, this simultaneously callsabortController.abort()(cancels the HTTP request) and rejects thePromise.raceinstreamOnce, causing it to return{status: "timeout"}.
/bot kill in src/commands/bot/kill.ts additionally calls
StreamOrchestrator.requestStop before forceKillChannelStream, and
clearChannelProcessingQueue to drain the message queue — so neither the
current turn nor any queued messages continue processing.
activeStreamKill is registered by toolLoop.ts/streamOnce at the start of
each provider call and cleared in finally. activeTurnAbortController is
created in acquireChannelLockForTurn and cleared on release.
Extension points
Section titled “Extension points”Internal — concurrency primitive. The lock, queue, and typing-keepalive
mechanics are tightly coupled to Discord rate limits, the stream orchestrator’s
stop/follow-up signaling, and the recursive tomoriChat() re-entry pattern.
Replacing this stage from a plugin would risk breaking those guarantees.
Plugin-relevant adjacent surfaces (lower in the same module):
| Helper | What a plugin might do | Plugin-relevance |
|---|---|---|
enqueueBusyChannelMessage, queuePersonaJobsAtFront, queueStopResponseAtFront | Add a new “queue at front” entry type | → plugin plan candidate; today these are call-site-specific |
queueFollowUpForLockedTurn | Change follow-up interrupt eligibility rules | Internal — coupled to MAX_FOLLOW_UP_INTERRUPTS, the tool-call-chain flag, and the cross-persona trigger guard (see hasExplicitCrossPersonaTrigger in triggerProcessor.ts) |
requestNaturalStopForLockedTurn | Add a new “soft stop” signal type | Internal — coupled to StreamOrchestrator.requestStop semantics |
clearQueuedSelfReplyWork | Customize what gets cleared on natural stop | Internal — coupled to isSelfTriggerMessage and persona-job semantics |
The lock’s policy (timeout, typing interval, max follow-ups) is configurable via env vars; behaviour customization should go through that channel rather than monkey-patching the stage.
Configuration
Section titled “Configuration”| Env var | Default | Purpose |
|---|---|---|
CHANNEL_LOCK_TIMEOUT_MS | 180000 | Stale-lock detection threshold |
DISCORD_TYPING_KEEPALIVE_INTERVAL_MS | 8000 | Typing-refresh cadence |
MAX_FOLLOW_UP_INTERRUPTS | 3 | Per-lock follow-up interrupt cap |
Related docs
Section titled “Related docs”- Queue policy decision tree: lives in
evaluateAdmissionQueueAndTriggerGate(stage 02 helper); → admission-queue helper doc TBD if it grows. - Stop request mechanics: → provider pipeline (stream orchestrator stage).
- Follow-up interrupt semantics: → folded into stage 05 docs (follow-up eligibility gating).