src/voice/realtimeClientCore.ts

import type { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "node:http"; import WebSocket from "ws";

// Tunables and shared patterns for the realtime socket helpers in this file.
// - CONNECT_TIMEOUT_MS: default `timeoutMs` for openRealtimeSocket().
// - MAX_OUTBOUND_EVENT_HISTORY: maximum length of client.recentOutboundEvents kept by recordOutboundEvent().
// - MAX_EVENT_PREVIEW_CHARS: default `maxChars` for safeJsonPreview().
// - MAX_HANDSHAKE_BODY_PREVIEW_CHARS: character cap applied while reading and normalizing a failed-handshake response body.
// - MAX_CONNECT_ERROR_MESSAGE_CHARS: character cap on the message built by createRealtimeConnectError().
// - MAX_HANDSHAKE_CAPTURE_WAIT_MS: how long readHandshakeBodyPreview() waits before resolving with whatever it has read.
// - SENSITIVE_HEADER_RE: header names (case-insensitive, whole-name match) whose values sanitizeHandshakeHeaders() replaces with "[redacted]".
const CONNECT_TIMEOUT_MS = 10_000; const MAX_OUTBOUND_EVENT_HISTORY = 8; const MAX_EVENT_PREVIEW_CHARS = 280; const MAX_HANDSHAKE_BODY_PREVIEW_CHARS = 600; const MAX_CONNECT_ERROR_MESSAGE_CHARS = 1800; const MAX_HANDSHAKE_CAPTURE_WAIT_MS = 1_500; const SENSITIVE_HEADER_RE = /^(?:authorization|proxy-authorization|cookie|set-cookie|x-api-key|x-goog-api-key)$/i;

/**
 * Structured details attached to a failed realtime socket connect attempt.
 *
 * - `source`: which failure path produced the error (non-upgrade HTTP
 *   response, socket "error" event, or the connect timer).
 * - `url`: the summarized (query-redacted) socket URL, or null.
 * - `statusCode` / `statusMessage` / `headers` / `bodyPreview`: filled in by
 *   openRealtimeSocket() only for "unexpected_response"; it passes null for
 *   the other two sources.
 */
export type RealtimeConnectErrorDiagnostics = { source: "unexpected_response" | "socket_error" | "timeout"; url: string | null; statusCode: number | null; statusMessage: string | null; headers: Record<string, string> | null; bodyPreview: string | null; };

/** An Error that may carry connect diagnostics on its `diagnostics` property. */
type RealtimeConnectError = Error & { diagnostics?: RealtimeConnectErrorDiagnostics; };

/**
 * Returns a shallow copy of `value` without "empty" entries.
 *
 * An entry is dropped when it is `undefined`, `null`, or the empty string;
 * other falsy values such as `0` and `false` are kept. A falsy `value`
 * (for example `null`) yields an empty object.
 */
export function compactObject(value) {
  const out = {};
  const isEmpty = (entry) => entry === undefined || entry === null || entry === "";
  Object.entries(value || {}).forEach(([key, entry]) => {
    if (!isEmpty(entry)) {
      out[key] = entry;
    }
  });
  return out;
}

/**
 * Pulls an audio string out of a realtime event, trying the top-level fields
 * first and the nested ones second.
 *
 * Top level: the first truthy value among `delta`, `audio`, `chunk`.
 * Nested: the first truthy value among `audio.delta`, `audio.chunk`,
 * `data.audio`, `data.delta`, `response.audio.delta`.
 *
 * Only the first truthy value of each group is considered; it is returned
 * trimmed when it is a non-blank string.
 *
 * @returns The trimmed string, or `null` when neither group yields one.
 */
export function extractAudioBase64(event) {
  // Accept a candidate only when it is a string with non-whitespace content.
  const asNonBlankString = (candidate) => {
    if (typeof candidate !== "string") return null;
    const trimmed = candidate.trim();
    return trimmed ? trimmed : null;
  };

  const topLevel = asNonBlankString(event?.delta || event?.audio || event?.chunk);
  if (topLevel !== null) {
    return topLevel;
  }

  return asNonBlankString(
    event?.audio?.delta ||
      event?.audio?.chunk ||
      event?.data?.audio ||
      event?.data?.delta ||
      event?.response?.audio?.delta ||
      null
  );
}

/**
 * Serializes `value` to JSON for logging, truncated to a bounded length.
 *
 * @param value Anything; values that cannot be serialized are tolerated.
 * @param maxChars Maximum number of serialized characters kept before the
 *   "..." suffix is appended. Defaults to MAX_EVENT_PREVIEW_CHARS.
 * @returns The JSON text (possibly truncated with a trailing "..."), or
 *   "[unserializable_payload]" when serialization throws or produces no
 *   string (for example `undefined`, circular structures, BigInt values).
 */
export function safeJsonPreview(value, maxChars = MAX_EVENT_PREVIEW_CHARS) {
  try {
    const serialized = JSON.stringify(value);
    // JSON.stringify returns undefined for undefined/functions/symbols.
    if (typeof serialized !== "string") return "[unserializable_payload]";
    if (serialized.length <= maxChars) return serialized;
    return `${serialized.slice(0, maxChars)}...`;
  } catch {
    return "[unserializable_payload]";
  }
}

/**
 * Produces a log-safe summary of a socket URL.
 *
 * For a parseable URL the result is `protocol//host/path` followed by the
 * distinct query parameter names, each with its value replaced by
 * "[redacted]". Credentials in the authority and the fragment are not
 * included because only `protocol`, `host` and `pathname` are emitted.
 *
 * @param rawUrl The URL to summarize; coerced to a string and trimmed.
 * @returns The summary, or `null` when the input is empty.
 */
export function summarizeRealtimeSocketUrl(rawUrl) {
  const source = String(rawUrl || "").trim();
  if (!source) return null;
  try {
    const parsed = new URL(source);
    const queryKeys = [
      ...new Set(
        [...parsed.searchParams.keys()].map((key) => String(key || "").trim()).filter(Boolean)
      )
    ];
    const query = queryKeys.length
      ? `?${queryKeys.map((key) => `${encodeURIComponent(key)}=[redacted]`).join("&")}`
      : "";
    return `${parsed.protocol}//${parsed.host}${parsed.pathname}${query}`;
  } catch {
    // NOTE(review): input that does not parse as a URL is only
    // whitespace-collapsed and length-capped here, not redacted.
    const compact = source.replace(/\s+/g, " ").trim();
    return compact.length > 240 ? `${compact.slice(0, 240)}...` : compact;
  }
}

/**
 * Collapses a header value (single value or array of values) into one
 * trimmed string.
 *
 * Array entries are stringified, trimmed, stripped of blanks and joined
 * with ", ". Falsy inputs and inputs that trim to nothing yield `null`.
 */
function toSingleHeaderValue(value) {
  if (!Array.isArray(value)) {
    const text = String(value || "").trim();
    return text ? text : null;
  }

  const parts = [];
  for (const entry of value) {
    const text = String(entry || "").trim();
    if (text) {
      parts.push(text);
    }
  }
  return parts.length > 0 ? parts.join(", ") : null;
}

/**
 * Converts handshake response headers into a log-safe, flat record.
 *
 * Header names are trimmed and lower-cased. Values are collapsed to a single
 * string via toSingleHeaderValue(); headers with no usable name or value are
 * skipped. Values of headers matching SENSITIVE_HEADER_RE are replaced with
 * "[redacted]"; all other values have whitespace runs collapsed and are
 * truncated to 320 characters plus "...".
 *
 * @param headers Response headers; a missing or falsy value is treated as empty.
 * @returns The sanitized record, or `null` when nothing remains.
 */
export function sanitizeHandshakeHeaders(headers: IncomingHttpHeaders = {}) {
  const out: Record<string, string> = {};
  for (const [rawName, rawValue] of Object.entries(headers || {})) {
    const name = String(rawName || "").trim().toLowerCase();
    if (!name) continue;
    const singleValue = toSingleHeaderValue(rawValue);
    if (!singleValue) continue;
    if (SENSITIVE_HEADER_RE.test(name)) {
      out[name] = "[redacted]";
      continue;
    }
    const compact = singleValue.replace(/\s+/g, " ").trim();
    out[name] = compact.length > 320 ? `${compact.slice(0, 320)}...` : compact;
  }
  return Object.keys(out).length ? out : null;
}

/**
 * Normalizes a captured handshake response body for inclusion in diagnostics.
 *
 * Whitespace runs are collapsed to single spaces and the result is trimmed;
 * text longer than MAX_HANDSHAKE_BODY_PREVIEW_CHARS is cut to that length
 * and suffixed with "...".
 *
 * @returns The normalized preview, or `null` when nothing is left.
 */
function normalizeBodyPreview(value) {
  const compact = String(value || "").replace(/\s+/g, " ").trim();
  if (!compact) return null;
  return compact.length > MAX_HANDSHAKE_BODY_PREVIEW_CHARS
    ? `${compact.slice(0, MAX_HANDSHAKE_BODY_PREVIEW_CHARS)}...`
    : compact;
}

/**
 * Builds the Error rejected by a failed realtime socket connect.
 *
 * The message is `<connectErrorPrefix>: <details joined by "; ">`, where the
 * first detail is `baseMessage` (whitespace-collapsed) when provided, and
 * otherwise a default derived from `diagnostics.source`. `url`, `headers`
 * (as a JSON preview capped at 620 characters) and `body` details are
 * appended when present. The whole message is capped at
 * MAX_CONNECT_ERROR_MESSAGE_CHARS characters plus "...".
 *
 * @param connectErrorPrefix Text placed before the details.
 * @param diagnostics Structured failure details; attached to the error as-is.
 * @param baseMessage Optional lead detail overriding the source-based default.
 * @returns An Error named "RealtimeSocketConnectError" with `diagnostics` set.
 */
function createRealtimeConnectError({
  connectErrorPrefix,
  diagnostics,
  baseMessage = ""
}: {
  connectErrorPrefix: string;
  diagnostics: RealtimeConnectErrorDiagnostics;
  baseMessage?: string;
}): RealtimeConnectError {
  const details: string[] = [];
  const normalizedBase = String(baseMessage || "").replace(/\s+/g, " ").trim();
  if (normalizedBase) {
    details.push(normalizedBase);
  } else if (diagnostics.source === "timeout") {
    details.push("socket connect timed out");
  } else if (diagnostics.source === "unexpected_response") {
    const statusCode = Number.isFinite(Number(diagnostics.statusCode))
      ? Number(diagnostics.statusCode)
      : null;
    const statusMessage = String(diagnostics.statusMessage || "").trim();
    // A null/0 status code is falsy here and falls through to the generic text.
    const statusSummary = statusCode
      ? statusMessage
        ? `unexpected handshake response HTTP ${statusCode} ${statusMessage}`
        : `unexpected handshake response HTTP ${statusCode}`
      : "unexpected handshake response";
    details.push(statusSummary);
  } else {
    details.push("socket error during websocket connect");
  }

  if (diagnostics.url) {
    details.push(`url=${diagnostics.url}`);
  }
  if (diagnostics.headers) {
    details.push(`headers=${safeJsonPreview(diagnostics.headers, 620)}`);
  }
  if (diagnostics.bodyPreview) {
    details.push(`body=${diagnostics.bodyPreview}`);
  }

  let message = `${connectErrorPrefix}: ${details.join("; ")}`.trim();
  if (message.length > MAX_CONNECT_ERROR_MESSAGE_CHARS) {
    message = `${message.slice(0, MAX_CONNECT_ERROR_MESSAGE_CHARS)}...`;
  }

  const error: RealtimeConnectError = new Error(message);
  error.name = "RealtimeSocketConnectError";
  error.diagnostics = diagnostics;
  return error;
}

/**
 * Extracts and normalizes connect diagnostics from an error-like value.
 *
 * @param error Any value; only objects with an object-valued `diagnostics`
 *   property whose `source` is one of the known sources are accepted.
 * @returns A fresh diagnostics object with every field coerced to its
 *   declared type (`null` when absent), or `null` when `error` carries no
 *   usable diagnostics.
 */
export function getRealtimeConnectErrorDiagnostics(error): RealtimeConnectErrorDiagnostics | null {
  if (!error || typeof error !== "object") return null;
  const candidate = (error as RealtimeConnectError).diagnostics;
  if (!candidate || typeof candidate !== "object") return null;

  const source = String(candidate.source || "").trim();
  if (
    source !== "unexpected_response" &&
    source !== "socket_error" &&
    source !== "timeout"
  ) {
    return null;
  }

  // Number(null) is 0 (a finite number), so a missing status code must be
  // excluded before coercion or it would be reported as HTTP status 0.
  const rawStatusCode = candidate.statusCode;
  const numericStatusCode = rawStatusCode == null ? Number.NaN : Number(rawStatusCode);

  return {
    source,
    url: candidate.url ? String(candidate.url) : null,
    statusCode: Number.isFinite(numericStatusCode) ? numericStatusCode : null,
    statusMessage: candidate.statusMessage ? String(candidate.statusMessage) : null,
    headers:
      candidate.headers && typeof candidate.headers === "object"
        ? Object.fromEntries(
            Object.entries(candidate.headers).map(([key, value]) => [String(key), String(value)])
          )
        : null,
    bodyPreview: candidate.bodyPreview ? String(candidate.bodyPreview) : null
  };
}

/**
 * Reads up to MAX_HANDSHAKE_BODY_PREVIEW_CHARS characters of a handshake
 * response body as UTF-8 text.
 *
 * The promise never rejects. It resolves with whatever has been read when
 * the first of these happens: the cap is reached, the response emits
 * "end", "close" or "error", or MAX_HANDSHAKE_CAPTURE_WAIT_MS elapses.
 *
 * @param response The HTTP response received instead of a WebSocket upgrade.
 * @returns The captured text (possibly empty), never longer than the cap.
 */
async function readHandshakeBodyPreview(response: IncomingMessage): Promise<string> {
  return await new Promise<string>((resolve) => {
    let settled = false;
    let body = "";

    const finish = () => {
      if (settled) return;
      settled = true;
      clearTimeout(waitTimer);
      response.removeListener("data", onData);
      // The "end"/"close"/"error" listeners are left in place on purpose:
      // they are no-ops once settled, and the "error" one keeps a late
      // stream error from being emitted with no listener attached.
      resolve(body);
    };

    const onData = (chunk: Buffer | string) => {
      if (settled || body.length >= MAX_HANDSHAKE_BODY_PREVIEW_CHARS) return;
      body += String(chunk || "");
      if (body.length >= MAX_HANDSHAKE_BODY_PREVIEW_CHARS) {
        body = body.slice(0, MAX_HANDSHAKE_BODY_PREVIEW_CHARS);
        // Nothing read after this point would be kept, so resolve now
        // instead of waiting for the stream to end or the timer to fire.
        finish();
      }
    };

    const waitTimer = setTimeout(finish, MAX_HANDSHAKE_CAPTURE_WAIT_MS);

    response.setEncoding("utf8");
    response.on("data", onData);
    response.once("end", finish);
    response.once("close", finish);
    response.once("error", finish);
  });
}

/**
 * Records on `client` that `ws` is now its connected socket.
 *
 * Stores the socket, stamps `connectedAt` and `lastEventAt` with the same
 * instant, and clears `lastError`.
 */
export function markRealtimeConnected(client, ws) {
  // Read the clock once so both timestamps describe the same moment.
  const now = Date.now();
  client.ws = ws;
  client.connectedAt = now;
  client.lastEventAt = now;
  client.lastError = null;
}

/**
 * Handles a socket "error" for `client`: updates its state, logs a warning
 * and emits "socket_error".
 *
 * @param client Object exposing `log(level, event, fields)` and
 *   `emit(name, payload)`; `lastEventAt` and `lastError` are written to it.
 * @param error The error (its `message` is used when truthy, otherwise the
 *   value itself is stringified).
 * @param options.logEvent Event name passed to `client.log`.
 */
export function handleRealtimeSocketError(client, error, { logEvent }) {
  client.lastEventAt = Date.now();

  const reason = error?.message || error;
  client.lastError = String(reason);

  const logFields = { error: client.lastError };
  client.log("warn", logEvent, logFields);

  const notification = { message: client.lastError };
  client.emit("socket_error", notification);
}

/**
 * Handles a socket "close" for `client`: records the close code and reason,
 * runs the optional `onClose` hook, then logs and emits "socket_closed".
 *
 * @param client Object exposing `log(level, event, fields)` and
 *   `emit(name, payload)`; `lastEventAt`, `lastCloseCode` and
 *   `lastCloseReason` are written to it.
 * @param code Close code; stored as a number, or `null` when it coerces to 0/NaN.
 * @param reasonBuffer Close reason; stored as a string, or `null` when it is
 *   missing or stringifies to the empty string.
 * @param options.logEvent Event name passed to `client.log`.
 * @param options.onClose Optional hook invoked before logging and emitting.
 */
export function handleRealtimeSocketClose(client, code, reasonBuffer, { logEvent, onClose = null }) {
  client.lastEventAt = Date.now();
  client.lastCloseCode = Number(code) || null;

  // An empty Buffer is truthy but stringifies to "", so check the text
  // rather than the raw value to keep "no reason" represented as null.
  const reasonText = reasonBuffer ? String(reasonBuffer) : "";
  client.lastCloseReason = reasonText || null;

  if (typeof onClose === "function") {
    onClose();
  }

  client.log("info", logEvent, {
    code: client.lastCloseCode,
    reason: client.lastCloseReason
  });
  client.emit("socket_closed", {
    code: client.lastCloseCode,
    reason: client.lastCloseReason
  });
}

/**
 * Stores a summary of an outbound event on `client`.
 *
 * The summary (`type`, ISO `at` timestamp derived from
 * `client.lastOutboundEventAt`, and the summarized `payload`, with empty
 * fields dropped by compactObject) always becomes `client.lastOutboundEvent`.
 * It is also appended to `client.recentOutboundEvents`, which is trimmed to
 * the newest MAX_OUTBOUND_EVENT_HISTORY entries, unless the event type equals
 * `skipHistoryEventType`.
 */
function recordOutboundEvent(client, { payload, eventType, summarizeOutboundPayload, skipHistoryEventType = null }) {
  const payloadSummary = summarizeOutboundPayload(payload);
  const sentAtIso = client.lastOutboundEventAt
    ? new Date(client.lastOutboundEventAt).toISOString()
    : null;

  const entry = compactObject({
    type: eventType,
    at: sentAtIso,
    payload: payloadSummary
  });
  client.lastOutboundEvent = entry;

  const excludedFromHistory = Boolean(skipHistoryEventType) && eventType === skipHistoryEventType;
  if (excludedFromHistory) {
    return;
  }

  client.recentOutboundEvents.push(entry);
  const historyTooLong = client.recentOutboundEvents.length > MAX_OUTBOUND_EVENT_HISTORY;
  if (historyTooLong) {
    client.recentOutboundEvents = client.recentOutboundEvents.slice(-MAX_OUTBOUND_EVENT_HISTORY);
  }
}

/**
 * Sends `payload` as JSON over `client.ws` and records it as the client's
 * latest outbound event.
 *
 * Before sending, the resolved event type (`eventType`, else `payload.type`,
 * else "unknown") and the current time are stored on the client, the event
 * is recorded via recordOutboundEvent(), and it is logged at "info" level
 * unless its type equals `skipLogEventType`.
 *
 * @throws Error with `socketNotOpenMessage` when the client has no socket or
 *   the socket is not in the OPEN state.
 */
export function sendRealtimePayload(
  client,
  {
    payload,
    eventType = null,
    summarizeOutboundPayload,
    skipHistoryEventType = null,
    skipLogEventType = null,
    logEvent,
    socketNotOpenMessage
  }
) {
  const socketIsOpen = Boolean(client.ws) && client.ws.readyState === WebSocket.OPEN;
  if (!socketIsOpen) {
    throw new Error(socketNotOpenMessage);
  }

  const resolvedEventType = String(eventType || payload?.type || "unknown");
  client.lastOutboundEventType = resolvedEventType;
  client.lastOutboundEventAt = Date.now();
  recordOutboundEvent(client, {
    payload,
    eventType: resolvedEventType,
    summarizeOutboundPayload,
    skipHistoryEventType
  });

  const logSuppressed = Boolean(skipLogEventType) && resolvedEventType === skipLogEventType;
  if (!logSuppressed) {
    const logFields = { ...(client.lastOutboundEvent || { type: resolvedEventType }) };
    client.log("info", logEvent, logFields);
  }

  client.ws.send(JSON.stringify(payload));
}

/**
 * Opens a WebSocket and resolves with it once the "open" event fires.
 *
 * Exactly one outcome is delivered; the `settled` flag makes every later
 * event a no-op. Failures reject with the error built by
 * createRealtimeConnectError(), whose `diagnostics.source` is:
 * - "timeout": the local timer below expired (message from `timeoutMessage`);
 * - "unexpected_response": the server answered with a non-upgrade HTTP
 *   response (status, sanitized headers and a body preview are attached);
 * - "socket_error": the socket emitted "error" before settling.
 *
 * @param url Socket URL; stringified for the connection and summarized
 *   (query values redacted) for diagnostics.
 * @param headers Headers passed to the WebSocket constructor (defaults to {}).
 * @param timeoutMs Passed as `handshakeTimeout`; defaults to CONNECT_TIMEOUT_MS.
 * @param timeoutMessage Base message used for the timeout rejection.
 * @param connectErrorPrefix Prefix forwarded to createRealtimeConnectError().
 */
export async function openRealtimeSocket({ url, headers, timeoutMs = CONNECT_TIMEOUT_MS, timeoutMessage, connectErrorPrefix }) { return await new Promise((resolve, reject) => { let settled = false; const summarizedUrl = summarizeRealtimeSocketUrl(url);

const ws = new WebSocket(String(url), {
  headers: headers || {},
  handshakeTimeout: timeoutMs
});

// Local fallback timer, set 1000 ms beyond the handshakeTimeout given to the
// socket above. On expiry the socket is terminated (best-effort) and the
// promise rejects with a "timeout" diagnostic.
// NOTE(review): the extra second presumably gives the library's own
// handshake timeout the first chance to report — confirm.
const timeout = setTimeout(() => {
  if (settled) return;
  settled = true;
  try {
    ws.terminate();
  } catch {
    // ignore
  }
  reject(
    createRealtimeConnectError({
      connectErrorPrefix,
      diagnostics: {
        source: "timeout",
        url: summarizedUrl,
        statusCode: null,
        statusMessage: null,
        headers: null,
        bodyPreview: null
      },
      baseMessage: timeoutMessage
    })
  );
}, timeoutMs + 1000);

// Success path: stop the fallback timer and hand the open socket to the caller.
ws.once("open", () => {
  if (settled) return;
  settled = true;
  clearTimeout(timeout);
  resolve(ws);
});

// Non-upgrade HTTP response: the outcome is claimed immediately (settled +
// timer cleared), but the rejection waits until a bounded preview of the
// response body has been read. Afterwards the response, request and socket
// are each torn down best-effort.
ws.once("unexpected-response", (request: ClientRequest, response: IncomingMessage) => {
  if (settled) return;
  settled = true;
  clearTimeout(timeout);
  void readHandshakeBodyPreview(response)
    .then((body) => {
      reject(
        createRealtimeConnectError({
          connectErrorPrefix,
          diagnostics: {
            source: "unexpected_response",
            url: summarizedUrl,
            statusCode: Number(response?.statusCode) || null,
            statusMessage: String(response?.statusMessage || "").trim() || null,
            headers: sanitizeHandshakeHeaders(response?.headers || {}),
            bodyPreview: normalizeBodyPreview(body)
          }
        })
      );
    })
    .finally(() => {
      try {
        response.destroy();
      } catch {
        // ignore
      }
      try {
        request.destroy();
      } catch {
        // ignore
      }
      try {
        ws.terminate();
      } catch {
        // ignore
      }
    });
});

// Keep this listener attached for the socket lifetime so follow-up
// handshake/teardown errors cannot surface as unhandled "error" events.
// Only the first error before settling rejects the promise.
ws.on("error", (error) => {
  if (settled) return;
  settled = true;
  clearTimeout(timeout);
  reject(
    createRealtimeConnectError({
      connectErrorPrefix,
      diagnostics: {
        source: "socket_error",
        url: summarizedUrl,
        statusCode: null,
        statusMessage: null,
        headers: null,
        bodyPreview: null
      },
      baseMessage: String(error?.message || error)
    })
  );
});

}); }

/**
 * Closes `ws` gracefully, falling back to a forced terminate.
 *
 * Sends a close frame (code 1000, reason "session_ended") and resolves when
 * the socket emits "close". If that has not happened within 1500 ms, or if
 * `close()` throws, the socket is terminated (best-effort) and the promise
 * resolves anyway. All "close" listeners are removed from the socket when
 * the promise settles. Never rejects.
 *
 * @param ws The socket to close; a missing or already-CLOSED socket is a no-op.
 */
export async function closeRealtimeSocket(ws) {
  if (!ws || ws.readyState === WebSocket.CLOSED) return;

  await new Promise((resolve) => {
    let settled = false;
    let forceCloseTimer: ReturnType<typeof setTimeout> | undefined;

    const forceTerminate = () => {
      try {
        ws.terminate();
      } catch {
        // ignore
      }
    };

    const done = () => {
      if (settled) return;
      settled = true;
      // Cancel the fallback so a prompt close does not leave a live timer
      // keeping the event loop busy for another 1.5 seconds.
      clearTimeout(forceCloseTimer);
      ws.removeAllListeners("close");
      resolve(undefined);
    };

    // Armed before close() so that `done` can always cancel it.
    forceCloseTimer = setTimeout(() => {
      forceTerminate();
      done();
    }, 1500);

    ws.once("close", done);
    try {
      ws.close(1000, "session_ended");
    } catch {
      // The graceful path is unavailable; drop the connection right away
      // rather than leaving it to the (now cancelled) fallback timer.
      forceTerminate();
      done();
    }
  });
}

/**
 * Builds a plain snapshot of the connection state shared by realtime clients.
 *
 * Timestamps stored on the client as epoch milliseconds are reported as ISO
 * strings (or `null` when unset). `connected` is true only when the client
 * has a socket in the OPEN state. `recentOutboundEvents` contains at most the
 * four newest entries. `lastCloseCode` and `lastCloseReason` are passed
 * through unchanged.
 */
export function buildCommonRealtimeState(client) {
  const toIsoOrNull = (epochMs) => (epochMs ? new Date(epochMs).toISOString() : null);

  const socket = client.ws;
  const connected = Boolean(socket && socket.readyState === WebSocket.OPEN);

  const history = client.recentOutboundEvents;
  const recentOutboundEvents = Array.isArray(history) ? history.slice(-4) : [];

  return {
    connected,
    connectedAt: toIsoOrNull(client.connectedAt),
    lastEventAt: toIsoOrNull(client.lastEventAt),
    sessionId: client.sessionId || null,
    lastError: client.lastError || null,
    lastCloseCode: client.lastCloseCode,
    lastCloseReason: client.lastCloseReason,
    lastOutboundEventType: client.lastOutboundEventType || null,
    lastOutboundEventAt: toIsoOrNull(client.lastOutboundEventAt),
    lastOutboundEvent: client.lastOutboundEvent || null,
    recentOutboundEvents
  };
}