src/voice/voiceAsrBridge.ts

/**
 * Unified ASR bridge — handles both per-user and shared transcription modes
 * through a single code path with mode-specific branching where necessary.
 */
import { OpenAiRealtimeTranscriptionClient } from "./openaiRealtimeTranscriptionClient.ts";
import {
  resolveVoiceAsrLanguageGuidance,
  inspectAsrTranscript,
  normalizeVoiceText,
  normalizeInlineText,
  getRealtimeCommitMinimumBytes
} from "./voiceSessionHelpers.ts";
import {
  OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL,
  normalizeOpenAiRealtimeTranscriptionModel
} from "./realtimeProviderNormalization.ts";
import { getVoiceRuntimeConfig } from "../settings/agentStack.ts";
import {
  OPENAI_ASR_SESSION_IDLE_TTL_MS,
  OPENAI_ASR_TRANSCRIPT_STABLE_MS,
  OPENAI_ASR_TRANSCRIPT_WAIT_MAX_MS
} from "./voiceSessionManager.constants.ts";
import type { VoiceSession } from "./voiceSessionTypes.ts";
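
// Rough call flow as driven by the session manager (illustrative, not exhaustive):
//   beginAsrUtterance(...)    — reset per-utterance state and kick off the connection
//   appendAudioToAsr(...)     — queue/flush PCM16 chunks for the active utterance
//   commitAsrUtterance(...)   — commit the buffer and wait for the final transcript
//   scheduleAsrIdleClose(...) — tear the socket down after the idle TTL expires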

// ── Types ────────────────────────────────────────────────────────────

type AsrBridgeMode = "per_user" | "shared";

/**
 * Explicit lifecycle phase for an ASR bridge session.
 *
 * Transition rules:
 *   idle → connecting            ensureAsrSessionConnected starts a new connection
 *   connecting → ready           WebSocket opens and connectedAt is set
 *   ready → committing           commitAsrUtterance begins
 *   committing → ready           commit completes or fails
 *   ready|committing → closing   teardown begins
 *   closing → idle               WebSocket closes and state is cleaned up
 */
export type AsrBridgePhase = "idle" | "connecting" | "ready" | "committing" | "closing";
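
// Typical guard at a bridge entry point (illustrative; the real call sites appear
// below and in the session manager):
//
//   if (!asrState || asrPhaseIsClosing(asrState.phase)) return null; // tearing down
//   if (asrPhaseIsCommitting(asrState.phase)) return;                // commit already in flight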

// ── Phase query helpers ──────────────────────────────────────────────
// These are the ONLY way consuming code should ask questions about ASR
// bridge lifecycle state. They replace the old closing / isCommittingAsr
// boolean checks.

/** The bridge is tearing down (replaces the old `closing` boolean). */
function asrPhaseIsClosing(phase: AsrBridgePhase): boolean {
  return phase === "closing";
}

/** The bridge is in the middle of a commit (replaces the old `isCommittingAsr` boolean). */
function asrPhaseIsCommitting(phase: AsrBridgePhase): boolean {
  return phase === "committing";
}

export interface AsrUtteranceState {
  id: number;
  startedAt: number;
  bytesSent: number;
  partialText: string;
  finalSegments: string[];
  finalSegmentEntries: AsrFinalSegmentEntry[];
  lastUpdateAt: number;
}

export interface AsrFinalSegmentEntry {
  itemId: string;
  previousItemId: string | null;
  text: string;
  receivedAt: number;
  logprobs: Array<{ token: string; logprob: number; bytes: number[] | null }> | null;
}

export interface AsrPendingAudioChunk { utteranceId: number; chunk: Buffer; }

export interface AsrPendingCommitRequest { id: string; userId: string; requestedAt: number; }

export interface AsrPendingCommitResolver {
  id: string;
  userId: string | null;
  commitRequestId: string | null;
  resolve: (itemId: string) => void;
}

/** State fields common to both modes. */
export interface AsrBridgeState {
  phase: AsrBridgePhase;
  userId: string | null;
  client: OpenAiRealtimeTranscriptionClient | null;
  connectPromise: Promise<void> | null;
  committingUtteranceId: number;
  committingUtterance: AsrUtteranceState | null;
  pendingAudioChunks: AsrPendingAudioChunk[];
  pendingAudioBytes: number;
  connectedAt: number;
  lastAudioAt: number;
  lastTranscriptAt: number;
  lastPartialLogAt: number;
  lastPartialText: string;
  idleTimer: ReturnType<typeof setTimeout> | null;
  utterance: AsrUtteranceState;
  // Mostly shared-mode fields (committedItemUtterances and consecutiveEmptyCommits
  // are also used by the per_user code paths).
  itemIdToUserId: Map<string, string>;
  committedItemUtterances: Map<string, AsrUtteranceState>;
  finalTranscriptsByItemId: Map<string, string>;
  pendingCommitResolvers: AsrPendingCommitResolver[];
  pendingCommitRequests: AsrPendingCommitRequest[];
  consecutiveEmptyCommits: number;
  // Internal periodic flush telemetry for aggregated runtime logging.
  _flushAccumBytes: number;
  _flushAccumChunks: number;
  _flushAccumSkipped: number;
  _lastFlushLogAt: number;
  speechDetectedAt: number;
  speechStoppedAt: number;
  speechActive: boolean;
  speechDetectedUtteranceId: number;
  speechStoppedUtteranceId: number;
}

export interface AsrCommitResult {
  transcript: string;
  asrStartedAtMs: number;
  asrCompletedAtMs: number;
  transcriptionModelPrimary: string;
  transcriptionModelFallback: string | null;
  transcriptionPlanReason: string;
  usedFallbackModel: boolean;
  captureReason: string;
  transcriptLogprobs: Array<{ token: string; logprob: number; bytes: number[] | null }> | null;
}

/** Dependencies injected by the session manager. */
export interface AsrBridgeDeps {
  session: VoiceSession;
  appConfig: { openaiApiKey: string; [key: string]: unknown };
  store: {
    logAction: (entry: {
      kind: string;
      guildId: string;
      channelId: string;
      userId?: string | null;
      content: string;
      metadata?: Record<string, unknown>;
    }) => void;
    getSettings: () => Record<string, unknown> | null;
  };
  botUserId: string | null;
  resolveVoiceSpeakerName: (session: VoiceSession, userId: string | null) => string;
  handleSpeechStarted?: (args: {
    session: VoiceSession;
    userId: string | null;
    speakerName: string;
    utteranceId: number;
    audioStartMs?: number | null;
    itemId?: string | null;
    eventType?: string | null;
  }) => void;
  handleSpeechStopped?: (args: {
    session: VoiceSession;
    userId: string | null;
    speakerName: string;
    utteranceId: number;
    audioEndMs?: number | null;
    itemId?: string | null;
    eventType?: string | null;
  }) => void;
  handleTranscriptOverlapSegment?: (args: {
    session: VoiceSession;
    userId: string | null;
    speakerName: string;
    transcript: string;
    utteranceId: number;
    isFinal: boolean;
    eventType?: string | null;
    itemId?: string | null;
    previousItemId?: string | null;
  }) => void;
}

// ── State creation ───────────────────────────────────────────────────

// Circuit breaker: after this many consecutive empty commits with
// substantial audio, force-reconnect the ASR session.
const ASR_EMPTY_COMMIT_RECONNECT_THRESHOLD = 3;
const ASR_EMPTY_COMMIT_MIN_BYTES = 48_000; // ~1s of 24kHz PCM16
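// ASR_EMPTY_COMMIT_MIN_BYTES: 24_000 samples/s × 2 bytes per PCM16 sample × 1 s = 48_000 bytes.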

export function createAsrBridgeState(): AsrBridgeState { return { phase: "idle", userId: null, client: null, connectPromise: null, committingUtteranceId: 0, committingUtterance: null, pendingAudioChunks: [], pendingAudioBytes: 0, connectedAt: 0, lastAudioAt: 0, lastTranscriptAt: 0, lastPartialLogAt: 0, lastPartialText: "", idleTimer: null, utterance: createAsrUtteranceState(), itemIdToUserId: new Map(), committedItemUtterances: new Map(), finalTranscriptsByItemId: new Map(), pendingCommitResolvers: [], pendingCommitRequests: [], consecutiveEmptyCommits: 0, _flushAccumBytes: 0, _flushAccumChunks: 0, _flushAccumSkipped: 0, _lastFlushLogAt: 0, speechDetectedAt: 0, speechStoppedAt: 0, speechActive: false, speechDetectedUtteranceId: 0, speechStoppedUtteranceId: 0 }; }

function createAsrUtteranceState(prevId = 0): AsrUtteranceState { return { id: prevId + 1, startedAt: Date.now(), bytesSent: 0, partialText: "", finalSegments: [], finalSegmentEntries: [], lastUpdateAt: 0 }; }

// ── Per-user state management (Map<userId, state>) ───────────────────

export function getOrCreatePerUserAsrState( session: VoiceSession, userId: string ): AsrBridgeState | null { if (!session || session.ending) return null; const normalizedUserId = String(userId || "").trim(); if (!normalizedUserId) return null; if (!(session.openAiAsrSessions instanceof Map)) { session.openAiAsrSessions = new Map(); } const existing = session.openAiAsrSessions.get(normalizedUserId); if (existing && typeof existing === "object") return existing as AsrBridgeState; const state = createAsrBridgeState(); state.userId = normalizedUserId; session.openAiAsrSessions.set(normalizedUserId, state); return state; }

// ── Shared state management (single state on session) ────────────────

export function getOrCreateSharedAsrState(session: VoiceSession): AsrBridgeState | null { if (!session || session.ending) return null; if (!session.openAiSharedAsrState) { session.openAiSharedAsrState = createAsrBridgeState(); } return session.openAiSharedAsrState as AsrBridgeState; }

// ── Resolve ASR state for mode ───────────────────────────────────────

function getAsrState( mode: AsrBridgeMode, session: VoiceSession, userId: string ): AsrBridgeState | null { if (mode === "per_user") return getOrCreatePerUserAsrState(session, userId); return getOrCreateSharedAsrState(session); }

// ── Shared helpers ───────────────────────────────────────────────────

const STT_TRANSCRIPT_MAX_CHARS_LOCAL = 2000; const MAX_MAP_SIZE = 320;

function createAsrRuntimeLogger(deps: AsrBridgeDeps, logUserId: string) { return ({ level, event, metadata }: { level: string; event: string; metadata?: Record<string, unknown> | null }) => { deps.store.logAction({ kind: level === "warn" ? "voice_error" : "voice_runtime", guildId: deps.session.guildId, channelId: deps.session.textChannelId, userId: String(logUserId || "").trim() || deps.botUserId || null, content: event, metadata: { sessionId: deps.session.id, ...(metadata && typeof metadata === "object" ? metadata : {}) } }); }; }

function logAsyncAsrCloseFailure(
  deps: AsrBridgeDeps,
  session: VoiceSession,
  { userId = null, reason, mode, error }: {
    userId?: string | null;
    reason: string;
    mode: AsrBridgeMode;
    error: unknown;
  }
) {
  deps.store.logAction({
    kind: "voice_error",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: String(userId || "").trim() || deps.botUserId || null,
    content: `openai_realtime_asr_async_close_failed: ${String((error as Error)?.message || error)}`,
    metadata: { sessionId: session.id, reason, mode }
  });
}

function resolveAsrModelParams(session: VoiceSession, settings: Record<string, unknown> | null) { const resolvedSettings = settings || session.settingsSnapshot || {}; const voiceAsrGuidance = resolveVoiceAsrLanguageGuidance(resolvedSettings); const voiceRuntime = getVoiceRuntimeConfig(resolvedSettings); const rawModel = String( session.openAiPerUserAsrModel || voiceRuntime.openaiRealtime?.inputTranscriptionModel || OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL ) .trim() .slice(0, 120); const normalizedModel = normalizeOpenAiRealtimeTranscriptionModel( rawModel, OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL ); const language = String( session.openAiPerUserAsrLanguage || voiceAsrGuidance.language || "" ) .trim() .toLowerCase() .replace(/_/g, "-") .slice(0, 24); const prompt = String(session.openAiPerUserAsrPrompt || voiceAsrGuidance.prompt || "") .replace(/\s+/g, " ") .trim() .slice(0, 280); const noiseReduction = voiceAsrGuidance.noiseReduction || "near_field"; return { normalizedModel, language, prompt, noiseReduction }; }

function pruneMap(map: Map<string, unknown>, maxSize = MAX_MAP_SIZE) { if (map.size <= maxSize) return; const overflow = map.size - maxSize; let dropped = 0; for (const key of map.keys()) { map.delete(key); dropped += 1; if (dropped >= overflow) break; } }

// ── Logprobs collection from segment entries ─────────────────────────

function collectSegmentLogprobs( entries: AsrFinalSegmentEntry[] | null | undefined ): Array<{ token: string; logprob: number; bytes: number[] | null }> | null { if (!Array.isArray(entries) || entries.length === 0) return null; const collected: Array<{ token: string; logprob: number; bytes: number[] | null }> = []; for (const entry of entries) { if (!Array.isArray(entry?.logprobs)) continue; for (const lp of entry.logprobs) { if (lp && typeof lp.logprob === "number") { collected.push(lp); } } } return collected.length > 0 ? collected : null; }

// ── Segment ordering (topological sort by previousItemId) ────────────

function orderAsrFinalSegments(entries: AsrFinalSegmentEntry[]): string[] { const normalizedEntries = Array.isArray(entries) ? entries .map((entry, index) => ({ itemId: normalizeInlineText(entry?.itemId, 180), previousItemId: normalizeInlineText(entry?.previousItemId, 180) || null, text: normalizeVoiceText(entry?.text || "", STT_TRANSCRIPT_MAX_CHARS_LOCAL), receivedAt: Math.max(0, Number(entry?.receivedAt || 0)), index })) .filter((entry) => entry.itemId && entry.text) : []; if (normalizedEntries.length <= 1) { return normalizedEntries.map((entry) => entry.text); }

const byId = new Map<string, (typeof normalizedEntries)[number]>(); for (const entry of normalizedEntries) { byId.set(entry.itemId, entry); } const sorted = [...byId.values()].sort((a, b) => { const delta = Number(a.receivedAt || 0) - Number(b.receivedAt || 0); if (delta !== 0) return delta; return Number(a.index || 0) - Number(b.index || 0); });

const placed = new Set<string>();
const ordered: string[] = [];
while (ordered.length < sorted.length) {
  let progressed = false;
  for (const entry of sorted) {
    if (placed.has(entry.itemId)) continue;
    const previousItemId = String(entry.previousItemId || "");
    if (!previousItemId || !byId.has(previousItemId) || placed.has(previousItemId)) {
      placed.add(entry.itemId);
      ordered.push(entry.text);
      progressed = true;
    }
  }
  if (progressed) continue;
  // Fall back to arrival order if chain is incomplete/cyclic.
  for (const entry of sorted) {
    if (placed.has(entry.itemId)) continue;
    placed.add(entry.itemId);
    ordered.push(entry.text);
  }
}

return ordered; }
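
// Worked example (item ids are illustrative): if the entries arrive as
//   { itemId: "b", previousItemId: "a",  text: "world" }   (received first)
//   { itemId: "a", previousItemId: null, text: "hello" }   (received second)
// the previousItemId chain still resolves the output to ["hello", "world"];
// only an incomplete or cyclic chain falls back to arrival order.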

// ── Shared-mode: resolve speaker from itemId mapping ─────────────────

function resolveSharedAsrSpeakerUserId(opts: { asrState: AsrBridgeState; itemId: string; fallbackUserId: string | null; botUserId: string | null; }): string | null { const normalizedItemId = normalizeInlineText(opts.itemId, 180); if (normalizedItemId && opts.asrState.itemIdToUserId instanceof Map) { const mappedUserId = String(opts.asrState.itemIdToUserId.get(normalizedItemId) || "").trim(); if (mappedUserId) return mappedUserId; } const normalizedFallbackUserId = String(opts.fallbackUserId || "").trim(); if (normalizedFallbackUserId) return normalizedFallbackUserId; const activeSharedUserId = String(opts.asrState.userId || "").trim(); if (activeSharedUserId) return activeSharedUserId; return opts.botUserId || null; }

// ── Shared-mode: committed item tracking & waiters ───────────────────
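// Commit handshake in shared mode (summary of the flow implemented below):
//   1. commitAsrUtterance records a pending commit request, then commits the buffer.
//   2. The input_audio_buffer.committed event carries the new item_id;
//      trackSharedAsrCommittedItem maps it to the requesting user and resolves any
//      matching waiter registered via waitForSharedAsrCommittedItem.
//   3. waitForSharedAsrTranscriptByItem then polls finalTranscriptsByItemId until the
//      final transcript for that item_id arrives or the wait window expires.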

function prunePendingCommitRequests(asrState: AsrBridgeState, maxAgeMs = 30_000) { const requests = asrState.pendingCommitRequests; if (!requests.length) return requests; const maxAge = Math.max(1_000, Number(maxAgeMs) || 30_000); const now = Date.now(); while (requests.length > 0) { const head = requests[0]; const requestedAt = Math.max(0, Number(head?.requestedAt || 0)); if (requestedAt > 0 && now - requestedAt <= maxAge) break; requests.shift(); } return requests; }

export function trackSharedAsrCommittedItem( asrState: AsrBridgeState, itemId: string, fallbackUserId: string | null = null ) { if (!(asrState.itemIdToUserId instanceof Map)) return; const normalizedItemId = normalizeInlineText(itemId, 180); if (!normalizedItemId) return; const pendingRequests = prunePendingCommitRequests(asrState); const commitRequest = pendingRequests.length > 0 ? pendingRequests.shift()! : null; const commitRequestUserId = String(commitRequest?.userId || "").trim(); const mappedUserId = String(fallbackUserId || commitRequestUserId || "").trim(); if (mappedUserId) { asrState.itemIdToUserId.set(normalizedItemId, mappedUserId); pruneMap(asrState.itemIdToUserId); } const resolvers = asrState.pendingCommitResolvers; if (!resolvers.length) return; const resolverIndex = mappedUserId ? resolvers.findIndex((entry) => String(entry?.userId || "").trim() === mappedUserId) : resolvers.findIndex((entry) => !String(entry?.userId || "").trim()); if (resolverIndex < 0) return; const [resolver] = resolvers.splice(resolverIndex, 1); if (resolver && typeof resolver.resolve === "function") { resolver.resolve(normalizedItemId); } }

function trackPerUserAsrCommittedItem(asrState: AsrBridgeState, itemId: string) {
  const normalizedItemId = normalizeInlineText(itemId, 180);
  if (!normalizedItemId) return;
  const trackedUtterance =
    asrState.committingUtterance && typeof asrState.committingUtterance === "object"
      ? asrState.committingUtterance
      : // Server VAD can auto-commit the active turn before capture finalization
        // transitions this bridge into committing. In that case we still need
        // the emitted item_id to bind to the active utterance so transcript
        // events do not fall back through previousItemId onto an older turn.
        Number(asrState.speechStoppedAt || 0) > 0 &&
          asrState.utterance &&
          typeof asrState.utterance === "object"
        ? asrState.utterance
        : null;
  if (!trackedUtterance || typeof trackedUtterance !== "object") return;
  asrState.committedItemUtterances.set(normalizedItemId, trackedUtterance);
  pruneMap(asrState.committedItemUtterances);
}

function waitForSharedAsrCommittedItem(
  session: VoiceSession,
  asrState: AsrBridgeState,
  userId: string,
  commitRequestId: string
): Promise<string> {
  if (!session || session.ending || !asrState) return Promise.resolve("");
  const waitMs = Math.max(
    600,
    Number(session.openAiAsrTranscriptStableMs || OPENAI_ASR_TRANSCRIPT_STABLE_MS) * 4
  );
  const normalizedUserId = String(userId || "").trim() || null;
  const normalizedCommitRequestId = String(commitRequestId || "").trim();
  return new Promise((resolve) => {
    const resolvers = asrState.pendingCommitResolvers;
    const waiterId = `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
    const timeout = setTimeout(() => {
      const index = resolvers.findIndex((entry) => entry?.id === waiterId);
      if (index >= 0) resolvers.splice(index, 1);
      resolve("");
    }, waitMs);
    const waiter: AsrPendingCommitResolver = {
      id: waiterId,
      userId: normalizedUserId,
      commitRequestId: normalizedCommitRequestId || null,
      resolve: (itemId: string) => {
        clearTimeout(timeout);
        resolve(normalizeInlineText(itemId, 180) || "");
      }
    };
    resolvers.push(waiter);
  });
}

async function waitForSharedAsrTranscriptByItem(
  session: VoiceSession,
  asrState: AsrBridgeState,
  itemId: string
): Promise<string> {
  if (!session || session.ending || !asrState) return "";
  const normalizedItemId = normalizeInlineText(itemId, 180);
  if (!normalizedItemId) {
    return waitForAsrTranscriptSettle(session, asrState);
  }
  const stableWindowMs = Math.max(
    100,
    Number(session.openAiAsrTranscriptStableMs || OPENAI_ASR_TRANSCRIPT_STABLE_MS)
  );
  const maxWaitMs = Math.max(
    stableWindowMs + 120,
    Number(session.openAiAsrTranscriptWaitMaxMs || OPENAI_ASR_TRANSCRIPT_WAIT_MAX_MS)
  );
  const startedAt = Date.now();
  while (Date.now() - startedAt <= maxWaitMs) {
    if (session.ending) return "";
    const transcript = normalizeVoiceText(
      asrState.finalTranscriptsByItemId.get(normalizedItemId) || "",
      STT_TRANSCRIPT_MAX_CHARS_LOCAL
    );
    if (transcript) return transcript;
    await new Promise((resolve) => setTimeout(resolve, 40));
  }
  return normalizeVoiceText(
    asrState.finalTranscriptsByItemId.get(normalizedItemId) || "",
    STT_TRANSCRIPT_MAX_CHARS_LOCAL
  );
}

// ── Transcript settle (per-user mode, fallback for shared) ───────────

async function waitForAsrTranscriptSettle(
  session: VoiceSession,
  asrState: AsrBridgeState,
  utterance: AsrUtteranceState | null = null
): Promise<string> {
  if (!session || session.ending || !asrState) return "";
  const trackedUtterance = utterance || asrState.utterance;
  const stableWindowMs = Math.max(
    100,
    Number(session.openAiAsrTranscriptStableMs || OPENAI_ASR_TRANSCRIPT_STABLE_MS)
  );
  const maxWaitMs = Math.max(
    stableWindowMs + 120,
    Number(session.openAiAsrTranscriptWaitMaxMs || OPENAI_ASR_TRANSCRIPT_WAIT_MAX_MS)
  );
  const startedAt = Date.now();
  while (Date.now() - startedAt <= maxWaitMs) {
    if (session.ending) return "";
    const now = Date.now();
    const lastUpdateAt = Math.max(0, Number(trackedUtterance?.lastUpdateAt || 0));
    const stable = lastUpdateAt > 0 ? now - lastUpdateAt >= stableWindowMs : false;
    const finalText = normalizeVoiceText(
      Array.isArray(trackedUtterance?.finalSegments) ? trackedUtterance.finalSegments.join(" ") : "",
      STT_TRANSCRIPT_MAX_CHARS_LOCAL
    );
    if (finalText && stable) return finalText;
    // Don't early-return partials — they're inherently incomplete.
    // A 120ms gap between partial updates is normal ASR batching,
    // not an indication the transcript is finished. Let the timeout
    // fallback (below) handle partials when no final arrives in time.
    await new Promise((resolve) => setTimeout(resolve, 40));
  }

  const finalText = normalizeVoiceText(
    Array.isArray(trackedUtterance?.finalSegments) ? trackedUtterance.finalSegments.join(" ") : "",
    STT_TRANSCRIPT_MAX_CHARS_LOCAL
  );
  if (finalText) return finalText;
  return normalizeVoiceText(trackedUtterance?.partialText || "", STT_TRANSCRIPT_MAX_CHARS_LOCAL);
}

function resolveTranscriptTargetUtterance({ asrState, mode, itemId, previousItemId }: { asrState: AsrBridgeState; mode: AsrBridgeMode; itemId: string | null; previousItemId: string | null; }): AsrUtteranceState { if (mode !== "per_user") { return asrState.utterance; }

const normalizedItemId = normalizeInlineText(itemId, 180); if (normalizedItemId) { const mappedUtterance = asrState.committedItemUtterances.get(normalizedItemId); if (mappedUtterance && typeof mappedUtterance === "object") { return mappedUtterance; } }

const normalizedPreviousItemId = normalizeInlineText(previousItemId, 180); if (normalizedPreviousItemId) { const mappedUtterance = asrState.committedItemUtterances.get(normalizedPreviousItemId); if (mappedUtterance && typeof mappedUtterance === "object") { return mappedUtterance; } }

return asrState.utterance; }

// ── Wire client events (identical for both modes) ────────────────────

function wireClientEvents( mode: AsrBridgeMode, client: OpenAiRealtimeTranscriptionClient, asrState: AsrBridgeState, deps: AsrBridgeDeps, userId: string | null ) { const { session, store, botUserId, resolveVoiceSpeakerName: resolveSpeaker } = deps;

client.on("event", (event: Record<string, unknown>) => { if (session.ending || !event || typeof event !== "object") return; if (event.type !== "input_audio_buffer.committed") return; const itemId = String( (event as Record<string, string>).item_id || (event as Record<string, Record<string, string>>).item?.id || "" ); if (mode === "shared") { trackSharedAsrCommittedItem(asrState, itemId); return; } trackPerUserAsrCommittedItem(asrState, itemId); });

client.on("transcript", (payload: Record<string, unknown>) => { if (session.ending) return; const transcriptGuard = inspectAsrTranscript( String(payload?.text || ""), STT_TRANSCRIPT_MAX_CHARS_LOCAL ); const transcript = transcriptGuard.transcript; if (!transcript) return;

const eventType = String(payload?.eventType || "").trim();
const isFinal = Boolean(payload?.final);
const itemId = normalizeInlineText(payload?.itemId, 180);
const previousItemId = normalizeInlineText(payload?.previousItemId, 180) || null;

let transcriptSpeakerUserId: string | null = userId ? String(userId).trim() : null;
if (mode === "shared") {
  transcriptSpeakerUserId = resolveSharedAsrSpeakerUserId({
    asrState,
    itemId,
    fallbackUserId: asrState.userId,
    botUserId
  });
}

const speakerName = resolveSpeaker(session, transcriptSpeakerUserId) || "someone";
if (transcriptGuard.malformed) {
  store.logAction({
    kind: "voice_runtime",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: transcriptSpeakerUserId ? String(transcriptSpeakerUserId).trim() : null,
    content: "openai_realtime_asr_control_token_transcript_dropped",
    metadata: {
      sessionId: session.id,
      speakerName,
      transcript,
      eventType: eventType || null,
      itemId: itemId || null,
      previousItemId,
      final: isFinal,
      controlTokenCount: transcriptGuard.controlTokenCount,
      reservedAudioMarkerCount: transcriptGuard.reservedAudioMarkerCount
    }
  });
  return;
}

const now = Date.now();
const targetUtterance = resolveTranscriptTargetUtterance({
  asrState,
  mode,
  itemId: itemId || null,
  previousItemId
});

asrState.lastTranscriptAt = now;
targetUtterance.lastUpdateAt = now;

if (isFinal) {
  if (itemId) {
    const entries = Array.isArray(targetUtterance.finalSegmentEntries)
      ? targetUtterance.finalSegmentEntries
      : [];
    const nextEntry: AsrFinalSegmentEntry = {
      itemId,
      previousItemId,
      text: transcript,
      receivedAt: now,
      logprobs: Array.isArray(payload?.logprobs) ? payload.logprobs : null
    };
    const existingIndex = entries.findIndex((entry) => String(entry?.itemId || "") === itemId);
    if (existingIndex >= 0) {
      entries[existingIndex] = nextEntry;
    } else {
      entries.push(nextEntry);
    }
    targetUtterance.finalSegmentEntries = entries;
    targetUtterance.finalSegments = orderAsrFinalSegments(entries);

    // Shared mode: also index final transcripts by itemId
    if (mode === "shared") {
      asrState.finalTranscriptsByItemId.set(itemId, transcript);
      pruneMap(asrState.finalTranscriptsByItemId);
    }
  } else {
    targetUtterance.finalSegments.push(transcript);
  }
  targetUtterance.partialText = "";
} else {
  targetUtterance.partialText = transcript;
}
const shouldLogPartial =
  !isFinal &&
  transcript !== asrState.lastPartialText &&
  now - Number(asrState.lastPartialLogAt || 0) >= 180;

if (isFinal || shouldLogPartial) {
  if (!isFinal) {
    asrState.lastPartialLogAt = now;
    asrState.lastPartialText = transcript;
  }
  store.logAction({
    kind: "voice_runtime",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: transcriptSpeakerUserId ? String(transcriptSpeakerUserId).trim() : null,
    content: isFinal ? "openai_realtime_asr_final_segment" : "openai_realtime_asr_partial_segment",
    metadata: {
      sessionId: session.id,
      speakerName,
      transcript,
      eventType: eventType || null,
      itemId: itemId || null,
      previousItemId
    }
  });
}

try {
  deps.handleTranscriptOverlapSegment?.({
    session,
    userId: transcriptSpeakerUserId,
    speakerName,
    transcript,
    utteranceId: Math.max(0, Number(targetUtterance.id || 0)),
    isFinal,
    eventType: eventType || null,
    itemId: itemId || null,
    previousItemId
  });
} catch (error) {
  store.logAction({
    kind: "voice_error",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: transcriptSpeakerUserId ? String(transcriptSpeakerUserId).trim() : null,
    content: `openai_realtime_asr_transcript_overlap_handler_failed: ${String(error?.message || error)}`,
    metadata: {
      sessionId: session.id,
      utteranceId: Math.max(0, Number(targetUtterance.id || 0)),
      isFinal,
      eventType: eventType || null
    }
  });
}

});

client.on("speech_started", (payload: Record<string, unknown>) => { if (session.ending) return; const now = Date.now(); const utteranceId = Math.max(0, Number(asrState.utterance?.id || 0)); asrState.speechActive = true; asrState.speechDetectedAt = now; asrState.speechDetectedUtteranceId = utteranceId; const speechUserId = mode === "shared" ? asrState.userId : (userId ? String(userId).trim() : null); store.logAction({ kind: "voice_runtime", guildId: session.guildId, channelId: session.textChannelId, userId: speechUserId || null, content: "openai_realtime_asr_speech_started", metadata: { sessionId: session.id, utteranceId: utteranceId || null, audioStartMs: Number.isFinite(Number(payload?.audioStartMs)) ? Math.max(0, Math.round(Number(payload.audioStartMs))) : null, itemId: normalizeInlineText(payload?.itemId, 180) || null } });

try {
  deps.handleSpeechStarted?.({
    session,
    userId: speechUserId || null,
    speakerName: deps.resolveVoiceSpeakerName(session, speechUserId || null),
    utteranceId: utteranceId || 0,
    audioStartMs: Number.isFinite(Number(payload?.audioStartMs))
      ? Math.max(0, Math.round(Number(payload.audioStartMs)))
      : null,
    itemId: normalizeInlineText(payload?.itemId, 180) || null,
    eventType: "input_audio_buffer.speech_started"
  });
} catch (error) {
  store.logAction({
    kind: "voice_error",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: speechUserId || null,
    content: `openai_realtime_asr_speech_started_handler_failed: ${String(error?.message || error)}`,
    metadata: {
      sessionId: session.id,
      utteranceId: utteranceId || null
    }
  });
}

});

client.on("speech_stopped", (payload: Record<string, unknown>) => { if (session.ending) return; const now = Date.now(); const utteranceId = Math.max(0, Number(asrState.utterance?.id || 0)); asrState.speechActive = false; asrState.speechStoppedAt = now; asrState.speechStoppedUtteranceId = utteranceId; const speechUserId = mode === "shared" ? asrState.userId : (userId ? String(userId).trim() : null); const speakerName = resolveSpeaker(session, speechUserId) || "someone"; store.logAction({ kind: "voice_runtime", guildId: session.guildId, channelId: session.textChannelId, userId: speechUserId || null, content: "openai_realtime_asr_speech_stopped", metadata: { sessionId: session.id, utteranceId: utteranceId || null, audioEndMs: Number.isFinite(Number(payload?.audioEndMs)) ? Math.max(0, Math.round(Number(payload.audioEndMs))) : null, itemId: normalizeInlineText(payload?.itemId, 180) || null } });

try {
  deps.handleSpeechStopped?.({
    session,
    userId: speechUserId || null,
    speakerName,
    utteranceId: utteranceId || 0,
    audioEndMs: Number.isFinite(Number(payload?.audioEndMs))
      ? Math.max(0, Math.round(Number(payload.audioEndMs)))
      : null,
    itemId: normalizeInlineText(payload?.itemId, 180) || null,
    eventType: "input_audio_buffer.speech_stopped"
  });
} catch (error) {
  store.logAction({
    kind: "voice_error",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: speechUserId || null,
    content: `openai_realtime_asr_speech_stopped_handler_failed: ${String(error?.message || error)}`,
    metadata: {
      sessionId: session.id,
      utteranceId: utteranceId || null
    }
  });
}

});

client.on("error_event", (payload: Record<string, unknown>) => { if (session.ending) return; const errorUserId = mode === "shared" ? asrState.userId : (userId ? String(userId).trim() : null); const code = String(payload?.code || "").trim() || null; const normalizedCode = String(code || "").trim().toLowerCase(); const message = String(payload?.message || "unknown error"); const isEmptyCommit = normalizedCode === "input_audio_buffer_commit_empty"; store.logAction({ kind: isEmptyCommit ? "voice_runtime" : "voice_error", guildId: session.guildId, channelId: session.textChannelId, userId: errorUserId || null, content: isEmptyCommit ? "openai_realtime_asr_commit_empty" : openai_realtime_asr_error: ${message}, metadata: { sessionId: session.id, code, param: (payload?.param as string) || null, message } }); });

client.on("socket_closed", (payload: Record<string, unknown>) => { if (session.ending) return; const closedUserId = mode === "shared" ? asrState.userId : (userId ? String(userId).trim() : null); store.logAction({ kind: "voice_runtime", guildId: session.guildId, channelId: session.textChannelId, userId: closedUserId || null, content: "openai_realtime_asr_socket_closed", metadata: { sessionId: session.id, code: Number(payload?.code || 0) || null, reason: String(payload?.reason || "").trim() || null } }); }); }

// ── Connect ──────────────────────────────────────────────────────────

export async function ensureAsrSessionConnected( mode: AsrBridgeMode, deps: AsrBridgeDeps, settings: Record<string, unknown> | null, userId: string ): Promise<AsrBridgeState | null> { const { session, appConfig, store } = deps; if (!session || session.ending) return null; const asrState = getAsrState(mode, session, userId); if (!asrState || asrPhaseIsClosing(asrState.phase)) return null;

const ws = asrState.client?.ws;
if (ws && ws.readyState === 1) {
  // Ensure phase reflects the live connection
  if (asrState.phase === "idle" || asrState.phase === "connecting") {
    asrState.phase = "ready";
  }
  return asrState;
}

if (asrState.connectPromise) {
  try {
    await asrState.connectPromise;
  } catch {
    // The primary connect path already logged and cleaned up the failure.
  }
  return asrState.client ? asrState : null;
}

const resolvedSettings = settings || session.settingsSnapshot || store.getSettings(); const { normalizedModel, language, prompt, noiseReduction } = resolveAsrModelParams(session, resolvedSettings); const logUserId = mode === "shared" ? "shared_asr" : String(userId || "").trim(); const runtimeLogger = createAsrRuntimeLogger(deps, logUserId); const client = new OpenAiRealtimeTranscriptionClient({ apiKey: appConfig.openaiApiKey, logger: runtimeLogger }); asrState.client = client; asrState.phase = "connecting";

asrState.connectPromise = (async () => { wireClientEvents(mode, client, asrState, deps, userId);

await client.connect({
  model: normalizedModel,
  inputAudioFormat: "pcm16",
  inputTranscriptionModel: normalizedModel,
  inputTranscriptionLanguage: language,
  inputTranscriptionPrompt: prompt,
  noiseReduction
});
asrState.connectedAt = Date.now();
asrState.phase = "ready";

// Flush any audio that was buffered while connecting
flushPendingAsrAudio(mode, session, deps, asrState, userId);

})();

try {
  await asrState.connectPromise;
  return asrState;
} catch (error: unknown) {
  const errorUserId = mode === "shared" ? asrState.userId : String(userId || "").trim();
  store.logAction({
    kind: "voice_error",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: errorUserId || null,
    content: `openai_realtime_asr_connect_failed: ${String((error as Error)?.message || error)}`,
    metadata: { sessionId: session.id }
  });
  if (mode === "per_user") {
    await closePerUserAsrSession(session, deps, userId, "connect_failed");
  } else {
    await closeSharedAsrSession(session, deps, "connect_failed");
  }
  return null;
} finally {
  asrState.connectPromise = null;
}
}

// ── Flush pending audio ──────────────────────────────────────────────

function flushPendingAsrAudio( mode: AsrBridgeMode, session: VoiceSession, deps: AsrBridgeDeps, asrState: AsrBridgeState | null = null, userId: string | null = null, utteranceId: number | null = null ) { const state = asrState || getAsrState(mode, session, userId || ""); if (!state || asrPhaseIsClosing(state.phase)) return; const client = state.client; if (!client || !client.ws || client.ws.readyState !== 1) return; const targetUtteranceId = Math.max( 0, Number( utteranceId !== null && utteranceId !== undefined ? utteranceId : state.utterance?.id || 0 ) ); if (!targetUtteranceId) return; const committingUtteranceId = Math.max(0, Number(state.committingUtteranceId || 0)); if ( asrPhaseIsCommitting(state.phase) && committingUtteranceId > 0 && targetUtteranceId !== committingUtteranceId ) { return; } const chunks = state.pendingAudioChunks; if (!chunks.length) return;

const remainingChunks: AsrPendingAudioChunk[] = [];
let flushedBytes = 0;
let flushedChunks = 0;
let skippedUtteranceMismatch = 0;
while (chunks.length > 0) {
  const entry = chunks.shift()!;
  if (!entry || !Buffer.isBuffer(entry.chunk)) continue;
  if (Number(entry.utteranceId || 0) !== targetUtteranceId) {
    remainingChunks.push(entry);
    skippedUtteranceMismatch += 1;
    continue;
  }
  try {
    client.appendInputAudioPcm(entry.chunk);
    flushedBytes += entry.chunk.length;
    flushedChunks += 1;
  } catch (error: unknown) {
    const errorUserId = mode === "shared" ? state.userId : (userId ? String(userId).trim() : null);
    deps.store.logAction({
      kind: "voice_error",
      guildId: session.guildId,
      channelId: session.textChannelId,
      userId: errorUserId || null,
      content: `openai_realtime_asr_audio_append_failed: ${String((error as Error)?.message || error)}`,
      metadata: { sessionId: session.id }
    });
    remainingChunks.push(entry);
    while (chunks.length > 0) {
      const pendingEntry = chunks.shift()!;
      if (!pendingEntry || !Buffer.isBuffer(pendingEntry.chunk)) continue;
      remainingChunks.push(pendingEntry);
    }
    break;
  }
}
state.pendingAudioChunks = remainingChunks;
state.pendingAudioBytes = state.pendingAudioChunks.reduce(
  (total, pendingChunk) => total + Number(pendingChunk?.chunk?.length || 0),
  0
);

// Track cumulative flush stats on the state for periodic reporting.
state._flushAccumBytes = Math.max(0, Number(state._flushAccumBytes || 0)) + flushedBytes;
state._flushAccumChunks = Math.max(0, Number(state._flushAccumChunks || 0)) + flushedChunks;
state._flushAccumSkipped =
  Math.max(0, Number(state._flushAccumSkipped || 0)) + skippedUtteranceMismatch;
const lastFlushLogAt = Number(state._lastFlushLogAt || 0);
const flushLogIntervalMs = 2000;
const now = Date.now();
if (
  (now - lastFlushLogAt >= flushLogIntervalMs && state._flushAccumBytes > 0) ||
  skippedUtteranceMismatch > 0
) {
  const logUserId = mode === "shared" ? state.userId : (userId ? String(userId).trim() : null);
  deps.store.logAction({
    kind: "voice_runtime",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: logUserId || null,
    content: "openai_realtime_asr_audio_flushed",
    metadata: {
      sessionId: session.id,
      flushedBytes: state._flushAccumBytes,
      flushedChunks: state._flushAccumChunks,
      skippedUtteranceMismatch: state._flushAccumSkipped,
      remainingPendingBytes: state.pendingAudioBytes,
      targetUtteranceId,
      wsReadyState: client.ws?.readyState ?? null
    }
  });
  state._flushAccumBytes = 0;
  state._flushAccumChunks = 0;
  state._flushAccumSkipped = 0;
  state._lastFlushLogAt = now;
}
}

// ── Begin utterance ──────────────────────────────────────────────────

export function beginAsrUtterance( mode: AsrBridgeMode, session: VoiceSession, deps: AsrBridgeDeps, settings: Record<string, unknown> | null, userId: string ): boolean { if (!session || session.ending) return false; const asrState = getAsrState(mode, session, userId); const normalizedUserId = String(userId || "").trim(); if (!asrState || !normalizedUserId) return false; if (asrPhaseIsClosing(asrState.phase)) return false;

// Shared mode: user lock — only one user can use the shared bridge at a time
if (mode === "shared") {
  if (asrState.userId && asrState.userId !== normalizedUserId) return false;
  asrState.userId = normalizedUserId;
}

if (asrState.idleTimer) { clearTimeout(asrState.idleTimer); asrState.idleTimer = null; }

asrState.utterance = createAsrUtteranceState(asrState.utterance?.id || 0); asrState.lastPartialText = ""; asrState.lastPartialLogAt = 0; asrState.speechActive = false; asrState.speechDetectedAt = 0; asrState.speechStoppedAt = 0; asrState.speechDetectedUtteranceId = 0; asrState.speechStoppedUtteranceId = 0;

void ensureAsrSessionConnected(mode, deps, settings, userId); return true; }

// ── Append audio ─────────────────────────────────────────────────────

export function appendAudioToAsr( mode: AsrBridgeMode, session: VoiceSession, deps: AsrBridgeDeps, settings: Record<string, unknown> | null, userId: string, pcmChunk: Buffer ): boolean { if (!session || session.ending) return false; const asrState = getAsrState(mode, session, userId); const normalizedUserId = String(userId || "").trim(); if (!asrState || asrPhaseIsClosing(asrState.phase) || !normalizedUserId) return false;

// Shared mode: user lock
if (mode === "shared") {
  if (!asrState.userId) {
    asrState.userId = normalizedUserId;
  } else if (asrState.userId !== normalizedUserId) {
    return false;
  }
}

const chunk = Buffer.isBuffer(pcmChunk) ? pcmChunk : Buffer.from(pcmChunk || []); if (!chunk.length) return false; asrState.lastAudioAt = Date.now(); asrState.utterance.bytesSent = Math.max(0, Number(asrState.utterance?.bytesSent || 0)) + chunk.length; const utteranceId = Math.max(0, Number(asrState.utterance?.id || 0)); if (!utteranceId) return false; const queuedChunk: AsrPendingAudioChunk = { utteranceId, chunk };

asrState.pendingAudioChunks.push(queuedChunk); asrState.pendingAudioBytes = Math.max(0, Number(asrState.pendingAudioBytes || 0)) + chunk.length; const maxBufferedBytes = 24_000 * 2 * 10; if (asrState.pendingAudioBytes > maxBufferedBytes && asrState.pendingAudioChunks.length > 1) { while (asrState.pendingAudioChunks.length > 1 && asrState.pendingAudioBytes > maxBufferedBytes) { const dropped = asrState.pendingAudioChunks.shift(); asrState.pendingAudioBytes = Math.max( 0, asrState.pendingAudioBytes - Number(dropped?.chunk?.length || 0) ); } }

void ensureAsrSessionConnected(mode, deps, settings, userId).then((state) => { if (!state) return; flushPendingAsrAudio(mode, session, deps, state, userId, utteranceId); }); return true; }

export function discardAsrUtterance( mode: AsrBridgeMode, session: VoiceSession, userId: string ): boolean { if (!session || session.ending) return false; const normalizedUserId = String(userId || "").trim(); if (!normalizedUserId) return false; const asrState = getAsrState(mode, session, normalizedUserId); if (!asrState || asrPhaseIsClosing(asrState.phase)) return false;

if (mode === "shared" && asrState.userId && asrState.userId !== normalizedUserId) { return false; }

try {
  asrState.client?.clearInputAudioBuffer?.();
} catch {
  // ignore best-effort buffer reset
}

asrState.pendingAudioChunks = []; asrState.pendingAudioBytes = 0; asrState.lastPartialText = ""; asrState.lastPartialLogAt = 0; asrState.speechActive = false; asrState.speechDetectedAt = 0; asrState.speechStoppedAt = 0; asrState.speechDetectedUtteranceId = 0; asrState.speechStoppedUtteranceId = 0; asrState.utterance = createAsrUtteranceState(asrState.utterance?.id || 0);

if (mode === "shared" && asrState.userId === normalizedUserId) { asrState.userId = null; } return true; }

// ── Commit utterance ─────────────────────────────────────────────────

export async function commitAsrUtterance( mode: AsrBridgeMode, deps: AsrBridgeDeps, settings: Record<string, unknown> | null, userId: string, captureReason = "stream_end" ): Promise<AsrCommitResult | null> { const { session, store } = deps; if (!session || session.ending) return null; const normalizedUserId = String(userId || "").trim(); if (!normalizedUserId) return null;

// For shared mode, connect first then validate the user lock
let asrState: AsrBridgeState | null;
if (mode === "shared") {
  asrState = await ensureAsrSessionConnected(mode, deps, settings, normalizedUserId);
  if (!asrState || asrPhaseIsClosing(asrState.phase)) return null;
  if (asrState.userId && asrState.userId !== normalizedUserId) return null;
  asrState.userId = normalizedUserId;
} else {
  asrState = getAsrState(mode, session, normalizedUserId);
  if (!asrState || asrPhaseIsClosing(asrState.phase)) return null;
}

const trackedUtterance = asrState.utterance; const trackedUtteranceId = Math.max(0, Number(trackedUtterance?.id || 0)); if (!trackedUtteranceId) return null;

const transcriptionModelPrimary = normalizeOpenAiRealtimeTranscriptionModel( session.openAiPerUserAsrModel, OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL ); const planReason = mode === "per_user" ? "openai_realtime_per_user_transcription" : "openai_realtime_shared_transcription"; const utteranceBytesSent = Math.max(0, Number(trackedUtterance?.bytesSent || 0)); const minCommitBytes = getRealtimeCommitMinimumBytes( session.mode, Number(session.realtimeInputSampleRateHz) || 24000 );

if (utteranceBytesSent < minCommitBytes) {
  if (utteranceBytesSent > 0) {
    store.logAction({
      kind: "voice_runtime",
      guildId: session.guildId,
      channelId: session.textChannelId,
      userId: normalizedUserId,
      content: "openai_realtime_asr_commit_skipped_small_buffer",
      metadata: {
        sessionId: session.id,
        utteranceBytesSent,
        minCommitBytes,
        captureReason: String(captureReason || "stream_end")
      }
    });
  }
  // Mode-specific cleanup for small buffer skip
  if (mode === "per_user") {
    scheduleAsrIdleClose(mode, session, deps, normalizedUserId);
  } else {
    if (asrState.userId === normalizedUserId) asrState.userId = null;
  }
  return {
    transcript: "",
    asrStartedAtMs: 0,
    asrCompletedAtMs: 0,
    transcriptionModelPrimary,
    transcriptionModelFallback: null,
    transcriptionPlanReason: planReason,
    usedFallbackModel: false,
    captureReason: String(captureReason || "stream_end"),
    transcriptLogprobs: null
  };
}

// Per-user: connect now (shared already connected above)
if (mode === "per_user") {
  asrState.phase = "committing";
  asrState.committingUtteranceId = trackedUtteranceId;
  asrState.committingUtterance = trackedUtterance;
  const connectedState = await ensureAsrSessionConnected(mode, deps, settings, normalizedUserId);
  if (!connectedState || connectedState !== asrState || asrPhaseIsClosing(asrState.phase)) {
    asrState.phase = "ready";
    asrState.committingUtteranceId = 0;
    asrState.committingUtterance = null;
    return null;
  }
} else {
  asrState.phase = "committing";
  asrState.committingUtteranceId = trackedUtteranceId;
  asrState.committingUtterance = trackedUtterance;
}

flushPendingAsrAudio(mode, session, deps, asrState, normalizedUserId, trackedUtteranceId);

const asrStartedAtMs = Date.now();
try {
  if (mode === "shared") {
    // Shared mode: register commit request, commit, wait for item mapping + transcript
    const commitRequestId = `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
    prunePendingCommitRequests(asrState);
    asrState.pendingCommitRequests.push({
      id: commitRequestId,
      userId: normalizedUserId,
      requestedAt: Date.now()
    });
    asrState.client?.commitInputAudioBuffer?.();
    const committedItemId = await waitForSharedAsrCommittedItem(
      session,
      asrState,
      normalizedUserId,
      commitRequestId
    );
    const transcript = await waitForSharedAsrTranscriptByItem(session, asrState, committedItemId);
    const asrCompletedAtMs = Date.now();

  if (asrState.utterance === trackedUtterance) trackedUtterance.bytesSent = 0;
  if (asrState.userId === normalizedUserId) asrState.userId = null;

  // Shared-mode streaming fallback when committed buffer was empty
  let resolvedTranscript = transcript;
  if (!resolvedTranscript && trackedUtterance) {
    const streamingFinal = normalizeVoiceText(
      Array.isArray(trackedUtterance.finalSegments) ? trackedUtterance.finalSegments.join(" ") : "",
      STT_TRANSCRIPT_MAX_CHARS_LOCAL
    );
    const streamingPartial = normalizeVoiceText(
      trackedUtterance.partialText || "",
      STT_TRANSCRIPT_MAX_CHARS_LOCAL
    );
    resolvedTranscript = streamingFinal || streamingPartial;
    if (resolvedTranscript) {
      store.logAction({
        kind: "voice_runtime",
        guildId: session.guildId,
        channelId: session.textChannelId,
        userId: normalizedUserId,
        content: "openai_realtime_asr_streaming_fallback_used",
        metadata: {
          sessionId: session.id,
          transcriptChars: resolvedTranscript.length,
          source: streamingFinal ? "final_segments" : "partial_text",
          captureReason: String(captureReason || "stream_end")
        }
      });
    }
  }

  if (!resolvedTranscript) {
    store.logAction({
      kind: "voice_runtime",
      guildId: session.guildId,
      channelId: session.textChannelId,
      userId: normalizedUserId,
      content: "voice_realtime_transcription_empty",
      metadata: {
        sessionId: session.id,
        source: "openai_realtime_asr",
        model: transcriptionModelPrimary,
        captureReason: String(captureReason || "stream_end"),
        trackedUtteranceId: trackedUtteranceId || null,
        activeUtteranceId: Math.max(0, Number(asrState.utterance?.id || 0)) || null,
        finalSegmentCount: Array.isArray(trackedUtterance?.finalSegments)
          ? trackedUtterance.finalSegments.length
          : 0,
        partialChars: String(trackedUtterance?.partialText || "").trim().length
      }
    });
  }

  return {
    transcript: resolvedTranscript,
    asrStartedAtMs,
    asrCompletedAtMs,
    transcriptionModelPrimary,
    transcriptionModelFallback: null,
    transcriptionPlanReason: planReason,
    usedFallbackModel: false,
    captureReason: String(captureReason || "stream_end"),
    transcriptLogprobs: collectSegmentLogprobs(trackedUtterance?.finalSegmentEntries)
  };
} else {
  // Per-user mode: commit and wait for transcript settle
  asrState.client?.commitInputAudioBuffer?.();
  const transcript = await waitForAsrTranscriptSettle(session, asrState, trackedUtterance);
  const asrCompletedAtMs = Date.now();

  scheduleAsrIdleClose(mode, session, deps, normalizedUserId);
  if (trackedUtterance) trackedUtterance.bytesSent = 0;

  // Circuit breaker: track consecutive empty commits with substantial audio.
  // A silently dead WebSocket will produce zero transcripts indefinitely.
  if (!transcript && utteranceBytesSent >= ASR_EMPTY_COMMIT_MIN_BYTES) {
    asrState.consecutiveEmptyCommits = Math.max(0, Number(asrState.consecutiveEmptyCommits || 0)) + 1;
    if (asrState.consecutiveEmptyCommits >= ASR_EMPTY_COMMIT_RECONNECT_THRESHOLD) {
      asrState.consecutiveEmptyCommits = 0;
      store.logAction({
        kind: "voice_error",
        guildId: session.guildId,
        channelId: session.textChannelId,
        userId: normalizedUserId,
        content: "openai_realtime_asr_circuit_breaker_reconnect",
        metadata: {
          sessionId: session.id,
          threshold: ASR_EMPTY_COMMIT_RECONNECT_THRESHOLD,
          captureReason: String(captureReason || "stream_end")
        }
      });
      // Force-close the dead session; next capture will reconnect.
      void closePerUserAsrSession(session, deps, normalizedUserId, "circuit_breaker").catch((error: unknown) => {
        logAsyncAsrCloseFailure(deps, session, {
          userId: normalizedUserId,
          reason: "circuit_breaker",
          mode: "per_user",
          error
        });
      });
    }
  } else if (transcript) {
    asrState.consecutiveEmptyCommits = 0;
  }

  if (!transcript) {
    store.logAction({
      kind: "voice_runtime",
      guildId: session.guildId,
      channelId: session.textChannelId,
      userId: normalizedUserId,
      content: "voice_realtime_transcription_empty",
      metadata: {
        sessionId: session.id,
        source: "openai_realtime_asr",
        model: transcriptionModelPrimary,
        captureReason: String(captureReason || "stream_end"),
        trackedUtteranceId: trackedUtteranceId || null,
        activeUtteranceId: Math.max(0, Number(asrState.utterance?.id || 0)) || null,
        finalSegmentCount: Array.isArray(trackedUtterance?.finalSegments)
          ? trackedUtterance.finalSegments.length
          : 0,
        partialChars: String(trackedUtterance?.partialText || "").trim().length
      }
    });
  }

  return {
    transcript,
    asrStartedAtMs,
    asrCompletedAtMs,
    transcriptionModelPrimary,
    transcriptionModelFallback: null,
    transcriptionPlanReason: planReason,
    usedFallbackModel: false,
    captureReason: String(captureReason || "stream_end"),
    transcriptLogprobs: collectSegmentLogprobs(trackedUtterance?.finalSegmentEntries)
  };
}

} catch (error: unknown) {
  store.logAction({
    kind: "voice_error",
    guildId: session.guildId,
    channelId: session.textChannelId,
    userId: normalizedUserId,
    content: `openai_realtime_asr_commit_failed: ${String((error as Error)?.message || error)}`,
    metadata: { sessionId: session.id }
  });
  return null;
} finally {
  // Transition back to ready unless we're already closing/idle
  if (asrState.phase === "committing") {
    asrState.phase = "ready";
  }
  asrState.committingUtteranceId = 0;
  asrState.committingUtterance = null;
  const activeUtteranceId = Math.max(0, Number(asrState.utterance?.id || 0));
  if (activeUtteranceId > 0) {
    flushPendingAsrAudio(mode, session, deps, asrState, normalizedUserId, activeUtteranceId);
  }
}
}

// ── Idle close scheduling ────────────────────────────────────────────

export function scheduleAsrIdleClose( mode: AsrBridgeMode, session: VoiceSession, deps: AsrBridgeDeps, userId: string ) { if (!session || session.ending) return; const asrState = getAsrState(mode, session, userId); if (!asrState) return; if (asrState.idleTimer) { clearTimeout(asrState.idleTimer); asrState.idleTimer = null; } const ttlMs = Math.max( 1_000, Number(session.openAiAsrSessionIdleTtlMs || OPENAI_ASR_SESSION_IDLE_TTL_MS) ); asrState.idleTimer = setTimeout(() => { asrState.idleTimer = null; if (mode === "per_user") { void closePerUserAsrSession(session, deps, userId, "idle_ttl").catch((error: unknown) => { logAsyncAsrCloseFailure(deps, session, { userId, reason: "idle_ttl", mode: "per_user", error }); }); } else { void closeSharedAsrSession(session, deps, "idle_ttl").catch((error: unknown) => { logAsyncAsrCloseFailure(deps, session, { userId: session.openAiSharedAsrState?.userId || null, reason: "idle_ttl", mode: "shared", error }); }); } }, ttlMs); }

// ── Close sessions ───────────────────────────────────────────────────

async function closePerUserAsrSession( session: VoiceSession, deps: AsrBridgeDeps, userId: string, reason = "manual" ) { if (!session) return; if (!(session.openAiAsrSessions instanceof Map)) return; const normalizedUserId = String(userId || "").trim(); if (!normalizedUserId) return; const state = session.openAiAsrSessions.get(normalizedUserId) as AsrBridgeState | undefined; if (!state) return; state.phase = "closing";

if (state.idleTimer) { clearTimeout(state.idleTimer); state.idleTimer = null; } session.openAiAsrSessions.delete(normalizedUserId);

try {
  await state.client?.close?.();
} catch {
  // ignore
}
state.phase = "idle";

deps.store.logAction({ kind: "voice_runtime", guildId: session.guildId, channelId: session.textChannelId, userId: normalizedUserId, content: "openai_realtime_asr_session_closed", metadata: { sessionId: session.id, sessionScope: "per_user", reason: String(reason || "manual") } }); }

export async function closeAllPerUserAsrSessions( session: VoiceSession, deps: AsrBridgeDeps, reason = "session_end" ) { if (!session) return; if (!(session.openAiAsrSessions instanceof Map)) return; if (session.openAiAsrSessions.size <= 0) return; const userIds = [...session.openAiAsrSessions.keys()]; for (const userId of userIds) { await closePerUserAsrSession(session, deps, String(userId), reason); } }

export async function closeSharedAsrSession( session: VoiceSession, deps: AsrBridgeDeps, reason = "manual" ) { if (!session) return; const state = session.openAiSharedAsrState as AsrBridgeState | null; if (!state) return; state.phase = "closing";

if (state.idleTimer) {
  clearTimeout(state.idleTimer);
  state.idleTimer = null;
}
// Drain pending commit resolvers
while (state.pendingCommitResolvers.length > 0) {
  const entry = state.pendingCommitResolvers.shift();
  if (entry && typeof entry.resolve === "function") {
    entry.resolve("");
  }
}
session.openAiSharedAsrState = null;

try {
  await state.client?.close?.();
} catch {
  // ignore
}
state.phase = "idle";

deps.store.logAction({ kind: "voice_runtime", guildId: session.guildId, channelId: session.textChannelId, userId: String(state.userId || "").trim() || null, content: "openai_realtime_asr_session_closed", metadata: { sessionId: session.id, sessionScope: "shared", reason: String(reason || "manual") } }); }

// ── Shared-mode: release active user + handoff ───────────────────────

export function releaseSharedAsrActiveUser(session: VoiceSession, userId: string | null = null) { if (!session || session.ending) return; const asrState = session.openAiSharedAsrState as AsrBridgeState | null; if (!asrState) return; const normalizedUserId = String(userId || "").trim(); if (!normalizedUserId || String(asrState.userId || "").trim() === normalizedUserId) { asrState.userId = null; } }

/**
 * After a shared-mode user releases the lock, try to hand the bridge
 * off to another user who has audio buffered but hasn't started shared
 * ASR streaming yet.
 *
 * The beginUtterance and appendAudio callbacks let the session
 * manager delegate to the unified bridge functions (which need deps and
 * settings that only the session manager has). See the illustrative
 * wiring sketch at the end of this file.
 */
export function tryHandoffSharedAsr(opts: {
  session: VoiceSession;
  asrState: AsrBridgeState | null;
  deps: AsrBridgeDeps;
  settings: Record<string, unknown> | null;
  beginUtterance: (userId: string) => boolean;
  appendAudio: (userId: string, pcmChunk: Buffer) => boolean;
  releaseUser: (userId: string) => void;
}): boolean {
  const { session, asrState, deps, beginUtterance, appendAudio, releaseUser } = opts;
  if (!session || session.ending) return false;
  if (!asrState || asrPhaseIsClosing(asrState.phase)) return false;
  if (asrState.userId) return false;

for (const [candidateUserId, captureState] of session.userCaptures) { if (!captureState || !candidateUserId) continue; if (Math.max(0, Number(captureState.promotedAt || 0)) <= 0) continue; if (Math.max(0, Number(captureState.sharedAsrBytesSent || 0)) > 0) continue; if (Math.max(0, Number(captureState.bytesSent || 0)) <= 0) continue;

const began = beginUtterance(candidateUserId);
if (!began) continue;

const chunks = Array.isArray(captureState.pcmChunks) ? captureState.pcmChunks : [];
if (chunks.length <= 0) {
  releaseUser(candidateUserId);
  continue;
}
let replayedChunks = 0;
let replayedBytes = 0;
for (const chunk of chunks) {
  if (!chunk || !chunk.length) continue;
  const appended = appendAudio(candidateUserId, chunk);
  if (appended) {
    replayedChunks += 1;
    replayedBytes += chunk.length;
    captureState.sharedAsrBytesSent =
      Math.max(0, Number(captureState.sharedAsrBytesSent || 0)) + chunk.length;
  }
}
if (replayedChunks <= 0 || replayedBytes <= 0) {
  releaseUser(candidateUserId);
  continue;
}

deps.store.logAction({
  kind: "voice_runtime",
  guildId: session.guildId,
  channelId: session.textChannelId,
  userId: candidateUserId,
  content: "openai_shared_asr_handoff",
  metadata: {
    sessionId: session.id,
    replayedChunks,
    replayedBytes
  }
});
return true;

} return false; }
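
// Illustrative session-manager wiring for the handoff above (a sketch; the real
// call site owns `session`, `deps`, and `settings`):
//
//   tryHandoffSharedAsr({
//     session,
//     asrState: session.openAiSharedAsrState as AsrBridgeState | null,
//     deps,
//     settings,
//     beginUtterance: (uid) => beginAsrUtterance("shared", session, deps, settings, uid),
//     appendAudio: (uid, chunk) => appendAudioToAsr("shared", session, deps, settings, uid, chunk),
//     releaseUser: (uid) => releaseSharedAsrActiveUser(session, uid)
//   });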