/**
 * Main-process (Bun) client for the clankvox Rust voice engine.
 * Spawns clankvox, relays IPC messages, proxies the Discord gateway
 * adapter so clankvox can join voice channels through the main
 * process's gateway connection, and emits events for the session manager.
 */
import { EventEmitter } from "node:events"; import path from "node:path"; import type { StreamWatchVisualizerMode } from "../settings/voiceDashboardMappings.ts"; import type { TtsPlaybackState } from "./assistantOutputState.ts";
/** Handle returned by Bun.spawn with piped stdin/stdout and inherited stderr. */
type ClankvoxProcess = ReturnType<typeof Bun.spawn<"pipe", "pipe", "inherit">>;
/** Loosely-typed JSON object (IPC messages, gateway payloads). */
type JsonRecord = Record<string, unknown>;
/** Payload handed to the voice adapter's onVoiceServerUpdate callback. */
type VoiceServerUpdatePayload = JsonRecord & {
  endpoint?: string | null;
  token?: string | null;
};
/** Payload handed to the voice adapter's onVoiceStateUpdate callback. */
type VoiceStateUpdatePayload = JsonRecord & {
  session_id?: string | null;
  channel_id?: string | null;
  user_id?: string | null;
};
type ClankvoxTransportRole = "voice" | "stream_watch" | "stream_publish";
/** Body of the "transportState" event (from clankvox `transport_state`). */
type ClankvoxTransportState = {
  role: ClankvoxTransportRole;
  status: string;
  reason: string | null;
};
type ClankvoxVideoResolution = {
  width: number | null;
  height: number | null;
  type: string | null;
};
/** One video stream advertised for a user; only `ssrc` is mandatory. */
export type ClankvoxVideoStreamDescriptor = {
  ssrc: number;
  rtxSsrc: number | null;
  rid: string | null;
  quality: number | null;
  streamType: string | null;
  active: boolean | null;
  maxBitrate: number | null;
  maxFramerate: number | null;
  maxResolution: ClankvoxVideoResolution | null;
};
export type ClankvoxUserVideoState = {
  userId: string;
  audioSsrc: number | null;
  videoSsrc: number | null;
  codec: string | null;
  streams: ClankvoxVideoStreamDescriptor[];
};
/** Compressed frame (has codec/keyframe) as relayed by clankvox, base64-encoded. */
export type ClankvoxUserVideoFrame = {
  userId: string;
  ssrc: number;
  codec: string;
  keyframe: boolean;
  frameBase64: string;
  rtpTimestamp: number;
  streamType: string | null;
  rid: string | null;
  daveDecrypted: boolean;
};
/** Frame delivered as JPEG by clankvox, with change-detection scores. */
type ClankvoxDecodedVideoFrame = {
  userId: string;
  ssrc: number;
  width: number;
  height: number;
  jpegBase64: string;
  rtpTimestamp: number;
  streamType: string | null;
  rid: string | null;
  /** Instantaneous coarse-luma diff score (0.0–1.0). */
  changeScore: number;
  /** EMA-smoothed change score for filtering single-frame noise. */
  emaChangeScore: number;
  /** True when instantaneous diff indicates a hard scene cut. */
  isSceneCut: boolean;
};
type ClankvoxUserVideoEnd = {
  userId: string;
  ssrc: number | null;
};
/** Minimal guild shape needed here: a gateway shard and a voice adapter factory. */
type ClankvoxGuildLike = {
  shard?: {
    send(payload: JsonRecord): void;
  };
  voiceAdapterCreator?: (callbacks: {
    onVoiceServerUpdate(data: VoiceServerUpdatePayload): void;
    onVoiceStateUpdate(data: VoiceStateUpdatePayload): void;
  }) => { destroy?: () => void; } | null | undefined;
} | null;
type ClankvoxSpawnOptions = {
  selfDeaf?: boolean;
  selfMute?: boolean;
  /** Ready timeout in ms (ClankvoxClient falls back to 15000). */
  timeoutMs?: number;
};
type ClankvoxIpcErrorCode =
  | "invalid_request"
  | "invalid_json"
  | "input_too_large"
  | "voice_connect_failed"
  | "stream_watch_connect_failed"
  | "stream_publish_connect_failed"
  | "voice_runtime_error";
type ClankvoxIpcError = {
  code: ClankvoxIpcErrorCode | null;
  message: string;
};
/** Commands serialized as JSON onto clankvox's stdin. */
type ClankvoxCommand =
  | { type: "join"; guildId: string; channelId: string; selfDeaf: boolean; selfMute: boolean; }
  | { type: "voice_server"; data: VoiceServerUpdatePayload }
  | { type: "voice_state"; data: VoiceStateUpdatePayload }
  | {
      type: "stream_watch_connect";
      endpoint: string;
      token: string;
      serverId: string;
      sessionId: string;
      userId: string;
      daveChannelId: string;
    }
  | { type: "stream_watch_disconnect"; reason: string | null }
  | {
      type: "stream_publish_connect";
      endpoint: string;
      token: string;
      serverId: string;
      sessionId: string;
      userId: string;
      daveChannelId: string;
    }
  | { type: "stream_publish_disconnect"; reason: string | null }
  | { type: "audio"; pcmBase64: string; sampleRate: number }
  | { type: "stop_playback" }
  | { type: "stop_tts_playback" }
  | { type: "subscribe_user"; userId: string; silenceDurationMs: number; sampleRate: number; }
  | { type: "unsubscribe_user"; userId: string }
  | {
      type: "subscribe_user_video";
      userId: string;
      maxFramesPerSecond: number;
      preferredQuality: number;
      preferredPixelCount: number | null;
      preferredStreamType: string | null;
      jpegQuality: number | null;
    }
  | { type: "unsubscribe_user_video"; userId: string }
  | {
      type: "music_play";
      url: string;
      resolvedDirectUrl: boolean;
      visualizerMode?: StreamWatchVisualizerMode | null;
    }
  | { type: "music_stop" }
  | { type: "music_pause" }
  | { type: "music_resume" }
  | { type: "music_set_gain"; target: number; fadeMs: number }
  | { type: "stream_publish_play"; url: string; resolvedDirectUrl: boolean }
  | {
      type: "stream_publish_play_visualizer";
      url: string;
      resolvedDirectUrl: boolean;
      visualizerMode: Exclude<StreamWatchVisualizerMode, "off">;
    }
  | { type: "stream_publish_browser_start"; mimeType: string }
  | { type: "stream_publish_browser_frame"; mimeType: string; frameBase64: string; capturedAtMs: number; }
  | { type: "stream_publish_stop" }
  | { type: "stream_publish_pause" }
  | { type: "stream_publish_resume" }
  | { type: "destroy" };
/** Locally queued TTS PCM awaiting paced delivery to clankvox. */
type PendingTtsIngressChunk = {
  pcm: Buffer;
  /** Input sample rate (Hz) of `pcm`. */
  sampleRate: number;
  /** Bytes of `pcm` already sent to clankvox. */
  offsetBytes: number;
  /** Estimated 48kHz output samples of `pcm` not yet sent. */
  remainingOutputSamples: number;
};
/** Keep clankvox's live TTS buffer around 2s (samples at 48kHz). */
const TTS_INGRESS_TARGET_SAMPLES = 48_000 * 2;
/** Maximum duration of PCM forwarded to clankvox in one `audio` command. */
const TTS_INGRESS_CHUNK_MS = 240;
/** Delay before re-checking buffer headroom while local TTS PCM is still queued. */
const TTS_INGRESS_RECHECK_MS = 60;
/**
 * Narrows `value` to a plain JSON object. Arrays are rejected: they pass
 * `typeof === "object"` but are never valid IPC records or gateway payloads.
 */
function isRecord(value: unknown): value is JsonRecord {
  return value !== null && typeof value === "object" && !Array.isArray(value);
}
/** Yields `value` unchanged when it is a string primitive; anything else maps to `null`. */
function asString(value: unknown): string | null {
  if (typeof value === "string") {
    return value;
  }
  return null;
}
/** Yields `value` when it is a finite number primitive; NaN, ±Infinity and non-numbers map to `null`. */
function asNumber(value: unknown): number | null {
  if (typeof value !== "number" || !Number.isFinite(value)) {
    return null;
  }
  return value;
}
/** Yields `true`/`false` for boolean primitives; every other value maps to `null`. */
function asBoolean(value: unknown): boolean | null {
  if (value === true) return true;
  if (value === false) return false;
  return null;
}
/** Membership table for known clankvox IPC error codes; typed so every code must be listed. */
const CLANKVOX_IPC_ERROR_CODE_LOOKUP: Record<ClankvoxIpcErrorCode, true> = {
  invalid_request: true,
  invalid_json: true,
  input_too_large: true,
  voice_connect_failed: true,
  stream_watch_connect_failed: true,
  stream_publish_connect_failed: true,
  voice_runtime_error: true
};

/** Returns `value` when it names a known IPC error code, otherwise `null`. */
function asClankvoxIpcErrorCode(value: unknown): ClankvoxIpcErrorCode | null {
  if (typeof value !== "string") return null;
  // Own-property check so inherited keys such as "toString" are not accepted.
  const known = Object.prototype.hasOwnProperty.call(CLANKVOX_IPC_ERROR_CODE_LOOKUP, value);
  return known ? (value as ClankvoxIpcErrorCode) : null;
}
/** Returns `value` when it is one of the three transport roles, otherwise `null`. */
function asTransportRole(value: unknown): ClankvoxTransportRole | null {
  if (value === "voice" || value === "stream_watch" || value === "stream_publish") {
    return value;
  }
  return null;
}
/**
 * Coerces a caller-supplied PCM sample rate (Hz) to a usable integer.
 * NaN, 0 and ±Infinity fall back to 24kHz; results are floored and never
 * below 8kHz. Non-finite rates used to slip through as Infinity, which
 * `JSON.stringify` turns into `null` inside IPC `audio` commands.
 */
function normalizeSampleRate(sampleRate: number): number {
  const numeric = Number(sampleRate);
  const usable = Number.isFinite(numeric) && numeric !== 0 ? numeric : 24_000;
  return Math.max(8_000, Math.floor(usable));
}
/**
 * Turns an arbitrary byte count into a non-negative, even whole number so it
 * always covers complete 2-byte PCM samples. Junk input (NaN etc.) becomes 0.
 */
function clampEvenPcmByteLength(byteLength: number): number {
  const coerced = Number(byteLength) || 0;
  const wholeBytes = Math.floor(coerced);
  if (wholeBytes < 0) {
    return 0;
  }
  // A dangling odd byte cannot form a sample, so round down to even.
  const danglingByte = wholeBytes % 2;
  return wholeBytes - danglingByte;
}
/**
 * Estimates how many 48kHz output samples `byteLength` bytes of 2-byte PCM
 * at `sampleRate` become after resampling, rounded to the nearest sample.
 */
function estimateOutputSamplesFromPcmBytes(byteLength: number, sampleRate: number): number {
  const rate = normalizeSampleRate(sampleRate);
  const evenBytes = clampEvenPcmByteLength(byteLength);
  if (evenBytes <= 0) {
    return 0;
  }
  const inputSamples = evenBytes / 2;
  const scaled = (inputSamples * 48_000) / rate;
  return Math.max(0, Math.round(scaled));
}
/**
 * Inverse of the output-sample estimate: how many input PCM bytes at
 * `sampleRate` yield roughly `outputSamples` 48kHz samples. Rounds down, but
 * any positive request gets at least one whole input sample (2 bytes).
 */
function estimatePcmBytesForOutputSamples(outputSamples: number, sampleRate: number): number {
  const rate = normalizeSampleRate(sampleRate);
  const wanted = Math.max(0, Math.floor(Number(outputSamples) || 0));
  if (wanted <= 0) {
    return 0;
  }
  const inputSamples = Math.max(1, Math.floor((wanted * rate) / 48_000));
  return clampEvenPcmByteLength(inputSamples * 2);
}
/**
 * Number of 2-byte PCM bytes covering `durationMs` at `sampleRate`. Rounds
 * down, but any positive duration yields at least one sample (2 bytes).
 */
function estimatePcmBytesForDurationMs(durationMs: number, sampleRate: number): number {
  const rate = normalizeSampleRate(sampleRate);
  const wholeMs = Math.max(0, Math.floor(Number(durationMs) || 0));
  if (wholeMs <= 0) {
    return 0;
  }
  const inputSamples = Math.max(1, Math.floor((rate * wholeMs) / 1000));
  return clampEvenPcmByteLength(inputSamples * 2);
}
/** Reads an optional `{ width, height, type }` object; each field independently becomes null when mistyped. */
function parseVideoResolution(value: unknown): ClankvoxVideoResolution | null {
  if (!isRecord(value)) {
    return null;
  }
  const { width, height, type } = value;
  return {
    width: asNumber(width),
    height: asNumber(height),
    type: asString(type)
  };
}
/**
 * Parses one entry of a `user_video_state.streams` array. Returns null unless
 * the entry is an object with a finite numeric `ssrc`; other fields degrade to
 * null individually.
 */
function parseVideoStreamDescriptor(value: unknown): ClankvoxVideoStreamDescriptor | null {
  if (!isRecord(value)) {
    return null;
  }
  const record = value;
  const ssrc = asNumber(record.ssrc);
  return ssrc === null
    ? null
    : {
        ssrc,
        rtxSsrc: asNumber(record.rtxSsrc),
        rid: asString(record.rid),
        quality: asNumber(record.quality),
        streamType: asString(record.streamType),
        active: asBoolean(record.active),
        maxBitrate: asNumber(record.maxBitrate),
        maxFramerate: asNumber(record.maxFramerate),
        maxResolution: parseVideoResolution(record.maxResolution)
      };
}
/**
 * Parses a `user_video_state` message. Requires a non-empty `userId`;
 * malformed stream entries are skipped rather than failing the whole message.
 */
function parseUserVideoState(msg: JsonRecord): ClankvoxUserVideoState | null {
  const userId = asString(msg.userId);
  if (!userId) {
    return null;
  }
  const streams: ClankvoxVideoStreamDescriptor[] = [];
  if (Array.isArray(msg.streams)) {
    for (const entry of msg.streams) {
      const descriptor = parseVideoStreamDescriptor(entry);
      if (descriptor) {
        streams.push(descriptor);
      }
    }
  }
  return {
    userId,
    audioSsrc: asNumber(msg.audioSsrc),
    videoSsrc: asNumber(msg.videoSsrc),
    codec: asString(msg.codec),
    streams
  };
}
/**
 * Parses a `user_video_frame` message. userId, codec and frameBase64 must be
 * non-empty strings; ssrc and rtpTimestamp finite numbers; keyframe a boolean.
 * `daveDecrypted` defaults to false when absent.
 */
function parseUserVideoFrame(msg: JsonRecord): ClankvoxUserVideoFrame | null {
  const userId = asString(msg.userId);
  const ssrc = asNumber(msg.ssrc);
  const codec = asString(msg.codec);
  const frameBase64 = asString(msg.frameBase64);
  const rtpTimestamp = asNumber(msg.rtpTimestamp);
  const keyframe = asBoolean(msg.keyframe);
  if (!userId || !codec || !frameBase64) {
    return null;
  }
  if (ssrc === null || rtpTimestamp === null || keyframe === null) {
    return null;
  }
  const frame: ClankvoxUserVideoFrame = {
    userId,
    ssrc,
    codec,
    keyframe,
    frameBase64,
    rtpTimestamp,
    streamType: asString(msg.streamType),
    rid: asString(msg.rid),
    daveDecrypted: asBoolean(msg.daveDecrypted) ?? false
  };
  return frame;
}
/**
 * Parses a `decoded_video_frame` message. userId and jpegBase64 must be
 * non-empty; ssrc, width, height and rtpTimestamp finite numbers. Missing
 * change scores default to 0 and `isSceneCut` to false.
 */
function parseDecodedVideoFrame(msg: JsonRecord): ClankvoxDecodedVideoFrame | null {
  const userId = asString(msg.userId);
  const ssrc = asNumber(msg.ssrc);
  const width = asNumber(msg.width);
  const height = asNumber(msg.height);
  const jpegBase64 = asString(msg.jpegBase64);
  const rtpTimestamp = asNumber(msg.rtpTimestamp);
  if (!userId || !jpegBase64) {
    return null;
  }
  if (ssrc === null || width === null || height === null || rtpTimestamp === null) {
    return null;
  }
  const frame: ClankvoxDecodedVideoFrame = {
    userId,
    ssrc,
    width,
    height,
    jpegBase64,
    rtpTimestamp,
    streamType: asString(msg.streamType),
    rid: asString(msg.rid),
    changeScore: asNumber(msg.changeScore) ?? 0,
    emaChangeScore: asNumber(msg.emaChangeScore) ?? 0,
    isSceneCut: asBoolean(msg.isSceneCut) ?? false
  };
  return frame;
}
export class ClankvoxClient extends EventEmitter {
/** Clients registered after a successful spawn; each removes itself when its child exits. */
private static liveClients = new Set<ClankvoxClient>();
private static processExitHandlersInstalled = false;

private child: ClankvoxProcess | null = null;
private guildId: string;
private channelId: string;
private guild: ClankvoxGuildLike;
private destroyed = false;
private destroyPromise: Promise<void> | null = null;
private adapterCleanup: (() => void) | null = null;
/** Reassembly buffer for length-prefixed frames read from clankvox stdout. */
private stdoutBuffer: Buffer = Buffer.alloc(0);
private lastPlaybackArmedReason: string | null = null;
private lastTtsPlaybackState: TtsPlaybackState = "idle";
private lastTtsTelemetryAt = 0;
/** Latest TTS buffer depth reported by clankvox (samples @ 48kHz) */
ttsBufferDepthSamples: number = 0;
private estimatedBufferedTtsSamples = 0;
private estimatedBufferedTtsSamplesAt = 0;
private queuedTtsOutputSamples = 0;
private queuedTtsIngress: PendingTtsIngressChunk[] = [];
private ttsDrainTimer: ReturnType<typeof setTimeout> | null = null;
private stdoutReaderController: AbortController | null = null;
private _resolveExitWaiter: (() => void) | null = null;
private _exitWaiterPromise: Promise<void> | null = null;
private lastVoiceSessionId: string | null = null;
private lastVoiceStateUserId: string | null = null;
/** Optional structured-log sink; see `log()`. */
logAction: ((action: {
  kind: string;
  guildId?: string | null;
  channelId?: string | null;
  userId?: string | null;
  content: string;
  metadata?: Record<string, unknown>;
}) => void) | null = null;
constructor(guildId: string, channelId: string, guild: ClankvoxGuildLike) {
  super();
  this.guild = guild;
  this.channelId = channelId;
  this.guildId = guildId;
  // Static, process-wide hook; presumably guarded by
  // `processExitHandlersInstalled` so repeated calls are cheap.
  ClankvoxClient.installProcessExitHandlers();
}
/**
 * Forwards a structured log event to `logAction` when one is installed;
 * without a sink the event is dropped. Logging is best-effort: a throwing
 * sink must never break IPC handling, so its errors are swallowed.
 */
private log(kind: string, content: string, metadata?: Record<string, unknown>) {
  if (!this.logAction) return;
  try {
    this.logAction({ kind, content, metadata: metadata ?? {} });
  } catch {
    // Structured logging failed; nothing sensible to do but drop the event.
  }
}
/**
 * Creates a client, spawns its clankvox child and resolves once clankvox
 * reports `ready` after the initial `join` command.
 * Rejects if spawning fails, if clankvox exits before ready, or if the ready
 * timeout (`opts.timeoutMs`, default 15000ms) elapses.
 */
static async spawn(
  guildId: string,
  channelId: string,
  guild: ClankvoxGuildLike,
  opts: ClankvoxSpawnOptions = {}
): Promise<ClankvoxClient> {
  const client = new ClankvoxClient(guildId, channelId, guild);
  await client._spawn(opts);
  return client;
}
/**
 * Launches the clankvox child (pre-built release binary when present,
 * otherwise `cargo run --release`), starts the stdout reader and adapter
 * proxy, sends `join`, and waits for `ready`.
 *
 * Rejects when the spawn throws, when clankvox emits "crashed" before ready,
 * or after `opts.timeoutMs` (default 15000ms). On timeout the client is
 * destroyed.
 */
private async _spawn(opts: ClankvoxSpawnOptions) {
  // Bun exposes this module's directory as `import.meta.dir`. Decoding
  // `new URL(import.meta.url).pathname` instead yields "/C:/..." on Windows,
  // which path.resolve cannot use. Windows is supported below (clankvox.exe).
  const moduleDir =
    typeof import.meta.dir === "string"
      ? import.meta.dir
      : path.dirname(decodeURIComponent(new URL(import.meta.url).pathname));
  const clankvoxDir = path.resolve(moduleDir, "clankvox");

  // Prefer the pre-built Rust binary; fall back to cargo run for development.
  // On Windows cargo produces clankvox.exe; on Unix it's just clankvox.
  const binName = process.platform === "win32" ? "clankvox.exe" : "clankvox";
  const releaseBin = path.join(clankvoxDir, "target", "release", binName);
  const usePrebuilt = await Bun.file(releaseBin).exists();
  const command = usePrebuilt ? [releaseBin] : ["cargo", "run", "--release"];

  const spawnEnv = {
    ...process.env,
    // audiopus_sys needs these to build opus from source on arm64 macOS
    // (the homebrew x86 opus won't link). These are no-ops if opus is already
    // linked or the binary is pre-built.
    OPUS_STATIC: "1",
    OPUS_NO_PKG: "1",
    // CMake 4.x removed compat with cmake_minimum_required < 3.5.
    CMAKE_POLICY_VERSION_MINIMUM: "3.5",
  };

  // Set up the exit waiter before spawning so _handleExit can always resolve it.
  this._exitWaiterPromise = new Promise<void>((resolve) => {
    this._resolveExitWaiter = resolve;
  });

  try {
    if (!usePrebuilt) {
      this.log("voice_lifecycle", "clankvox_prebuilt_binary_not_found");
      console.warn(
        "[clankvox] Pre-built binary not found, using cargo run --release (slow first start)"
      );
    }
    this.child = Bun.spawn(command, {
      cwd: clankvoxDir,
      stdin: "pipe",
      stdout: "pipe",
      stderr: "inherit",
      env: spawnEnv,
      onExit: (_proc, exitCode, signalCode) => {
        this._handleExit(exitCode, signalCode);
      },
    });
  } catch (err) {
    const reason = String((err as Error)?.message || err);
    this.log("voice_error", "clankvox_spawn_error", { error: reason });
    console.error("[clankvox] spawn error:", err);
    // EventEmitter throws ERR_UNHANDLED_ERROR for an "error" event without a
    // listener (always the case inside spawn()). That would replace `err`
    // and skip resolving the exit waiter, so only emit when someone listens.
    if (this.listenerCount("error") > 0) {
      this.emit("error", `spawn_error: ${reason}`);
    }
    this._resolveExitWaiter?.();
    throw err;
  }

  ClankvoxClient.liveClients.add(this);
  this._startStdoutReader();
  this._setupAdapterProxy();

  const timeoutMs = opts.timeoutMs ?? 15_000;
  await new Promise<void>((resolve, reject) => {
    const timer = setTimeout(() => {
      detach();
      void this.destroy();
      reject(new Error(`clankvox ready timeout after ${timeoutMs}ms`));
    }, timeoutMs);
    // Whichever outcome wins first removes the others, so the losing `once`
    // listener does not linger on a long-lived client.
    const detach = () => {
      clearTimeout(timer);
      this.off("ready", onReady);
      this.off("crashed", onCrashed);
    };
    const onReady = () => {
      detach();
      resolve();
    };
    const onCrashed = ({ code, signal }: { code: number | null; signal: number | null }) => {
      detach();
      reject(new Error(`clankvox crashed before ready code=${code} signal=${signal}`));
    };
    this.once("ready", onReady);
    this.once("crashed", onCrashed);
    this._send({
      type: "join",
      guildId: this.guildId,
      channelId: this.channelId,
      selfDeaf: opts.selfDeaf ?? false,
      selfMute: opts.selfMute ?? false
    });
  });
}
/**
 * Bun `onExit` hook for the clankvox child. Unless `destroyed` is already set
 * (an expected exit), the exit is logged, printed, and emitted as "crashed".
 * Afterwards, and even if a "crashed" listener throws, per-process state is
 * released: live-client registration, gateway adapter, local and estimated
 * TTS audio, and the child handle. Finally the exit waiter is resolved.
 */
private _handleExit(exitCode: number | null, signalCode: number | null) {
  try {
    if (!this.destroyed) {
      this.log("voice_error", "clankvox_exited_unexpectedly", { exitCode, signalCode });
      console.error(
        `[clankvox] exited unexpectedly code=${exitCode} signal=${signalCode}`
      );
      this.emit("crashed", { code: exitCode, signal: signalCode });
    }
  } finally {
    ClankvoxClient.liveClients.delete(this);
    this._cleanupAdapter();
    this._clearQueuedTtsIngress();
    this._setEstimatedBufferedTtsSamples(0, Date.now());
    this.ttsBufferDepthSamples = 0;
    this.lastTtsPlaybackState = "idle";
    this.child = null;
    this._resolveExitWaiter?.();
  }
}
/**
 * Starts a background loop that reads the child's stdout and feeds each chunk
 * to `_processStdoutChunk`. The loop ends when stdout closes or when
 * `stdoutReaderController` is aborted.
 */
private _startStdoutReader() {
  const child = this.child;
  if (!child) return;

  const controller = new AbortController();
  this.stdoutReaderController = controller;
  const { signal } = controller;
  const reader = child.stdout.getReader();

  // Abort must also interrupt a read that is already pending. Otherwise the
  // loop keeps waiting, and may still dispatch one more chunk, until the
  // child happens to write again or close stdout.
  signal.addEventListener(
    "abort",
    () => {
      reader.cancel().catch(() => {
        // Reader already released or stream closed; nothing to cancel.
      });
    },
    { once: true }
  );

  const read = async () => {
    try {
      while (!signal.aborted) {
        const { done, value } = await reader.read();
        if (done || signal.aborted) break;
        if (!value) continue;
        try {
          this._processStdoutChunk(Buffer.from(value.buffer, value.byteOffset, value.byteLength));
        } catch (err) {
          // A throwing event listener must not end this loop: that would
          // silently stop all further IPC from clankvox.
          this.log("voice_error", "clankvox_stdout_dispatch_failed", {
            error: String((err as Error)?.message || err)
          });
        }
      }
    } catch {
      // reader cancelled or stream closed — expected during destroy
    } finally {
      try { reader.releaseLock(); } catch { /* ignore */ }
    }
  };
  // Fire-and-forget — errors are caught inside
  void read();
}
/**
 * Appends a chunk of clankvox stdout to the reassembly buffer and dispatches
 * every complete frame it now holds. Frame layout:
 *   [u8 format][u32 LE payload length][payload]
 * Format 0 is a UTF-8 JSON message; format 1 is a binary user-audio frame.
 * Other formats are consumed and dropped. A partial trailing frame stays
 * buffered until more data arrives.
 */
private _processStdoutChunk(data: Buffer) {
  const FRAME_HEADER_BYTES = 5;
  this.stdoutBuffer = Buffer.concat([this.stdoutBuffer, data]);
  while (this.stdoutBuffer.length >= FRAME_HEADER_BYTES) {
    const format = this.stdoutBuffer.readUInt8(0);
    const length = this.stdoutBuffer.readUInt32LE(1);
    const frameEnd = FRAME_HEADER_BYTES + length;
    if (this.stdoutBuffer.length < frameEnd) break; // wait for the rest of the frame
    const payload = this.stdoutBuffer.subarray(FRAME_HEADER_BYTES, frameEnd);
    // Advance before dispatching so a throwing listener cannot replay this frame.
    this.stdoutBuffer = this.stdoutBuffer.subarray(frameEnd);
    try {
      this._dispatchStdoutFrame(format, payload);
    } catch (err) {
      // A listener threw. Keep draining the remaining frames instead of
      // leaving them stranded until the next chunk arrives.
      this.log("voice_error", "clankvox_ipc_dispatch_failed", {
        format,
        error: String((err as Error)?.message || err)
      });
    }
  }
}

/** Decodes one complete stdout frame and routes it to the matching handler. */
private _dispatchStdoutFrame(format: number, payload: Buffer) {
  if (format === 0) {
    let msg: unknown;
    try {
      msg = JSON.parse(payload.toString("utf8"));
    } catch {
      return; // ignore non-json payload
    }
    this._handleMessage(msg);
    return;
  }
  if (format === 1) {
    // Binary audio frame: [8-byte user_id][2-byte peak][4-byte active][4-byte total][PCM...]
    if (payload.length < 18) return;
    const audioUserId = payload.readBigUInt64LE(0).toString();
    const signalPeakAbs = payload.readUInt16LE(8);
    const signalActiveSampleCount = payload.readUInt32LE(10);
    const signalSampleCount = payload.readUInt32LE(14);
    const pcmBuffer = payload.subarray(18);
    this.emit(
      "userAudio",
      audioUserId,
      pcmBuffer,
      signalPeakAbs,
      signalActiveSampleCount,
      signalSampleCount
    );
  }
}
/** Per-callback counters; each runtime log includes the running count. */
private adapterCallbackCount = {
  voiceState: 0,
  voiceServer: 0,
  op4Forward: 0
};
/**
 * Registers with the guild's voice adapter factory (if any) and relays the
 * voice server / voice state updates it delivers to clankvox as
 * `voice_server` / `voice_state` commands. Voice state updates also record
 * the trimmed session and user ids. Installs `adapterCleanup`, which
 * destroys the adapter and ignores errors from doing so.
 */
private _setupAdapterProxy() {
  const { guild } = this;
  if (!guild?.voiceAdapterCreator) return;

  const relayVoiceServer = (data: VoiceServerUpdatePayload) => {
    const count = ++this.adapterCallbackCount.voiceServer;
    this.log("voice_runtime", "clankvox_voice_server_update", {
      endpoint: data?.endpoint ?? null,
      hasToken: !!data?.token,
      count
    });
    this._send({ type: "voice_server", data });
  };

  const relayVoiceState = (data: VoiceStateUpdatePayload) => {
    const count = ++this.adapterCallbackCount.voiceState;
    this.lastVoiceSessionId = asString(data?.session_id)?.trim() || null;
    this.lastVoiceStateUserId = asString(data?.user_id)?.trim() || null;
    this.log("voice_runtime", "clankvox_voice_state_update", {
      sessionId: data?.session_id ?? null,
      channelId: data?.channel_id ?? null,
      userId: data?.user_id ?? null,
      count
    });
    this._send({ type: "voice_state", data });
  };

  const adapter = guild.voiceAdapterCreator({
    onVoiceServerUpdate: relayVoiceServer,
    onVoiceStateUpdate: relayVoiceState
  });

  this.adapterCleanup = () => {
    try {
      adapter?.destroy?.();
    } catch {
      // best-effort teardown
    }
  };
}
/** Runs the adapter teardown installed by the proxy setup (at most once) and forgets it. */
private _cleanupAdapter() {
  const teardown = this.adapterCleanup;
  if (!teardown) return;
  teardown();
  this.adapterCleanup = null;
}
/**
 * Routes one parsed JSON IPC message from clankvox. Non-objects and messages
 * without a string `type` are ignored; unknown types are logged. Most cases
 * validate their fields and re-emit a camelCase event (e.g. `speaking_start`
 * becomes "speakingStart"). TTS-related messages also refresh the local
 * playback telemetry and kick the ingress drain.
 */
private _handleMessage(msg: unknown) {
  if (!isRecord(msg)) return;

  const msgType = asString(msg.type);
  if (!msgType) return;

  switch (msgType) {
    case "ready":
      this.emit("ready");
      break;
    case "adapter_send":
      // clankvox asks us to send a gateway payload on its behalf.
      if (isRecord(msg.payload)) {
        this._forwardToGateway(msg.payload);
      }
      break;
    case "connection_state": {
      const status = asString(msg.status);
      if (status) {
        this.emit("connectionState", status);
      }
      break;
    }
    case "transport_state": {
      const role = asTransportRole(msg.role);
      const status = asString(msg.status);
      if (role && status) {
        this.emit("transportState", {
          role,
          status,
          reason: asString(msg.reason)
        } satisfies ClankvoxTransportState);
      }
      break;
    }
    case "player_state": {
      const status = asString(msg.status);
      if (status) {
        this.emit("playerState", status);
      }
      break;
    }
    case "playback_armed": {
      const reason = asString(msg.reason);
      this.lastPlaybackArmedReason = reason?.trim() || null;
      this.emit("playbackArmed", reason ?? undefined);
      break;
    }
    case "tts_playback_state": {
      const status = asString(msg.status)?.trim().toLowerCase() === "buffered" ? "buffered" : "idle";
      this.lastTtsTelemetryAt = Date.now();
      this.lastTtsPlaybackState = status;
      if (status === "idle") {
        // Trust the Rust-reported idle state: zero out the Rust-side depth
        // and decay estimate so the output lock converges immediately.
        // Do NOT zero queuedTtsOutputSamples — that tracks PCM still queued
        // locally in the TS ingress pipeline (e.g. a follow-up utterance).
        // Zeroing it would make the system think it's fully idle when the
        // next utterance's PCM is already waiting to be sent to Rust.
        this.ttsBufferDepthSamples = 0;
        this._setEstimatedBufferedTtsSamples(0, Date.now());
      }
      this._scheduleTtsDrain(0);
      this.emit("ttsPlaybackState", status);
      break;
    }
    case "speaking_start": {
      const userId = asString(msg.userId);
      if (userId) {
        this.emit("speakingStart", userId);
      }
      break;
    }
    case "speaking_end": {
      const userId = asString(msg.userId);
      if (userId) {
        this.emit("speakingEnd", userId);
      }
      break;
    }
    // "user_audio" (JSON) is normally superseded by binary format-1 frames,
    // but is kept here for fallback or tests. Note it emits base64, not a Buffer.
    case "user_audio": {
      const userId = asString(msg.userId);
      const pcmBase64 = asString(msg.pcmBase64);
      const signalPeakAbs = asNumber(msg.signalPeakAbs);
      const signalActiveSampleCount = asNumber(msg.signalActiveSampleCount);
      const signalSampleCount = asNumber(msg.signalSampleCount);
      if (
        userId &&
        pcmBase64 &&
        signalPeakAbs !== null &&
        signalActiveSampleCount !== null &&
        signalSampleCount !== null
      ) {
        this.emit(
          "userAudio",
          userId,
          pcmBase64,
          signalPeakAbs,
          signalActiveSampleCount,
          signalSampleCount
        );
      }
      break;
    }
    case "user_audio_end": {
      const userId = asString(msg.userId);
      if (userId) {
        this.emit("userAudioEnd", userId);
      }
      break;
    }
    case "user_video_state": {
      const state = parseUserVideoState(msg);
      if (state) {
        this.emit("userVideoState", state);
      }
      break;
    }
    case "user_video_frame": {
      const frame = parseUserVideoFrame(msg);
      if (frame) {
        this.emit("userVideoFrame", frame);
      }
      break;
    }
    case "decoded_video_frame": {
      const frame = parseDecodedVideoFrame(msg);
      if (frame) {
        this.emit("decodedVideoFrame", frame);
      }
      break;
    }
    case "user_video_end": {
      const userId = asString(msg.userId);
      if (userId) {
        this.emit("userVideoEnd", {
          userId,
          ssrc: asNumber(msg.ssrc)
        } satisfies ClankvoxUserVideoEnd);
      }
      break;
    }
    case "client_disconnect": {
      const userId = asString(msg.userId);
      if (userId) {
        this.emit("clientDisconnect", userId);
      }
      break;
    }
    case "music_idle":
      this.emit("musicIdle");
      break;
    case "music_error": {
      const message = asString(msg.message);
      if (message !== null) {
        this.emit("musicError", message);
      }
      break;
    }
    case "music_gain_reached": {
      const gain = asNumber(msg.gain);
      if (gain !== null) {
        this.emit("musicGainReached", gain);
      }
      break;
    }
    case "buffer_depth": {
      const ttsSamples = asNumber(msg.ttsSamples) ?? 0;
      const musicSamples = asNumber(msg.musicSamples) ?? 0;
      const now = Date.now();
      this.lastTtsTelemetryAt = now;
      this.ttsBufferDepthSamples = Math.max(0, ttsSamples);
      this.lastTtsPlaybackState =
        this.ttsBufferDepthSamples > 0 ? "buffered" : "idle";
      this._setEstimatedBufferedTtsSamples(this.ttsBufferDepthSamples, now);
      // Note: we do NOT zero queuedTtsOutputSamples here — that counter
      // tracks PCM still queued locally in the TS ingress pipeline (e.g. a
      // follow-up utterance). Zeroing it when utterance A's drain arrives
      // would discard the count for utterance B's unsent PCM, causing a
      // false idle transition between back-to-back utterances.
      this._scheduleTtsDrain(0);
      this.emit("bufferDepth", ttsSamples, musicSamples);
      break;
    }
    case "error": {
      const message = asString(msg.message);
      const code = asClankvoxIpcErrorCode(msg.code);
      if (message !== null) {
        const ipcError: ClankvoxIpcError = { message, code };
        this.emit("ipcError", ipcError);
        // EventEmitter throws ERR_UNHANDLED_ERROR for an "error" event with no
        // listener (e.g. before spawn() has resolved). Don't turn an IPC
        // error report into an exception inside the stdout dispatcher.
        if (this.listenerCount("error") > 0) {
          this.emit("error", message, code ?? undefined);
        }
      }
      break;
    }
    case "log": {
      const level = String(msg.level || "info").trim();
      const target = String(msg.target || "").trim();
      const message = String(msg.message || "").trim();
      // Only spread genuine key/value objects; an array would leak index keys.
      const rawFields = msg.fields;
      const fields: JsonRecord =
        isRecord(rawFields) && !Array.isArray(rawFields) ? rawFields : {};
      // Map Rust tracing levels to our log levels
      const kind = level === "error" ? "voice_error"
        : level === "warn" ? "voice_error"
        : "voice_runtime";
      // Use the message as the content key (clankvox events use descriptive names like clankvox_voice_ready)
      const content = message || `clankvox_${target.split("::").pop() || "trace"}`;
      this.log(kind, content, {
        clankvoxLevel: level,
        clankvoxTarget: target,
        ...fields
      });
      break;
    }
    default:
      this.log("voice_runtime", "clankvox_unknown_ipc_message", { messageType: msgType });
      break;
  }
}
/** Trimmed reason from the most recent `playback_armed` message, or null. */
getPlaybackArmedReason(): string | null {
  return this.lastPlaybackArmedReason;
}

/**
 * Forgets all locally tracked TTS playback state: reported depth, decay
 * estimate, batched and queued PCM (and their timers). The state becomes
 * "idle" and the telemetry timestamp is refreshed.
 */
clearTtsPlaybackTelemetry(): void {
  this.ttsBufferDepthSamples = 0;
  this.lastTtsPlaybackState = "idle";
  this.lastTtsTelemetryAt = Date.now();
  this._clearQueuedTtsIngress();
  this._setEstimatedBufferedTtsSamples(0, Date.now());
}

/** "buffered" whenever any TTS audio is pending anywhere; otherwise the last recorded state. */
getTtsPlaybackState(): TtsPlaybackState {
  if (this.getTtsBufferDepthSamples() > 0) {
    return "buffered";
  }
  return this.lastTtsPlaybackState;
}

/**
 * Total pending TTS audio in 48kHz samples: the decaying estimate of what
 * clankvox still holds, plus PCM queued locally, plus the unflushed batch.
 */
getTtsBufferDepthSamples(): number {
  const total =
    this._getEstimatedBufferedTtsSamples() +
    this.queuedTtsOutputSamples +
    this.getBatchedTtsOutputSamples();
  return Math.max(0, Math.round(total));
}

/** Timestamp (ms) of the latest TTS telemetry; reports "now" while audio is still pending. */
getTtsTelemetryUpdatedAt(): number {
  const stillPending = this.getTtsBufferDepthSamples() > 0;
  return stillPending
    ? Math.max(0, Date.now())
    : Math.max(0, Number(this.lastTtsTelemetryAt || 0));
}

/** Same total as getTtsBufferDepthSamples, expressed in seconds at 48kHz. */
getTtsBufferDepthSeconds(): number {
  const samplesPerSecond = 48_000;
  return this.getTtsBufferDepthSamples() / samplesPerSecond;
}
/**
 * Sends a raw gateway payload (clankvox's op 4 voice-state requests) through
 * the guild's shard. Every attempt is counted and logged. Failures, including
 * a guild without a usable shard, are logged and never thrown.
 */
private _forwardToGateway(payload: JsonRecord) {
  if (!payload || !this.guild) return;
  this.adapterCallbackCount.op4Forward++;
  const payloadData = isRecord(payload.d) ? payload.d : null;
  this.log("voice_runtime", "clankvox_gateway_op4_forwarded", {
    guildId: payloadData?.guild_id ?? null,
    channelId: payloadData?.channel_id ?? null,
    count: this.adapterCallbackCount.op4Forward
  });
  try {
    const shard = this.guild.shard;
    if (!shard || typeof shard.send !== "function") {
      // Without a shard the payload is lost and the voice join can never
      // complete. Surface that instead of only logging "forwarded".
      this.log("voice_error", "clankvox_gateway_op4_forward_failed", {
        error: "guild_shard_unavailable"
      });
      return;
    }
    shard.send(payload);
  } catch (err) {
    this.log("voice_error", "clankvox_gateway_op4_forward_failed", {
      error: String((err as Error)?.message || err)
    });
    console.error(
      "[clankvox] failed to forward OP4 to gateway:",
      err
    );
  }
}
/**
 * Sends a gateway op 4 voice state update for this guild targeting
 * `channelId`, always unmuted and undeafened. (Discord treats a null
 * channel_id as leaving voice. That is the gateway's contract, not enforced here.)
 */
private _sendGatewayVoiceStateUpdate(channelId: string | null) {
  const d = {
    guild_id: this.guildId,
    channel_id: channelId,
    self_mute: false,
    self_deaf: false
  };
  this._forwardToGateway({ op: 4, d });
}
/**
 * Writes one command to clankvox's stdin as a single newline-terminated JSON
 * record. No-op once destroyed, or when the child is missing, killed, or has
 * already exited. Write errors are ignored.
 */
private _send(msg: ClankvoxCommand) {
  const child = this.child;
  if (!child || this.destroyed || child.killed || child.exitCode !== null) return;
  try {
    // JSON.stringify never emits a raw newline, so "\n" is an unambiguous
    // record terminator (and still plain whitespace to a streaming parser).
    child.stdin.write(JSON.stringify(msg) + "\n");
    child.stdin.flush();
  } catch {
    // EPIPE expected during shutdown — silently ignore
  }
}
// --- Public API ---
/** PCM chunks collected by sendAudio since the last batch flush. */
private audioBatchPcm: Buffer[] = [];
/** Timer for the pending batch flush, or null when none is armed. */
private audioBatchTimer: ReturnType<typeof setTimeout> | null = null;
/** Sample rate (Hz) shared by all PCM in the current batch. */
private currentSampleRate: number = 24000;
/** 48kHz output-sample equivalent of PCM still sitting in the unflushed batch. */
private getBatchedTtsOutputSamples(): number {
  let total = 0;
  for (const chunk of this.audioBatchPcm) {
    total += estimateOutputSamplesFromPcmBytes(chunk.length, this.currentSampleRate);
  }
  return total;
}
/**
 * Projects the last recorded clankvox-buffer estimate forward to `now`,
 * assuming playback consumes 48 samples per millisecond (48kHz). Never negative.
 */
private _getEstimatedBufferedTtsSamples(now = Date.now()): number {
  const snapshot = Math.max(0, Number(this.estimatedBufferedTtsSamples || 0));
  if (snapshot <= 0) {
    return 0;
  }
  const recordedAt = Math.max(0, Number(this.estimatedBufferedTtsSamplesAt || 0));
  const elapsedMs = Math.max(0, now - recordedAt);
  const drained = elapsedMs * 48;
  return Math.max(0, Math.round(snapshot - drained));
}
/**
 * Records a new estimate of clankvox-buffered TTS samples taken at `now`, then
 * derives the playback state. Any pending audio (estimate, local queue or
 * batch) means "buffered" and refreshes the telemetry timestamp. Otherwise
 * the state becomes "idle" only if the last reported depth is also zero.
 */
private _setEstimatedBufferedTtsSamples(samples: number, now = Date.now()) {
  this.estimatedBufferedTtsSamples = Math.max(0, Math.round(Number(samples) || 0));
  this.estimatedBufferedTtsSamplesAt = now;

  const hasPendingAudio =
    this.estimatedBufferedTtsSamples > 0 ||
    this.queuedTtsOutputSamples > 0 ||
    this.getBatchedTtsOutputSamples() > 0;

  if (hasPendingAudio) {
    this.lastTtsTelemetryAt = now;
    this.lastTtsPlaybackState = "buffered";
    return;
  }
  if (this.ttsBufferDepthSamples <= 0) {
    this.lastTtsPlaybackState = "idle";
  }
}
/**
 * Drops all locally pending TTS audio (batched and queued) and cancels the
 * batch and drain timers. Does not touch clankvox-side state.
 */
private _clearQueuedTtsIngress() {
  const { audioBatchTimer, ttsDrainTimer } = this;
  if (audioBatchTimer) clearTimeout(audioBatchTimer);
  if (ttsDrainTimer) clearTimeout(ttsDrainTimer);
  this.audioBatchTimer = null;
  this.ttsDrainTimer = null;
  this.audioBatchPcm = [];
  this.queuedTtsIngress = [];
  this.queuedTtsOutputSamples = 0;
}
/**
 * Arms the TTS drain timer. An already-armed timer is kept unless this request
 * is immediate (delay 0), in which case it is replaced.
 */
private _scheduleTtsDrain(delayMs = TTS_INGRESS_RECHECK_MS) {
  const delay = Math.max(0, Math.floor(Number(delayMs) || 0));
  const armed = this.ttsDrainTimer;
  if (armed) {
    if (delay > 0) return;
    clearTimeout(armed);
    this.ttsDrainTimer = null;
  }
  this.ttsDrainTimer = setTimeout(() => {
    this.ttsDrainTimer = null;
    this._drainQueuedTtsIngress();
  }, delay);
}
/**
 * Queues PCM for paced delivery to clankvox. A trailing odd byte is trimmed,
 * and chunks worth no output samples are ignored. Otherwise the queued-sample
 * counter grows and the state is marked "buffered".
 */
private _enqueueTtsIngressChunk(pcm: Buffer, sampleRate: number) {
  const rate = normalizeSampleRate(sampleRate);
  const usableBytes = clampEvenPcmByteLength(pcm.length);
  if (usableBytes <= 0) return;

  const trimmed = usableBytes < pcm.length ? pcm.subarray(0, usableBytes) : pcm;
  const outputSamples = estimateOutputSamplesFromPcmBytes(trimmed.length, rate);
  if (outputSamples <= 0) return;

  const entry: PendingTtsIngressChunk = {
    pcm: trimmed,
    sampleRate: rate,
    offsetBytes: 0,
    remainingOutputSamples: outputSamples
  };
  this.queuedTtsIngress.push(entry);
  this.queuedTtsOutputSamples += outputSamples;
  this.lastTtsTelemetryAt = Date.now();
  this.lastTtsPlaybackState = "buffered";
}
/**
 * Forwards locally queued TTS PCM to clankvox as `audio` commands, pacing it
 * so the estimated clankvox-side buffer stays below
 * TTS_INGRESS_TARGET_SAMPLES. Each send is capped both by
 * TTS_INGRESS_CHUNK_MS worth of input and by the remaining headroom.
 * If `isAlive` (defined elsewhere) is false, the whole local queue is dropped.
 * While data remains queued, another drain is scheduled after
 * TTS_INGRESS_RECHECK_MS; once everything is sent and nothing is estimated
 * or reported as buffered, the state becomes "idle".
 */
private _drainQueuedTtsIngress() { if (!this.isAlive) { this._clearQueuedTtsIngress(); return; }
// Working copy of the decayed estimate; bumped locally after every send.
let estimatedBufferedSamples = this._getEstimatedBufferedTtsSamples();
const targetSamples = TTS_INGRESS_TARGET_SAMPLES;
while (this.queuedTtsIngress.length > 0 && estimatedBufferedSamples < targetSamples) {
// NOTE(review): with noUncheckedIndexedAccess `head` is typed possibly-undefined; the length check above guarantees it here.
const head = this.queuedTtsIngress[0];
const remainingBytes = clampEvenPcmByteLength(head.pcm.length - head.offsetBytes);
// Fully sent (or fully accounted) entries are discarded before sizing the next slice.
if (remainingBytes <= 0 || head.remainingOutputSamples <= 0) {
this.queuedTtsIngress.shift();
continue;
}
const headroomSamples = Math.max(0, targetSamples - estimatedBufferedSamples);
const byteBudgetByDuration = estimatePcmBytesForDurationMs(TTS_INGRESS_CHUNK_MS, head.sampleRate);
const byteBudgetByHeadroom = estimatePcmBytesForOutputSamples(headroomSamples, head.sampleRate);
// Slice = smallest of what's left, the per-send duration cap and the headroom cap
// (a zero budget means "no cap" and falls back to remainingBytes).
const chunkByteLength = clampEvenPcmByteLength(
Math.min(
remainingBytes,
byteBudgetByDuration > 0 ? byteBudgetByDuration : remainingBytes,
byteBudgetByHeadroom > 0 ? byteBudgetByHeadroom : remainingBytes
)
);
if (chunkByteLength <= 0) break;
const chunk = head.pcm.subarray(head.offsetBytes, head.offsetBytes + chunkByteLength);
const outputSamples = estimateOutputSamplesFromPcmBytes(chunk.length, head.sampleRate);
// NOTE(review): above 96kHz input a 2-byte slice rounds to 0 output samples, so this break
// leaves the head unchanged; if such a tail remains with remainingOutputSamples > 0 it appears
// to be retried every TTS_INGRESS_RECHECK_MS indefinitely. Confirm whether such rates occur.
if (outputSamples <= 0) break;
this._send({
type: "audio",
pcmBase64: chunk.toString("base64"),
sampleRate: head.sampleRate
});
head.offsetBytes += chunkByteLength;
head.remainingOutputSamples = Math.max(0, head.remainingOutputSamples - outputSamples);
this.queuedTtsOutputSamples = Math.max(0, this.queuedTtsOutputSamples - outputSamples);
// NOTE(review): because per-slice rounding can exhaust remainingOutputSamples early,
// any bytes left in `head.pcm` at that point are dropped by this shift.
if (head.offsetBytes >= head.pcm.length || head.remainingOutputSamples <= 0) {
this.queuedTtsIngress.shift();
}
estimatedBufferedSamples += outputSamples;
// Persist the bumped estimate so the decay clock restarts from this send.
this._setEstimatedBufferedTtsSamples(estimatedBufferedSamples, Date.now());
}
if (this.queuedTtsIngress.length > 0) {
this._scheduleTtsDrain(TTS_INGRESS_RECHECK_MS);
} else if (this._getEstimatedBufferedTtsSamples() <= 0 && this.ttsBufferDepthSamples <= 0) {
this.lastTtsPlaybackState = "idle";
}
}
/**
 * Batch-timer callback (also called directly on a sample-rate change): merges
 * the batched chunks into one buffer, queues it at the batch's sample rate
 * and immediately tries to drain the queue to clankvox.
 */
private _flushAudioBatch() {
  this.audioBatchTimer = null;
  const pending = this.audioBatchPcm;
  if (pending.length === 0) return;
  // A single concat per flush; the short batching window keeps this small.
  const merged = Buffer.concat(pending);
  this.audioBatchPcm = [];
  this._enqueueTtsIngressChunk(merged, this.currentSampleRate);
  this._drainQueuedTtsIngress();
}
/**
 * Accepts base64-encoded PCM (2-byte samples) for TTS playback. Chunks are
 * collected for ~5ms and then queued for paced delivery to clankvox. A change
 * of sample rate flushes the current batch first, so every batch holds a
 * single rate. Empty or undecodable input is ignored and leaves the playback
 * state untouched.
 */
sendAudio(pcmBase64: string, sampleRate: number = 24000) {
  let buf: Buffer;
  try {
    buf = Buffer.from(pcmBase64, "base64");
  } catch {
    return;
  }
  // Nothing to play. Previously this still flagged TTS as "buffered", a state
  // only later clankvox telemetry would clear, and armed a pointless flush.
  if (buf.length === 0) return;

  const normalizedSampleRate = normalizeSampleRate(sampleRate);
  if (this.audioBatchPcm.length > 0 && normalizedSampleRate !== this.currentSampleRate) {
    // Flushing early: cancel the armed batch timer so it can't fire a stray
    // second flush against the next batch.
    if (this.audioBatchTimer) clearTimeout(this.audioBatchTimer);
    this._flushAudioBatch();
  }
  this.currentSampleRate = normalizedSampleRate;
  this.audioBatchPcm.push(buf);

  this.lastTtsTelemetryAt = Date.now();
  this.lastTtsPlaybackState = "buffered";
  if (!this.audioBatchTimer) {
    // Very short window keeps latency low while coalescing bursts of small chunks.
    this.audioBatchTimer = setTimeout(() => this._flushAudioBatch(), 5);
  }
}
/** Clears local TTS playback telemetry, then sends `stop_playback` to clankvox. */
stopPlayback() {
  this.clearTtsPlaybackTelemetry();
  this._send({ type: "stop_playback" });
}
/** Clears local TTS playback telemetry, then sends `stop_tts_playback` to clankvox. */
stopTtsPlayback() {
  this.clearTtsPlaybackTelemetry();
  this._send({ type: "stop_tts_playback" });
}
/**
 * Sends `subscribe_user` for `userId`. Values are forwarded as given.
 *
 * @param userId User to subscribe to.
 * @param silenceDurationMs Silence duration in ms (default 700).
 * @param sampleRate Requested sample rate (default 24000).
 */
subscribeUser(
  userId: string,
  silenceDurationMs: number = 700,
  sampleRate: number = 24000
) {
  this._send({
    type: "subscribe_user",
    userId,
    silenceDurationMs,
    sampleRate
  });
}
/** Sends `unsubscribe_user` for `userId`. */
unsubscribeUser(userId: string) {
  this._send({
    type: "unsubscribe_user",
    userId
  });
}
/**
 * Sends `subscribe_user_video` for `userId` with sanitized options:
 * - `maxFramesPerSecond`: floored, at least 1; non-numeric or 0 falls back to 2.
 * - `preferredQuality`: floored and clamped to [0, 100]; NaN/null falls back to 100.
 *   An explicit 0 is honoured.
 * - `preferredPixelCount`: null/undefined is sent as null; otherwise floored, at least 1.
 * - `preferredStreamType`: stringified and trimmed; empty becomes null (default "screen").
 * - `jpegQuality`: null/undefined is sent as null; otherwise floored and clamped
 *   to [10, 100], with non-numeric or 0 falling back to 60.
 */
subscribeUserVideo({
  userId,
  maxFramesPerSecond = 2,
  preferredQuality = 100,
  preferredPixelCount = 640 * 360,
  preferredStreamType = "screen",
  jpegQuality
}: {
  userId: string;
  maxFramesPerSecond?: number;
  preferredQuality?: number;
  preferredPixelCount?: number | null;
  preferredStreamType?: string | null;
  jpegQuality?: number | null;
}) {
  // `Number(q) || 100` would map an explicit 0 to 100 even though 0 is inside
  // the clamp range, so only a missing/NaN value falls back to the default.
  const requestedQuality = preferredQuality == null ? Number.NaN : Number(preferredQuality);
  const quality = Number.isNaN(requestedQuality) ? 100 : requestedQuality;
  this._send({
    type: "subscribe_user_video",
    userId,
    maxFramesPerSecond: Math.max(1, Math.floor(Number(maxFramesPerSecond) || 2)),
    preferredQuality: Math.max(0, Math.min(100, Math.floor(quality))),
    preferredPixelCount:
      preferredPixelCount === null || preferredPixelCount === undefined
        ? null
        : Math.max(1, Math.floor(Number(preferredPixelCount) || 0)),
    preferredStreamType: String(preferredStreamType || "").trim() || null,
    jpegQuality:
      jpegQuality === null || jpegQuality === undefined
        ? null
        : Math.max(10, Math.min(100, Math.floor(Number(jpegQuality) || 60)))
  });
}
/** Sends `unsubscribe_user_video` for `userId`. */
unsubscribeUserVideo(userId: string) {
  this._send({
    type: "unsubscribe_user_video",
    userId
  });
}
/** Returns the stored `lastVoiceSessionId`. */
getLastVoiceSessionId() {
  const sessionId = this.lastVoiceSessionId;
  return sessionId;
}
/** Returns the stored `lastVoiceStateUserId`. */
getLastVoiceStateUserId() {
  const userId = this.lastVoiceStateUserId;
  return userId;
}
/**
 * Sends `stream_watch_connect` with the given connection details.
 * Each field is coerced to a string and trimmed (falsy values become "").
 */
streamWatchConnect({
  endpoint,
  token,
  serverId,
  sessionId,
  userId,
  daveChannelId
}: {
  endpoint: string;
  token: string;
  serverId: string;
  sessionId: string;
  userId: string;
  daveChannelId: string;
}) {
  const clean = (value: string) => String(value || "").trim();
  this._send({
    type: "stream_watch_connect",
    endpoint: clean(endpoint),
    token: clean(token),
    serverId: clean(serverId),
    sessionId: clean(sessionId),
    userId: clean(userId),
    daveChannelId: clean(daveChannelId)
  });
}
/** Sends `stream_watch_disconnect`; a blank/whitespace-only reason is sent as null. */
streamWatchDisconnect(reason: string | null = null) {
  this._send({
    type: "stream_watch_disconnect",
    reason: String(reason || "").trim() || null
  });
}
/**
 * Sends `stream_publish_connect` with the given connection details.
 * Each field is coerced to a string and trimmed (falsy values become "").
 */
streamPublishConnect({
  endpoint,
  token,
  serverId,
  sessionId,
  userId,
  daveChannelId
}: {
  endpoint: string;
  token: string;
  serverId: string;
  sessionId: string;
  userId: string;
  daveChannelId: string;
}) {
  const clean = (value: string) => String(value || "").trim();
  this._send({
    type: "stream_publish_connect",
    endpoint: clean(endpoint),
    token: clean(token),
    serverId: clean(serverId),
    sessionId: clean(sessionId),
    userId: clean(userId),
    daveChannelId: clean(daveChannelId)
  });
}
/** Sends `stream_publish_disconnect`; a blank/whitespace-only reason is sent as null. */
streamPublishDisconnect(reason: string | null = null) {
  this._send({
    type: "stream_publish_disconnect",
    reason: String(reason || "").trim() || null
  });
}
/**
 * Sends `music_play` for `url`.
 *
 * @param url Source URL, forwarded unchanged.
 * @param resolvedDirectUrl Flag forwarded as-is (default false).
 * @param visualizerMode Optional visualizer mode; a falsy value is sent as `undefined`.
 */
musicPlay(
  url: string,
  resolvedDirectUrl = false,
  visualizerMode: StreamWatchVisualizerMode | null = null
) {
  this._send({
    type: "music_play",
    url,
    resolvedDirectUrl,
    visualizerMode: visualizerMode || undefined
  });
}
/** Sends `music_stop`. */
musicStop() {
  this._send({ type: "music_stop" });
}
/** Sends `music_pause`. */
musicPause() {
  this._send({ type: "music_pause" });
}
/** Sends `music_resume`. */
musicResume() {
  this._send({ type: "music_resume" });
}
/**
 * Sends `music_set_gain`. Both values are forwarded unchanged.
 *
 * @param target Target gain value.
 * @param fadeMs Fade duration in milliseconds.
 */
musicSetGain(target: number, fadeMs: number) {
  this._send({
    type: "music_set_gain",
    target,
    fadeMs
  });
}
/** Sends `stream_publish_play` with a trimmed `url`. */
streamPublishPlay(url: string, resolvedDirectUrl: boolean) {
  const trimmedUrl = String(url || "").trim();
  this._send({
    type: "stream_publish_play",
    url: trimmedUrl,
    resolvedDirectUrl
  });
}
/** Sends `stream_publish_play_visualizer` with a trimmed `url` and a non-"off" visualizer mode. */
streamPublishPlayVisualizer(
  url: string,
  resolvedDirectUrl: boolean,
  visualizerMode: Exclude<StreamWatchVisualizerMode, "off">
) {
  const trimmedUrl = String(url || "").trim();
  this._send({
    type: "stream_publish_play_visualizer",
    url: trimmedUrl,
    resolvedDirectUrl,
    visualizerMode
  });
}
/** Sends `stream_publish_browser_start`; a blank MIME type falls back to "image/png". */
streamPublishBrowserStart(mimeType = "image/png") {
  const trimmedMime = String(mimeType || "").trim();
  this._send({
    type: "stream_publish_browser_start",
    mimeType: trimmedMime || "image/png"
  });
}
/**
 * Sends one `stream_publish_browser_frame`.
 * The MIME type is trimmed (blank falls back to "image/png"), the frame
 * payload is trimmed, and `capturedAtMs` is rounded and floored at 0
 * (non-numeric becomes 0).
 */
streamPublishBrowserFrame({
  mimeType = "image/png",
  frameBase64,
  capturedAtMs
}: {
  mimeType?: string;
  frameBase64: string;
  capturedAtMs?: number;
}) {
  const trimmedMime = String(mimeType || "").trim();
  const capturedAt = Math.round(Number(capturedAtMs) || 0);
  this._send({
    type: "stream_publish_browser_frame",
    mimeType: trimmedMime || "image/png",
    frameBase64: String(frameBase64 || "").trim(),
    capturedAtMs: Math.max(0, capturedAt)
  });
}
/** Sends `stream_publish_stop`. */
streamPublishStop() {
  this._send({ type: "stream_publish_stop" });
}
/** Sends `stream_publish_pause`. */
streamPublishPause() {
  this._send({ type: "stream_publish_pause" });
}
/** Sends `stream_publish_resume`. */
streamPublishResume() {
  this._send({ type: "stream_publish_resume" });
}
/**
 * Tears the client down and waits for the clankvox child to exit.
 *
 * Steps:
 * - Clear queued TTS ingress and the stdout buffer.
 * - Clean up the gateway adapter and abort the stdout reader.
 * - Send a gateway voice-state update with a null channel.
 * - Ask clankvox to exit (`destroy` message + stdin EOF).
 * - Escalate to SIGTERM after 250 ms and SIGKILL after 5 s.
 *
 * Repeat calls return the same promise once a child-teardown has started.
 *
 * @returns Resolves once the child has exited (or immediately if there is no child).
 */
async destroy(): Promise<void> {
  if (this.destroyPromise) return this.destroyPromise;
  this.destroyed = true;
  this._clearQueuedTtsIngress();
  this.stdoutBuffer = Buffer.alloc(0);
  this._cleanupAdapter();
  // Abort the stdout reader loop
  this.stdoutReaderController?.abort();
  const child = this.child;
  if (!child) {
    ClankvoxClient.liveClients.delete(this);
    return;
  }
  // Explicitly leave the voice channel through the main gateway before
  // clankvox exits. Killing clankvox alone does not send OP4 with
  // channel_id=null, so Discord can keep the bot shown in VC until the
  // session times out.
  this._sendGatewayVoiceStateUpdate(null);
  this.destroyPromise = new Promise<void>((resolve) => {
    let finished = false;
    const finish = () => {
      if (finished) return;
      finished = true;
      clearTimeout(termTimer);
      clearTimeout(killTimer);
      ClankvoxClient.liveClients.delete(this);
      resolve();
    };
    // Prefer the exit waiter (resolved via _handleExit -> _resolveExitWaiter)
    // so exit handling runs first. If no waiter is registered, fall back to
    // the subprocess's own `exited` promise; otherwise this promise would
    // never settle. A rejection also finishes teardown instead of hanging.
    // `.then` callbacks always run asynchronously, so the timers below are
    // initialized before `finish` can read them.
    const exitSignal: Promise<unknown> = this._exitWaiterPromise ?? child.exited;
    void exitSignal.then(finish, finish);
    this._send({ type: "destroy" });
    try {
      child.stdin.end();
    } catch {
      // stdin may already be closed; nothing to do.
    }
    const termTimer = setTimeout(() => {
      this.killChild("SIGTERM");
    }, 250);
    const killTimer = setTimeout(() => {
      this.killChild("SIGKILL");
    }, 5_000);
  });
  return this.destroyPromise;
}
/**
 * True only when all of the following hold:
 * - `destroy()` has not been called and a child exists;
 * - the child has no exit code and no signal code;
 * - the child is not marked killed.
 */
get isAlive(): boolean {
  const proc = this.child;
  if (this.destroyed || proc === null) return false;
  const exited = proc.exitCode !== null || proc.signalCode !== null;
  return !exited && !proc.killed;
}
/**
 * Best-effort delivery of `signal` to the clankvox child.
 * Does nothing if there is no child or it already exited, was signalled, or was killed.
 *
 * @param signal Signal to send (e.g. "SIGTERM", "SIGKILL").
 */
private killChild(signal: NodeJS.Signals): void {
  const child = this.child;
  if (!child) return;
  if (child.exitCode !== null || child.signalCode !== null || child.killed) return;
  try {
    child.kill(signal);
  } catch {
    // The process can exit between the checks above and kill(); losing that
    // race is harmless, so the error is intentionally ignored.
  }
}
/**
 * Registers, once per process, an `exit` listener.
 * The listener SIGKILLs every client in `liveClients` and then empties the set.
 * NOTE(review): `exit` does not fire when the process is terminated by an
 * unhandled signal, so children may still be orphaned in that case.
 */
private static installProcessExitHandlers(): void {
  if (ClankvoxClient.processExitHandlersInstalled) return;
  ClankvoxClient.processExitHandlersInstalled = true;
  process.once("exit", () => {
    ClankvoxClient.liveClients.forEach((client) => {
      client.killChild("SIGKILL");
    });
    ClankvoxClient.liveClients.clear();
  });
}
}
