src/voice/openaiRealtimeTranscriptionClient.ts

import { EventEmitter } from "node:events"; import WebSocket from "ws"; import { buildCommonRealtimeState, closeRealtimeSocket, compactObject, handleRealtimeSocketClose, handleRealtimeSocketError, markRealtimeConnected, openRealtimeSocket, sendRealtimePayload } from "./realtimeClientCore.ts"; import { DEFAULT_OPENAI_BASE_URL, OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL, normalizeOpenAiBaseUrl, normalizeOpenAiRealtimeTranscriptionModel } from "./realtimeProviderNormalization.ts";

const TRANSCRIPT_DELTA_TYPES = new Set([ "conversation.item.input_audio_transcription.delta" ]);

const TRANSCRIPT_FINAL_TYPES = new Set([ "conversation.item.input_audio_transcription.completed" ]);

const INPUT_AUDIO_SPEECH_STARTED_TYPE = "input_audio_buffer.speech_started"; const INPUT_AUDIO_SPEECH_STOPPED_TYPE = "input_audio_buffer.speech_stopped"; // Server-side VAD is disabled. Turn boundaries are managed locally by // clankvox capture lifecycle (speakingStart/speakingEnd, idle timeout, // max-duration cap). The local system commits the full utterance audio // via input_audio_buffer.commit, avoiding server VAD fragmentation that // breaks whispered and quiet speech. const OPENAI_REALTIME_ASR_TURN_DETECTION = null;

export class OpenAiRealtimeTranscriptionClient extends EventEmitter { apiKey; baseUrl; logger; ws; connectedAt; lastEventAt; lastError; sessionId; lastCloseCode; lastCloseReason; lastOutboundEventType; lastOutboundEventAt; lastOutboundEvent; recentOutboundEvents; sessionConfig; committedInputAudioItems;

constructor({ apiKey, baseUrl = DEFAULT_OPENAI_BASE_URL, logger = null }) { super(); this.apiKey = String(apiKey || "").trim(); this.baseUrl = String(baseUrl || DEFAULT_OPENAI_BASE_URL).trim() || DEFAULT_OPENAI_BASE_URL; this.logger = typeof logger === "function" ? logger : null; this.ws = null; this.connectedAt = 0; this.lastEventAt = 0; this.lastError = null; this.sessionId = null; this.lastCloseCode = null; this.lastCloseReason = null; this.lastOutboundEventType = null; this.lastOutboundEventAt = 0; this.lastOutboundEvent = null; this.recentOutboundEvents = []; this.sessionConfig = null; this.committedInputAudioItems = new Map(); }

async connect({ model = OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL, inputAudioFormat = "pcm16", inputTranscriptionModel = OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL, inputTranscriptionLanguage = "", inputTranscriptionPrompt = "", noiseReduction = "near_field" } = {}) { if (!this.apiKey) { throw new Error("Missing OPENAI_API_KEY for OpenAI realtime transcription runtime."); }

if (this.ws && this.ws.readyState === WebSocket.OPEN) {
  return this.getState();
}

const resolvedInputAudioFormat = normalizeOpenAiRealtimeAudioFormat(inputAudioFormat);
const resolvedInputTranscriptionModel = normalizeOpenAiRealtimeTranscriptionModel(
  inputTranscriptionModel || model,
  OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL
);
const resolvedInputTranscriptionLanguage = String(inputTranscriptionLanguage || "")
  .trim()
  .toLowerCase()
  .replace(/_/g, "-")
  .slice(0, 24);
const resolvedInputTranscriptionPrompt = String(inputTranscriptionPrompt || "")
  .replace(/\s+/g, " ")
  .trim()
  .slice(0, 280);

this.committedInputAudioItems.clear();
const ws = await this.openSocket(this.buildRealtimeUrl());
markRealtimeConnected(this, ws);

ws.on("message", (payload) => {
  this.lastEventAt = Date.now();
  this.handleIncoming(payload);
});

ws.on("error", (error) => {
  handleRealtimeSocketError(this, error, {
    logEvent: "openai_realtime_asr_ws_error"
  });
});

ws.on("close", (code, reasonBuffer) => {
  handleRealtimeSocketClose(this, code, reasonBuffer, {
    logEvent: "openai_realtime_asr_ws_closed"
  });
});

const resolvedNoiseReduction = String(noiseReduction || "").trim().toLowerCase();

this.sessionConfig = {
  inputAudioFormat: resolvedInputAudioFormat,
  inputTranscriptionModel: resolvedInputTranscriptionModel,
  inputTranscriptionLanguage: resolvedInputTranscriptionLanguage,
  inputTranscriptionPrompt: resolvedInputTranscriptionPrompt,
  noiseReduction: resolvedNoiseReduction === "far_field" ? "far_field"
    : resolvedNoiseReduction === "off" ? "off"
    : "near_field"
};
this.sendSessionUpdate();
return this.getState();

}

buildRealtimeUrl() { const base = normalizeOpenAiBaseUrl(this.baseUrl); const url = new URL(base); url.protocol = url.protocol === "http:" ? "ws:" : "wss:"; const basePath = url.pathname.replace(//+$/, ""); url.pathname = ${basePath}/realtime; url.searchParams.set("intent", "transcription"); return url.toString(); }

async openSocket(url): Promise { return await openRealtimeSocket({ url, headers: { "Content-Type": "application/json", Authorization: Bearer ${this.apiKey} }, timeoutMessage: "Timed out connecting to OpenAI realtime ASR after 10000ms.", connectErrorPrefix: "OpenAI realtime ASR connection failed" }); }

handleIncoming(payload) { let event = null;

try {
  event = JSON.parse(String(payload || ""));
} catch {
  return;
}

if (!event || typeof event !== "object") return;
this.emit("event", event);

if (event.type === "input_audio_buffer.committed") {
  const itemId = normalizeRealtimeItemId(event.item_id || event.item?.id);
  if (!itemId) return;
  const previousItemId = normalizeRealtimeItemId(event.previous_item_id);
  this.committedInputAudioItems.set(itemId, {
    previousItemId: previousItemId || null,
    at: Date.now()
  });
  // Bound memory for long sessions.
  if (this.committedInputAudioItems.size > 320) {
    const overflow = this.committedInputAudioItems.size - 320;
    let dropped = 0;
    for (const staleItemId of this.committedInputAudioItems.keys()) {
      this.committedInputAudioItems.delete(staleItemId);
      dropped += 1;
      if (dropped >= overflow) break;
    }
  }
  return;
}

if (
  event.type === "session.created" ||
  event.type === "session.updated" ||
  event.type === "transcription_session.created" ||
  event.type === "transcription_session.updated"
) {
  this.sessionId =
    event.session?.id ||
    event.transcription_session?.id ||
    (typeof event.id === "string" ? event.id : null) ||
    this.sessionId;
  this.log("info", "openai_realtime_asr_session_updated", { sessionId: this.sessionId });
  return;
}

if (event.type === "error") {
  const errorPayload = event.error && typeof event.error === "object" ? event.error : {};
  const message =
    event.error?.message || event.error?.code || event.message || "Unknown OpenAI realtime ASR error";
  this.lastError = String(message);
  this.log("warn", "openai_realtime_asr_error_event", {
    error: this.lastError,
    code: errorPayload?.code || null,
    type: event.type,
    param: errorPayload?.param || null,
    eventId: event.event_id || null
  });
  this.emit("error_event", {
    message: this.lastError,
    code: errorPayload?.code || null,
    param: errorPayload?.param || null,
    event
  });
  return;
}

const eventType = String(event.type || "");
if (
  eventType === INPUT_AUDIO_SPEECH_STARTED_TYPE ||
  eventType === INPUT_AUDIO_SPEECH_STOPPED_TYPE
) {
  this.emit(eventType === INPUT_AUDIO_SPEECH_STARTED_TYPE ? "speech_started" : "speech_stopped", {
    eventType,
    audioStartMs: Number.isFinite(Number(event.audio_start_ms))
      ? Math.max(0, Math.round(Number(event.audio_start_ms)))
      : null,
    audioEndMs: Number.isFinite(Number(event.audio_end_ms))
      ? Math.max(0, Math.round(Number(event.audio_end_ms)))
      : null,
    itemId: normalizeRealtimeItemId(event.item_id || event.item?.id) || null
  });
  return;
}

if (TRANSCRIPT_DELTA_TYPES.has(eventType) || TRANSCRIPT_FINAL_TYPES.has(eventType)) {
  const itemId = normalizeRealtimeItemId(event.item_id || event.item?.id);
  const previousItemId =
    normalizeRealtimeItemId(event.previous_item_id) ||
    (itemId ? this.committedInputAudioItems.get(itemId)?.previousItemId || null : null);
  const transcript =
    event.transcript ||
    event.text ||
    event.delta ||
    event?.item?.content?.[0]?.transcript ||
    "";
  const normalizedTranscript = String(transcript || "").trim();
  if (!normalizedTranscript) return;
  const isFinal = TRANSCRIPT_FINAL_TYPES.has(eventType);
  this.emit("transcript", {
    text: normalizedTranscript,
    eventType,
    final: isFinal,
    itemId: itemId || null,
    previousItemId: previousItemId || null,
    logprobs: isFinal && Array.isArray(event.logprobs) ? event.logprobs : null
  });
  return;
}

}

appendInputAudioPcm(audioBuffer) { if (!audioBuffer || !audioBuffer.length) return; this.appendInputAudioBase64(audioBuffer.toString("base64")); }

appendInputAudioBase64(audioBase64) { if (!audioBase64) return; this.send({ type: "input_audio_buffer.append", audio: String(audioBase64) }); }

commitInputAudioBuffer() { this.send({ type: "input_audio_buffer.commit" }); }

clearInputAudioBuffer() { this.send({ type: "input_audio_buffer.clear" }); }

send(payload) { sendRealtimePayload(this, { payload, eventType: String(payload?.type || "unknown"), summarizeOutboundPayload, skipHistoryEventType: "input_audio_buffer.append", skipLogEventType: "input_audio_buffer.append", logEvent: "openai_realtime_asr_event_sent", socketNotOpenMessage: "OpenAI realtime ASR socket is not open." }); }

updateTranscriptionGuidance({ language = "", prompt = "" } = {}) { if (!this.sessionConfig || typeof this.sessionConfig !== "object") { throw new Error("OpenAI realtime ASR session config is not initialized."); } this.sessionConfig = { ...this.sessionConfig, inputTranscriptionLanguage: String(language || "") .trim() .toLowerCase() .replace(/_/g, "-") .slice(0, 24), inputTranscriptionPrompt: String(prompt || "") .replace(/\s+/g, " ") .trim() .slice(0, 280) }; this.sendSessionUpdate(); }

sendSessionUpdate() { const session = this.sessionConfig && typeof this.sessionConfig === "object" ? this.sessionConfig : {}; const transcription = compactObject({ model: normalizeOpenAiRealtimeTranscriptionModel( session.inputTranscriptionModel, OPENAI_REALTIME_DEFAULT_TRANSCRIPTION_MODEL ), language: String(session.inputTranscriptionLanguage || "").trim() || null, prompt: String(session.inputTranscriptionPrompt || "").trim() || null }); const inputAudio = { format: normalizeOpenAiRealtimeAudioFormat(session.inputAudioFormat), noise_reduction: session.noiseReduction === "off" ? null : { type: session.noiseReduction === "far_field" ? "far_field" : "near_field" }, turn_detection: OPENAI_REALTIME_ASR_TURN_DETECTION, transcription }; this.send({ type: "session.update", session: { type: "transcription", audio: { input: inputAudio }, // Include logprobs so downstream can compute transcript confidence when needed. include: ["item.input_audio_transcription.logprobs"] } }); }

async close() { if (!this.ws) return; if (this.ws.readyState === WebSocket.CLOSED) { this.ws = null; return; } await closeRealtimeSocket(this.ws); this.ws = null; }

getState() { return { ...buildCommonRealtimeState(this) }; }

log(level, event, metadata = null) { if (!this.logger) return; this.logger({ level, event, metadata }); } }

function normalizeOpenAiRealtimeAudioFormat(value) { if (value && typeof value === "object" && !Array.isArray(value)) { const type = String(value.type || "") .trim() .toLowerCase(); if (type === "audio/pcm" || type === "pcm16") { const rate = Number(value.rate); return { type: "audio/pcm", rate: Number.isFinite(rate) && rate > 0 ? Math.floor(rate) : 24000 }; } if (type === "audio/pcmu" || type === "g711_ulaw") return { type: "audio/pcmu" }; if (type === "audio/pcma" || type === "g711_alaw") return { type: "audio/pcma" }; }

const normalized = String(value || "") .trim() .toLowerCase(); if (normalized === "audio/pcmu" || normalized === "g711_ulaw") return { type: "audio/pcmu" }; if (normalized === "audio/pcma" || normalized === "g711_alaw") return { type: "audio/pcma" };

return { type: "audio/pcm", rate: 24000 }; }

function normalizeRealtimeItemId(value) { const normalized = String(value || "").trim(); if (!normalized) return ""; return normalized.slice(0, 180); }

function summarizeOutboundPayload(payload) { if (!payload || typeof payload !== "object") return null; const type = String(payload.type || "unknown"); if (type === "input_audio_buffer.append") { const audioChars = typeof payload.audio === "string" ? payload.audio.length : null; return compactObject({ type, audioChars }); }

if (type === "input_audio_buffer.commit" || type === "input_audio_buffer.clear") { return { type }; }

if (type === "session.update") { const session = payload.session && typeof payload.session === "object" ? payload.session : {}; const audio = session.audio && typeof session.audio === "object" ? session.audio : {}; return compactObject({ type, sessionType: session.type || null, inputFormat: audio?.input?.format || null, inputTurnDetectionType: audio?.input?.turn_detection?.type || null, inputTranscriptionModel: audio?.input?.transcription?.model || null }); }

return { type }; }