// src/voice_conn.rs

use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, Ordering}; use std::time::Duration;

use anyhow::{Context, Result, bail}; use futures_util::{SinkExt, StreamExt}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use tokio::net::UdpSocket; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time; use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::tungstenite::Message; use tracing::{debug, error, info, trace, warn};

use crate::dave::DaveManager; use crate::h264::{ H264Depacketizer, collect_annexb_nal_types, h264_annexb_has_idr_slice, rewrite_h264_annexb_start_codes, split_h264_annexb_nalus, }; use crate::media_sink_wants::build_media_sink_wants_payload; use crate::rtcp::build_protected_rtcp_packet; use crate::rtp::{ MAX_VIDEO_RTP_CHUNK_BYTES, OPUS_PT, RTP_HEADER_LEN, VIDEO_RTP_EXTENSION_HEADER, VIDEO_RTP_EXTENSION_PAYLOAD, VideoCodecKind, build_rtp_header, build_video_rtp_header, parse_rtp_header, strip_rtp_extension_payload, strip_rtp_padding, }; use crate::transport_crypto::TransportCrypto; use crate::video::{VideoResolution, VideoStreamDescriptor}; use crate::video_state::{ RemoteVideoStatePayload, RemoteVideoStreamPayload, RemoteVideoTrackBinding, apply_remote_video_state, build_video_state_announcement, convert_video_stream_descriptor, update_current_video_codec, }; use crate::vp8::Vp8Depacketizer;

type WsStream = tokio_tungstenite::WebSocketStream<MaybeTlsStreamtokio::net::TcpStream>;

#[derive(Debug, Deserialize)] struct VoiceOpcode { op: u64, d: T, }

#[derive(Debug, Deserialize)] struct HelloPayload { heartbeat_interval: Option, }

#[derive(Debug, Deserialize, Clone)] struct ReadyPayload { ssrc: u32, ip: String, port: u16, modes: Vec, #[serde(default)] experiments: Vec, #[serde(default)] video_ssrc: Option, #[serde(default)] streams: Vec, }

/// OP4 Session Description: the transport secret key plus negotiated
/// codec/session metadata and the DAVE protocol version (0 = disabled).
#[derive(Debug, Deserialize, Clone)]
struct SessionDescriptionPayload {
    secret_key: Vec<u8>,
    #[serde(default)]
    dave_protocol_version: u16,
    #[serde(default)]
    video_codec: Option<String>,
    #[serde(default)]
    audio_codec: Option<String>,
    #[serde(default)]
    media_session_id: Option<String>,
}

/// Speaking notification payload mapping an audio SSRC to a stringified
/// user id.
#[derive(Debug, Deserialize)]
struct SpeakingPayload {
    ssrc: u32,
    user_id: String,
}

/// Gateway payload carrying only a stringified user id.
#[derive(Debug, Deserialize)]
struct UserIdPayload {
    user_id: String,
}

/// Transition payload (presumably a DAVE protocol transition — confirm
/// against the opcode handler) with its id and target protocol version.
#[derive(Debug, Deserialize)]
struct TransitionPayload {
    transition_id: u16,
    #[serde(default)]
    protocol_version: u16,
}

/// Epoch payload carrying a protocol version and epoch counter.
#[derive(Debug, Deserialize)]
struct EpochPayload {
    protocol_version: u16,
    epoch: u64,
}

/// Mid-session update of negotiated codecs / session id.
#[derive(Debug, Deserialize, Clone)]
struct SessionUpdatePayload {
    #[serde(default)]
    video_codec: Option<String>,
    #[serde(default)]
    audio_codec: Option<String>,
    #[serde(default)]
    media_session_id: Option<String>,
    // Numeric interval; exact width not derivable from this file —
    // TODO confirm expected JSON type against the gateway handler.
    #[serde(default)]
    keyframe_interval: Option<u64>,
}

/// Parse a voice-gateway text frame into a typed opcode envelope.
fn parse_voice_opcode<T>(text: &str) -> Result<VoiceOpcode<T>>
where
    T: for<'de> Deserialize<'de>,
{
    serde_json::from_str(text).context("invalid voice gateway payload")
}

pub(crate) fn parse_user_id(user_id: &str, context: &str) -> Option { match user_id.parse::() { Ok(user_id) => Some(user_id), Err(error) => { warn!(user_id, context, error = %error, "ignoring voice gateway payload with invalid user id"); None } } }

// --------------------------------------------------------------------------- // Events emitted by the voice connection back to the main loop // ---------------------------------------------------------------------------

pub enum VoiceEvent { Ready { role: TransportRole, ssrc: u32, }, SsrcUpdate { role: TransportRole, ssrc: u32, user_id: u64, }, VideoStateUpdate { role: TransportRole, user_id: u64, audio_ssrc: Option, video_ssrc: Option, codec: Option, streams: Vec, }, ClientDisconnect { role: TransportRole, user_id: u64, }, OpusReceived { role: TransportRole, ssrc: u32, opus_frame: Vec, rtp_sequence: u16, }, VideoFrameReceived { role: TransportRole, user_id: u64, ssrc: u32, codec: String, keyframe: bool, frame: Vec, rtp_timestamp: u32, stream_type: Option, rid: Option, dave_decrypted: bool, }, DaveReady { role: TransportRole, }, Disconnected { role: TransportRole, reason: String, }, }

// --------------------------------------------------------------------------- // Internal commands for the WS write task // ---------------------------------------------------------------------------

enum WsCommand { SendJson(Value), SendBinary(Vec), }

/// Which logical transport a connection serves: main voice, watching a
/// remote stream, or publishing our own stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TransportRole {
    Voice,
    StreamWatch,
    StreamPublish,
}

impl TransportRole {
    /// Stable string form matching the serde `snake_case` serialization.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Voice => "voice",
            Self::StreamWatch => "stream_watch",
            Self::StreamPublish => "stream_publish",
        }
    }
}

/// Registry of per-SSRC video depacketizer states.
#[derive(Default)]
struct VideoDepacketizers {
    by_ssrc: HashMap<u32, VideoDepacketizerState>,
}

impl VideoDepacketizers { fn push( &mut self, ssrc: u32, codec: VideoCodecKind, sequence: u16, timestamp: u32, marker: bool, payload: &[u8], ) -> Option<(Vec, bool)> { let state = self .by_ssrc .entry(ssrc) .or_insert_with(|| VideoDepacketizerState::new(codec)); if state.codec != codec { *state = VideoDepacketizerState::new(codec); } state.push(ssrc, sequence, timestamp, marker, payload) }

/// Prepend cached SPS+PPS from the depacketizer to a frame.
/// Called AFTER DAVE decrypt so the DAVE trailer's unencrypted ranges
/// reference the correct byte offsets in the original frame.
fn prepend_cached_h264_params(&self, ssrc: u32, frame: Vec<u8>) -> Vec<u8> {
    if let Some(state) = self.by_ssrc.get(&ssrc) {
        state.h264.prepend_cached_parameter_sets(frame)
    } else {
        frame
    }
}

}

struct VideoDepacketizerState { codec: VideoCodecKind, last_sequence: Option, h264: H264Depacketizer, vp8: Vp8Depacketizer, }

impl VideoDepacketizerState {
    /// Fresh state for `codec` with both reassemblers empty.
    fn new(codec: VideoCodecKind) -> Self {
        Self {
            codec,
            last_sequence: None,
            h264: H264Depacketizer::default(),
            vp8: Vp8Depacketizer::default(),
        }
    }

    /// Feed one RTP payload. A sequence gap or reorder drops any partially
    /// assembled frame before the payload is forwarded to the codec's
    /// depacketizer; a complete frame (plus keyframe flag) is returned when
    /// one is finished.
    fn push(
        &mut self,
        ssrc: u32,
        sequence: u16,
        timestamp: u32,
        marker: bool,
        payload: &[u8],
    ) -> Option<(Vec<u8>, bool)> {
        // Record the new sequence while retrieving the previous one.
        if let Some(previous) = self.last_sequence.replace(sequence) {
            let expected = previous.wrapping_add(1);
            if expected != sequence {
                debug!(
                    ssrc,
                    codec = self.codec.as_str(),
                    expected_sequence = expected,
                    sequence,
                    timestamp,
                    "UDP video sequence gap/reorder detected; dropping partial frame"
                );
                self.clear_partial_frame();
            }
        }

        match self.codec {
            VideoCodecKind::H264 => self.h264.push(timestamp, marker, payload),
            VideoCodecKind::Vp8 => self.vp8.push(timestamp, marker, payload),
        }
    }

    /// Drop any partially assembled frame in both reassemblers.
    fn clear_partial_frame(&mut self) {
        self.h264.reset();
        self.vp8.reset();
    }
}

fn ready_video_stream_descriptors(ready: &ReadyPayload) -> Vec { ready .streams .clone() .into_iter() .filter_map(convert_video_stream_descriptor) .collect() }

/// Fallback publish descriptor for when Ready lists no streams but provides
/// a usable video SSRC: one fixed 1280x720 "screen" stream, rid "100".
fn default_publish_video_stream_descriptor(video_ssrc: u32) -> VideoStreamDescriptor {
    let max_resolution = VideoResolution {
        width: Some(1280),
        height: Some(720),
        resolution_type: Some("fixed".to_string()),
    };
    VideoStreamDescriptor {
        ssrc: video_ssrc,
        rtx_ssrc: None,
        rid: Some("100".to_string()),
        quality: Some(100),
        stream_type: Some("screen".to_string()),
        active: Some(true),
        max_bitrate: Some(2_500_000),
        max_framerate: Some(30),
        max_resolution: Some(max_resolution),
    }
}

fn ready_publish_video_stream_descriptors(ready: &ReadyPayload) -> Vec { let streams = ready_video_stream_descriptors(ready); if !streams.is_empty() { return streams; } ready .video_ssrc .filter(|ssrc| *ssrc != 0) .map(default_publish_video_stream_descriptor) .into_iter() .collect() }

fn build_inactive_video_state_announcement(audio_ssrc: u32, ready: &ReadyPayload) -> Option { let streams = ready_video_stream_descriptors(ready); build_video_state_announcement(audio_ssrc, &streams, false) }

fn json_object_keys(value: &Value) -> Vec { value .as_object() .map(|object| object.keys().cloned().collect::<Vec<_>>()) .unwrap_or_default() }

/// Build the OP1 Select Protocol payload.
///
/// Codec advertisement depends on role: a stream publisher offers H.264
/// encode-only; voice and stream-watch transports offer H.264 + VP8
/// decode-only with descending priority (900, 890). Opus audio is always
/// listed first at priority 1000. The UDP `data` object echoes back the
/// externally visible address from IP discovery plus the chosen
/// encryption mode.
fn build_select_protocol_payload( external_ip: &str, external_port: u16, mode: &str, experiments: &[String], role: TransportRole, ) -> Value { let video_codecs = match role { TransportRole::StreamPublish => vec![json!({ "name": VideoCodecKind::H264.as_str(), "type": "video", "priority": 900, "payload_type": VideoCodecKind::H264.payload_type(), "rtx_payload_type": VideoCodecKind::H264.rtx_payload_type(), "encode": true, "decode": false, })], TransportRole::Voice | TransportRole::StreamWatch => { [VideoCodecKind::H264, VideoCodecKind::Vp8] .into_iter() .enumerate() .map(|(idx, codec)| { json!({ "name": codec.as_str(), "type": "video", "priority": 900u32.saturating_sub(idx as u32 * 10), "payload_type": codec.payload_type(), "rtx_payload_type": codec.rtx_payload_type(), "encode": false, "decode": true, }) }) .collect::<Vec<_>>() } };

// Audio always comes first; video codecs are appended after it.
let mut codecs = vec![json!({
    "name": "opus",
    "type": "audio",
    "priority": 1000,
    "payload_type": OPUS_PT,
})];
codecs.extend(video_codecs);

json!({
    "op": 1,
    "d": {
        "protocol": "udp",
        "data": {
            "address": external_ip,
            "port": external_port,
            "mode": mode
        },
        "codecs": codecs,
        "experiments": experiments,
    }
})

}

/// Inputs required to establish one voice/stream transport connection.
pub struct VoiceConnectionParams<'a> {
    pub endpoint: &'a str,
    pub server_id: u64,
    pub user_id: u64,
    pub session_id: &'a str,
    pub token: &'a str,
    pub dave_channel_id: u64,
    pub role: TransportRole,
}

// --------------------------------------------------------------------------- // UDP IP discovery (Discord voice hole-punch) // ---------------------------------------------------------------------------

/// Discord voice UDP hole-punch: send an IP-discovery request and parse the
/// reflected external address/port from the (5s-timeout) response.
async fn ip_discovery(socket: &UdpSocket, ssrc: u32) -> Result<(String, u16)> {
    // Request: [type=0x0001 (2) | length=70 (2) | ssrc (4) | zero padding to 74].
    let mut request = [0u8; 74];
    request[0..2].copy_from_slice(&0x0001u16.to_be_bytes());
    request[2..4].copy_from_slice(&70u16.to_be_bytes());
    request[4..8].copy_from_slice(&ssrc.to_be_bytes());
    socket.send(&request).await.context("IP discovery send")?;

    let mut resp = [0u8; 74];
    let n = time::timeout(Duration::from_secs(5), socket.recv(&mut resp))
        .await
        .context("IP discovery timeout")?
        .context("IP discovery recv")?;
    if n < 74 {
        bail!("IP discovery response too short: {n} bytes");
    }

    // Response: [type(2) | length(2) | ssrc(4) | address(64, NUL-padded) | port(2)]
    let ip = std::str::from_utf8(&resp[8..72])
        .context("IP discovery: invalid UTF-8")?
        .trim_end_matches('\0')
        .to_string();
    let port = u16::from_be_bytes([resp[72], resp[73]]);

    info!("IP discovery: external {ip}:{port}");
    Ok((ip, port))
}

// --------------------------------------------------------------------------- // VoiceConnection — the public handle // ---------------------------------------------------------------------------

pub struct VoiceConnection { pub ssrc: u32, role: TransportRole, shutdown: Arc, udp_socket: Arc, crypto: Arc, rtp_sequence: AtomicU32, timestamp: AtomicU32, video_payload_type: u8, video_ssrc: Option, video_streams: Vec, video_sequence: AtomicU32, video_timestamp: AtomicU32, fir_sequence: AtomicU32, ws_cmd_tx: mpsc::Sender, ws_read_task: JoinHandle<()>, ws_write_task: JoinHandle<()>, udp_recv_task: JoinHandle<()>, }

impl VoiceConnection { /// Perform the full voice WS + UDP handshake, then spawn background tasks. #[allow(clippy::too_many_lines)] pub async fn connect( params: VoiceConnectionParams<'_>, event_tx: mpsc::Sender, dave: Arc<Mutex<Option>>, ) -> Result { let VoiceConnectionParams { endpoint, server_id, user_id, session_id, token, dave_channel_id, role, } = params;

    let ep = endpoint.trim_start_matches("wss://").trim_end_matches('/');
    let ws_url = format!("wss://{ep}/?v=9");
    info!("Connecting voice WS: {ws_url}");

    let (ws, _) = tokio_tungstenite::connect_async(&ws_url)
        .await
        .context("Voice WS connect failed")?;
    let (mut ws_write, mut ws_read) = ws.split();

    // ---- OP8 Hello ----
    let heartbeat_interval = recv_hello(&mut ws_read).await?;

    // ---- OP0 Identify (advertise DAVE v1 + v9 channel_id + video receive) ----
    let identify = json!({
        "op": 0,
        "d": {
            "server_id": server_id.to_string(),
            "user_id": user_id.to_string(),
            "session_id": session_id,
            "token": token,
            "channel_id": dave_channel_id.to_string(),
            "max_dave_protocol_version": 1,
            "video": true,
            "streams": [
                { "type": "screen", "rid": "100", "quality": 100 }
            ]
        }
    });
    ws_write
        .send(Message::Text(identify.to_string()))
        .await
        .context("Send Identify")?;

    // Handshake overflow buffer: messages that arrive during the handshake
    // but aren't the target opcode (e.g. DAVE OP21/OP25 or video state) get
    // buffered here and replayed into the ws_read_loop once background tasks
    // are spawned.
    let mut handshake_overflow: HandshakeOverflow = Vec::new();

    // ---- OP2 Ready ----
    let ready = recv_ready(&mut ws_read, &mut handshake_overflow).await?;
    let ready_stream_ssrcs = ready
        .streams
        .iter()
        .filter_map(|stream| stream.ssrc.filter(|ssrc| *ssrc != 0))
        .collect::<Vec<_>>();
    info!(
        ssrc = ready.ssrc,
        video_ssrc = ready.video_ssrc,
        ready_stream_count = ready_stream_ssrcs.len(),
        ready_stream_ssrcs = ?ready_stream_ssrcs,
        udp_ip = %ready.ip,
        udp_port = ready.port,
        modes = ?ready.modes,
        experiments = ?ready.experiments,
        "clankvox_voice_ready"
    );

    // ---- UDP socket + IP discovery ----
    let udp = UdpSocket::bind("0.0.0.0:0").await.context("UDP bind")?;
    let voice_addr: SocketAddr = format!("{}:{}", ready.ip, ready.port)
        .parse()
        .context("Parse voice UDP addr")?;
    udp.connect(voice_addr).await.context("UDP connect")?;

    let (external_ip, external_port) = ip_discovery(&udp, ready.ssrc).await?;

    // ---- Select encryption mode ----
    let mode = if ready.modes.iter().any(|m| m == "aead_aes256_gcm_rtpsize") {
        "aead_aes256_gcm_rtpsize"
    } else if ready
        .modes
        .iter()
        .any(|m| m == "aead_xchacha20_poly1305_rtpsize")
    {
        warn!("AES256-GCM RTP-size unavailable; using XChaCha20-Poly1305 RTP-size fallback");
        "aead_xchacha20_poly1305_rtpsize"
    } else {
        bail!(
            "No supported encryption mode (need aead_aes256_gcm_rtpsize or aead_xchacha20_poly1305_rtpsize), got: {:?}",
            ready.modes
        );
    };

    // ---- OP1 Select Protocol ----
    let select = build_select_protocol_payload(
        &external_ip,
        external_port,
        mode,
        &ready.experiments,
        role,
    );
    ws_write
        .send(Message::Text(select.to_string()))
        .await
        .context("Send Select Protocol")?;

    // ---- OP4 Session Description ----
    let session_description =
        recv_session_description(&mut ws_read, &mut handshake_overflow).await?;
    let crypto = Arc::new(TransportCrypto::new(&session_description.secret_key, mode)?);
    info!(
        "Voice session established, transport crypto ready, audio_codec={:?}, video_codec={:?}, media_session_id={:?}",
        session_description.audio_codec,
        session_description.video_codec,
        session_description.media_session_id
    );
    if role == TransportRole::StreamPublish
        && session_description
            .video_codec
            .as_deref()
            .is_some_and(|codec| !codec.eq_ignore_ascii_case("h264"))
    {
        bail!(
            "stream publish negotiated unsupported video codec {:?}",
            session_description.video_codec
        );
    }

    let current_video_codec = Arc::new(Mutex::new(None::<String>));
    update_current_video_codec(
        &current_video_codec,
        session_description.video_codec.clone(),
    );

    if session_description.dave_protocol_version > 0 {
        match DaveManager::new(
            session_description.dave_protocol_version,
            user_id,
            dave_channel_id,
        ) {
            Ok((dm, pkg)) => {
                *dave.lock() = Some(dm);
                info!(
                    "DaveManager initialized with protocol version {}",
                    session_description.dave_protocol_version
                );

                let mut op26_payload = vec![26u8];
                op26_payload.extend_from_slice(&pkg);
                ws_write
                    .send(Message::Binary(op26_payload))
                    .await
                    .context("Send DAVE KeyPackage OP26")?;
                info!("Sent DAVE OP26 KeyPackage to Discord ({} bytes)", pkg.len());
            }
            Err(e) => {
                error!("Failed to initialize DaveManager: {e}");
            }
        }
    }

    // ---- Spawn background tasks ----
    let shutdown = Arc::new(AtomicBool::new(false));
    let (ws_cmd_tx, ws_cmd_rx) = mpsc::channel::<WsCommand>(128);
    let udp = Arc::new(udp);
    let ssrc_map: Arc<Mutex<HashMap<u32, u64>>> = Arc::new(Mutex::new(HashMap::new()));
    let video_ssrc_map: Arc<Mutex<HashMap<u32, RemoteVideoTrackBinding>>> =
        Arc::new(Mutex::new(HashMap::new()));
    let ws_sequence = Arc::new(AtomicI32::new(-1));
    let disconnect_sent = Arc::new(AtomicBool::new(false));

    // WS read loop (handles Speaking updates, DAVE opcodes, video stream metadata, etc.)
    let ws_read_task = {
        let shutdown = shutdown.clone();
        let event_tx = event_tx.clone();
        let dave = dave.clone();
        let ws_cmd_tx = ws_cmd_tx.clone();
        let ssrc_map = ssrc_map.clone();
        let video_ssrc_map = video_ssrc_map.clone();
        let ws_sequence = ws_sequence.clone();
        let disconnect_sent = disconnect_sent.clone();
        let current_video_codec = current_video_codec.clone();
        if !handshake_overflow.is_empty() {
            info!(
                "Replaying {} buffered handshake messages into read loop",
                handshake_overflow.len()
            );
        }
        tokio::spawn(async move {
            for (i, msg) in handshake_overflow.into_iter().enumerate() {
                match msg {
                    Message::Text(ref text) => {
                        if let Ok(v) = serde_json::from_str::<Value>(text) {
                            let op = v["op"].as_u64().unwrap_or(u64::MAX);
                            info!("Replay [{i}]: Text OP={op}");
                            let d = &v["d"];
                            handle_text_opcode(
                                op,
                                d,
                                &event_tx,
                                &ws_cmd_tx,
                                &dave,
                                &ssrc_map,
                                &video_ssrc_map,
                                &current_video_codec,
                                user_id,
                                dave_channel_id,
                                role,
                                &ws_sequence,
                            )
                            .await;
                        } else {
                            info!("Replay [{i}]: Invalid Text");
                        }
                    }
                    Message::Binary(ref data) if data.len() >= 3 => {
                        let seq = u16::from_be_bytes([data[0], data[1]]);
                        let op = data[2];
                        info!(
                            "Replay [{}]: Binary OP={} seq={} len={}",
                            i,
                            op,
                            seq,
                            data.len()
                        );
                        handle_binary_opcode(
                            data,
                            &event_tx,
                            &ws_cmd_tx,
                            &dave,
                            role,
                            &ws_sequence,
                        )
                        .await;
                    }
                    Message::Binary(_) => {
                        info!("Replay [{i}]: Empty Binary");
                    }
                    _ => {
                        info!("Replay [{i}]: Other message type");
                    }
                }
            }
            ws_read_loop(
                ws_read,
                event_tx,
                ws_cmd_tx,
                dave,
                ssrc_map,
                video_ssrc_map,
                current_video_codec,
                shutdown,
                user_id,
                dave_channel_id,
                role,
                ws_sequence,
                disconnect_sent,
            )
            .await;
        })
    };

    // WS write loop (heartbeat + outgoing commands)
    let ws_write_task = {
        let shutdown = shutdown.clone();
        let ws_sequence = ws_sequence.clone();
        let event_tx = event_tx.clone();
        let disconnect_sent = disconnect_sent.clone();
        tokio::spawn(async move {
            ws_write_loop(
                ws_write,
                ws_cmd_rx,
                shutdown,
                heartbeat_interval,
                role,
                ws_sequence,
                event_tx,
                disconnect_sent,
            )
            .await;
        })
    };

    // UDP receive loop
    let udp_recv_task = {
        let shutdown = shutdown.clone();
        let event_tx = event_tx.clone();
        let crypto = crypto.clone();
        let dave = dave.clone();
        let udp = udp.clone();
        let ssrc_map = ssrc_map.clone();
        let video_ssrc_map = video_ssrc_map.clone();
        let ws_cmd_tx = ws_cmd_tx.clone();
        let disconnect_sent = disconnect_sent.clone();
        tokio::spawn(async move {
            udp_recv_loop(
                udp,
                crypto,
                dave,
                ssrc_map,
                video_ssrc_map,
                event_tx,
                ws_cmd_tx,
                shutdown,
                role,
                disconnect_sent,
            )
            .await;
        })
    };

    if role == TransportRole::Voice {
        // Set speaking state so Discord knows we may transmit audio.
        let _ = ws_cmd_tx
            .send(WsCommand::SendJson(json!({
                "op": 5,
                "d": { "speaking": 1, "delay": 0, "ssrc": ready.ssrc }
            })))
            .await;
    }

    // Announce video capability (OP12) so Discord sends us other users' video states.
    // We declare our streams as inactive (we only receive, not send video).
    if let Some(video_state_announcement) =
        build_inactive_video_state_announcement(ready.ssrc, &ready)
    {
        let announced_video_ssrc = video_state_announcement["d"]["video_ssrc"].as_u64();
        let announced_stream_ssrcs = video_state_announcement["d"]["streams"]
            .as_array()
            .into_iter()
            .flatten()
            .filter_map(|stream| stream["ssrc"].as_u64())
            .collect::<Vec<_>>();
        info!(
            audio_ssrc = ready.ssrc,
            announced_video_ssrc,
            announced_stream_count = announced_stream_ssrcs.len(),
            announced_stream_ssrcs = ?announced_stream_ssrcs,
            "clankvox_sending_inactive_video_state_announcement"
        );
        let _ = ws_cmd_tx
            .send(WsCommand::SendJson(video_state_announcement))
            .await;
    } else {
        info!("No usable stream metadata in OP2 Ready, skipping OP12 video state announcement");
    }

    let _ = event_tx
        .send(VoiceEvent::Ready {
            role,
            ssrc: ready.ssrc,
        })
        .await;

    Ok(VoiceConnection {
        ssrc: ready.ssrc,
        role,
        shutdown,
        udp_socket: udp,
        crypto,
        rtp_sequence: AtomicU32::new(0),
        timestamp: AtomicU32::new(0),
        video_payload_type: VideoCodecKind::H264.payload_type(),
        video_ssrc: ready.video_ssrc.filter(|ssrc| *ssrc != 0).or_else(|| {
            ready_publish_video_stream_descriptors(&ready)
                .first()
                .map(|stream| stream.ssrc)
        }),
        video_streams: match role {
            TransportRole::StreamPublish => ready_publish_video_stream_descriptors(&ready),
            TransportRole::Voice | TransportRole::StreamWatch => {
                ready_video_stream_descriptors(&ready)
            }
        },
        video_sequence: AtomicU32::new(0),
        video_timestamp: AtomicU32::new(0),
        fir_sequence: AtomicU32::new(0),
        ws_cmd_tx,
        ws_read_task,
        ws_write_task,
        udp_recv_task,
    })
}

/// Build an RTP packet, transport-encrypt, and send via UDP.
/// `opus_payload` should already be DAVE-encrypted if DAVE is active.
pub async fn send_rtp_frame(&self, opus_payload: &[u8]) -> Result<()> {
    let sequence = self.rtp_sequence.fetch_add(1, Ordering::SeqCst) as u16;
    // 960 samples per frame = 20ms @ 48kHz.
    let rtp_timestamp = self.timestamp.fetch_add(960, Ordering::SeqCst);
    let header = build_rtp_header(sequence, rtp_timestamp, self.ssrc);
    let ciphertext = self.crypto.encrypt(&header, opus_payload)?;

    // On-wire packet: cleartext header followed by the encrypted payload.
    let mut packet = Vec::with_capacity(RTP_HEADER_LEN + ciphertext.len());
    packet.extend_from_slice(&header);
    packet.extend_from_slice(&ciphertext);
    self.udp_socket.send(&packet).await.context("UDP send")?;
    Ok(())
}

/// Packetize one H.264 Annex-B access unit into RTP packets and send them
/// over the encrypted UDP transport.
///
/// Each NAL unit is sent either as a single RTP packet (when it fits within
/// `MAX_VIDEO_RTP_CHUNK_BYTES` minus the extension payload) or fragmented
/// FU-A style (indicator type 28 with start/end bits). The RTP marker bit is
/// set only on the last packet of the access unit. The video timestamp
/// advances once per access unit by `timestamp_increment` (minimum 1).
/// The RTP header + extension header form the AAD; the extension payload +
/// NAL bytes are the encrypted portion.
pub async fn send_h264_frame(
    &self,
    access_unit: &[u8],
    timestamp_increment: u32,
) -> Result<()> {
    let Some(video_ssrc) = self.video_ssrc else {
        bail!("stream publish video_ssrc unavailable");
    };

    let nalus = split_h264_annexb_nalus(access_unit);
    if nalus.is_empty() {
        bail!("stream publish frame did not contain Annex-B NAL units");
    }

    // fetch_add returns the PRE-increment value: this frame's timestamp.
    let timestamp = self
        .video_timestamp
        .fetch_add(timestamp_increment.max(1), Ordering::SeqCst);

    for (nal_index, nal) in nalus.iter().enumerate() {
        if nal.is_empty() {
            continue;
        }
        let is_last_nal = nal_index + 1 == nalus.len();
        // Budget leaves room for the fixed extension payload that precedes
        // the NAL bytes inside the encrypted portion.
        let max_single_nal_payload =
            MAX_VIDEO_RTP_CHUNK_BYTES.saturating_sub(VIDEO_RTP_EXTENSION_PAYLOAD.len());
        if nal.len() <= max_single_nal_payload {
            // Single-NAL packet path.
            let seq = self.video_sequence.fetch_add(1, Ordering::SeqCst) as u16;
            let header = build_video_rtp_header(
                self.video_payload_type,
                seq,
                timestamp,
                video_ssrc,
                is_last_nal,
            );
            let mut aad = Vec::with_capacity(RTP_HEADER_LEN + VIDEO_RTP_EXTENSION_HEADER.len());
            aad.extend_from_slice(&header);
            aad.extend_from_slice(&VIDEO_RTP_EXTENSION_HEADER);
            let mut payload = Vec::with_capacity(VIDEO_RTP_EXTENSION_PAYLOAD.len() + nal.len());
            payload.extend_from_slice(&VIDEO_RTP_EXTENSION_PAYLOAD);
            payload.extend_from_slice(nal);
            let encrypted = self.crypto.encrypt(&aad, &payload)?;
            let mut packet = Vec::with_capacity(
                RTP_HEADER_LEN + VIDEO_RTP_EXTENSION_HEADER.len() + encrypted.len(),
            );
            packet.extend_from_slice(&header);
            packet.extend_from_slice(&VIDEO_RTP_EXTENSION_HEADER);
            packet.extend_from_slice(&encrypted);
            self.udp_socket
                .send(&packet)
                .await
                .context("UDP send video packet")?;
            continue;
        }

        // FU-A fragmentation path: the original NAL header byte is consumed
        // and its type/NRI bits are redistributed into the FU indicator
        // (forbidden+NRI | type 28) and FU header (S/E bits | original type).
        let nal_header = nal[0];
        let nal_type = nal_header & 0x1f;
        let fnri = nal_header & 0xe0;
        let fu_indicator = fnri | 28;
        // Per-fragment budget: chunk bytes + extension payload + 2 FU bytes.
        let max_fu_payload = MAX_VIDEO_RTP_CHUNK_BYTES
            .saturating_sub(VIDEO_RTP_EXTENSION_PAYLOAD.len())
            .saturating_sub(2);
        for (chunk_index, chunk) in nal[1..].chunks(max_fu_payload).enumerate() {
            let is_first_chunk = chunk_index == 0;
            let chunk_start = chunk_index * max_fu_payload;
            // Chunks cover nal[1..], so the body is nal.len()-1 bytes total.
            let is_last_chunk = chunk_start + chunk.len() >= nal.len().saturating_sub(1);
            let marker = is_last_nal && is_last_chunk;
            let seq = self.video_sequence.fetch_add(1, Ordering::SeqCst) as u16;
            let header = build_video_rtp_header(
                self.video_payload_type,
                seq,
                timestamp,
                video_ssrc,
                marker,
            );
            // FU header: S (start) = 0x80, E (end) = 0x40, plus original type.
            let fu_header = (if is_first_chunk { 0x80 } else { 0x00 })
                | (if is_last_chunk { 0x40 } else { 0x00 })
                | nal_type;
            let mut aad = Vec::with_capacity(RTP_HEADER_LEN + VIDEO_RTP_EXTENSION_HEADER.len());
            aad.extend_from_slice(&header);
            aad.extend_from_slice(&VIDEO_RTP_EXTENSION_HEADER);
            let mut payload =
                Vec::with_capacity(VIDEO_RTP_EXTENSION_PAYLOAD.len() + 2 + chunk.len());
            payload.extend_from_slice(&VIDEO_RTP_EXTENSION_PAYLOAD);
            payload.extend_from_slice(&[fu_indicator, fu_header]);
            payload.extend_from_slice(chunk);
            let encrypted = self.crypto.encrypt(&aad, &payload)?;
            let mut packet = Vec::with_capacity(
                RTP_HEADER_LEN + VIDEO_RTP_EXTENSION_HEADER.len() + encrypted.len(),
            );
            packet.extend_from_slice(&header);
            packet.extend_from_slice(&VIDEO_RTP_EXTENSION_HEADER);
            packet.extend_from_slice(&encrypted);
            self.udp_socket
                .send(&packet)
                .await
                .context("UDP send video FU-A packet")?;
        }
    }

    Ok(())
}

/// Queue an OP5 speaking update for the stream-publish transport; no-op on
/// every other role. `speaking=true` maps to flag value 2, false to 0.
pub fn set_stream_publish_speaking(&self, speaking: bool) -> Result<()> {
    if self.role != TransportRole::StreamPublish {
        return Ok(());
    }
    let flags = if speaking { 2 } else { 0 };
    let payload = json!({
        "op": 5,
        "d": {
            "speaking": flags,
            "delay": 0,
            "ssrc": self.ssrc,
        }
    });
    self.ws_cmd_tx
        .try_send(WsCommand::SendJson(payload))
        .map_err(|error| {
            anyhow::anyhow!("failed to enqueue stream publish speaking update: {error}")
        })
}

/// Queue a video-state announcement toggling our published streams
/// active/inactive; no-op for non-publish roles or when no announcement
/// payload can be built from the known streams.
pub fn set_stream_publish_video_active(&self, active: bool) -> Result<()> {
    if self.role != TransportRole::StreamPublish {
        return Ok(());
    }
    match build_video_state_announcement(self.ssrc, &self.video_streams, active) {
        Some(payload) => self
            .ws_cmd_tx
            .try_send(WsCommand::SendJson(payload))
            .map_err(|error| {
                anyhow::anyhow!("failed to enqueue stream publish video state update: {error}")
            }),
        None => Ok(()),
    }
}

/// Queue a media-sink-wants update describing desired per-SSRC stream
/// quality (`wants`) and pixel counts.
pub fn update_media_sink_wants(
    &self,
    wants: &[(u32, u8)],
    pixel_counts: &[(u32, f64)],
) -> Result<()> {
    self.ws_cmd_tx
        .try_send(WsCommand::SendJson(build_media_sink_wants_payload(
            wants,
            pixel_counts,
        )))
        .map_err(|error| anyhow::anyhow!("failed to enqueue media sink wants: {error}"))
}

/// Transport-protect a single RTCP packet and send it over UDP without
/// blocking. Returns the on-wire packet length; `packet_label` tags the
/// error context.
fn send_protected_rtcp_packet(
    &self,
    fmt_or_count: u8,
    packet_type: u8,
    body: &[u8],
    packet_label: &'static str,
) -> Result<usize> {
    let packet = build_protected_rtcp_packet(&self.crypto, fmt_or_count, packet_type, body)
        .with_context(|| format!("RTCP {packet_label} transport encrypt"))?;
    let wire_len = packet.len();
    self.udp_socket
        .try_send(&packet)
        .with_context(|| format!("RTCP {packet_label} send"))?;
    Ok(wire_len)
}

/// Send protected RTCP feedback packets containing:
///   1. RR (Receiver Report)
///   2. PLI (Picture Loss Indication, RFC 4585)
///   3. FIR (Full Intra Request, RFC 5104)
///
/// Under Discord's `rtpsize` modes, feedback rides the same transport
/// protection as media. Each RTCP packet is protected independently so its
/// header length still matches the on-wire packet bytes.
pub fn send_rtcp_pli(&self, media_ssrc: u32) -> Result<()> {
    // FIR command sequence number, truncated to the 8-bit wire field.
    let fir_seq = self.fir_sequence.fetch_add(1, Ordering::Relaxed) as u8;

    // Empty RR (report count 0): body is just our sender SSRC.
    let rr_body = self.ssrc.to_be_bytes();

    // PLI body: sender SSRC + media source SSRC.
    let mut pli_body = [0u8; 8];
    pli_body[0..4].copy_from_slice(&self.ssrc.to_be_bytes());
    pli_body[4..8].copy_from_slice(&media_ssrc.to_be_bytes());

    // FIR body: sender SSRC + zero media source + one FCI entry
    // (target SSRC, seq byte, 3 reserved bytes left zeroed).
    let mut fir_body = [0u8; 16];
    fir_body[0..4].copy_from_slice(&self.ssrc.to_be_bytes());
    fir_body[4..8].copy_from_slice(&0u32.to_be_bytes()); // media source = 0 for FIR
    fir_body[8..12].copy_from_slice(&media_ssrc.to_be_bytes());
    fir_body[12] = fir_seq;

    // PT 201 = RR; PT 206 = payload-specific feedback (fmt 1 = PLI, fmt 4 = FIR).
    let rr_packet_len = self.send_protected_rtcp_packet(0, 201, &rr_body, "rr")?;
    let pli_packet_len = self.send_protected_rtcp_packet(1, 206, &pli_body, "pli")?;
    let fir_packet_len = self.send_protected_rtcp_packet(4, 206, &fir_body, "fir")?;
    info!(
        sender_ssrc = self.ssrc,
        media_ssrc,
        fir_seq,
        rr_packet_len,
        pli_packet_len,
        fir_packet_len,
        "clankvox_rtcp_pli_sent"
    );
    Ok(())
}

/// Signal shutdown and abort all background tasks for this connection.
pub fn shutdown(&self) {
    // Flip the flag first so loops that observe it exit cooperatively,
    // then abort every task outright.
    self.shutdown.store(true, Ordering::SeqCst);
    for task in [&self.ws_read_task, &self.ws_write_task, &self.udp_recv_task] {
        task.abort();
    }
}

}

impl Drop for VoiceConnection { fn drop(&mut self) { self.shutdown(); } }

async fn send_disconnect_once( event_tx: &mpsc::Sender, disconnect_sent: &Arc, role: TransportRole, reason: impl Into, ) { if disconnect_sent .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_ok() { let _ = event_tx .send(VoiceEvent::Disconnected { role, reason: reason.into(), }) .await; } }

// ---------------------------------------------------------------------------
// Handshake helpers (synchronous WS reads during connect)
// ---------------------------------------------------------------------------

/// Messages received during the handshake that weren't the target opcode. /// These are buffered and replayed into the ws_read_loop so DAVE opcodes /// (OP21 text, OP25/27/29/30 binary) that arrive between Ready and Session /// Description aren't silently dropped. type HandshakeOverflow = Vec;

async fn recv_hello( ws: &mut (impl StreamExt<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin), ) -> Result { let deadline = time::Instant::now() + Duration::from_secs(10); loop { let msg = time::timeout_at(deadline, ws.next()) .await .context("Timeout waiting for OP8 Hello")? .context("WS stream ended")? .context("WS error")?; if let Message::Text(text) = msg { let message: VoiceOpcode = parse_voice_opcode(&text)?; if message.op == 8 { let payload: HelloPayload = serde_json::from_value(message.d).context("invalid hello payload")?; return Ok(payload.heartbeat_interval.unwrap_or(13_750.0)); } } } }

async fn recv_ready( ws: &mut (impl StreamExt<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin), overflow: &mut HandshakeOverflow, ) -> Result { let deadline = time::Instant::now() + Duration::from_secs(10); loop { let msg = time::timeout_at(deadline, ws.next()) .await .context("Timeout waiting for OP2 Ready")? .context("WS stream ended")? .context("WS error")?; match &msg { Message::Text(text) => { let message: VoiceOpcode = parse_voice_opcode(text)?; if message.op == 2 { let payload: ReadyPayload = serde_json::from_value(message.d).context("invalid ready payload")?; return Ok(payload); } debug!( "Handshake (waiting OP2): buffered text op={op}", op = message.op ); overflow.push(msg); } Message::Binary(data) => { debug!( "Handshake (waiting OP2): buffered binary opcode={} ({} bytes)", data.first().copied().unwrap_or(0), data.len() ); overflow.push(msg); } _ => {} } } }

async fn recv_session_description( ws: &mut (impl StreamExt<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin), overflow: &mut HandshakeOverflow, ) -> Result { let deadline = time::Instant::now() + Duration::from_secs(10); loop { let msg = time::timeout_at(deadline, ws.next()) .await .context("Timeout waiting for OP4 Session Description")? .context("WS stream ended")? .context("WS error")?; match &msg { Message::Text(text) => { let message: VoiceOpcode = parse_voice_opcode(text)?; if message.op == 4 { let payload: SessionDescriptionPayload = serde_json::from_value(message.d) .context("invalid session description payload")?; return Ok(payload); } debug!( "Handshake (waiting OP4): buffered text op={op}", op = message.op ); overflow.push(msg); } Message::Binary(data) => { debug!( "Handshake (waiting OP4): buffered binary opcode={} ({} bytes)", data.first().copied().unwrap_or(0), data.len() ); overflow.push(msg); } _ => {} } } }

// ---------------------------------------------------------------------------
// Background tasks
// ---------------------------------------------------------------------------

// Read-side pump for the voice gateway WebSocket. Runs until the stream
// ends, the shutdown flag is observed, a close frame arrives, or a read
// error occurs; the close/error paths report through `send_disconnect_once`
// so the Disconnected event fires at most once per connection.
#[allow(clippy::too_many_arguments)] async fn ws_read_loop( mut ws_read: futures_util::stream::SplitStream, event_tx: mpsc::Sender, ws_cmd_tx: mpsc::Sender, dave: Arc<Mutex<Option>>, ssrc_map: Arc<Mutex<HashMap<u32, u64>>>, video_ssrc_map: Arc<Mutex<HashMap<u32, RemoteVideoTrackBinding>>>, current_video_codec: Arc<Mutex<Option>>, shutdown: Arc, bot_user_id: u64, channel_id: u64, role: TransportRole, ws_sequence: Arc, disconnect_sent: Arc, ) { while let Some(msg) = ws_read.next().await { if shutdown.load(Ordering::Relaxed) { break; } match msg { Ok(Message::Text(text)) => { let v: Value = match serde_json::from_str(&text) { Ok(v) => v, Err(_) => continue, };

            // Track WebSocket sequence numbers for OP3 Heartbeat
            if let Some(s) = v["seq"].as_i64() {
                ws_sequence.store(s as i32, Ordering::Relaxed);
            }

            // A missing/non-numeric "op" becomes u64::MAX, which falls
            // through to handle_text_opcode's unknown-opcode arm.
            let op = v["op"].as_u64().unwrap_or(u64::MAX);
            let d = &v["d"];
            handle_text_opcode(
                op,
                d,
                &event_tx,
                &ws_cmd_tx,
                &dave,
                &ssrc_map,
                &video_ssrc_map,
                &current_video_codec,
                bot_user_id,
                channel_id,
                role,
                &ws_sequence,
            )
            .await;
        }
        // Binary frames carry DAVE/MLS payloads; format is decoded in
        // handle_binary_opcode ([seq u16 BE | opcode u8 | payload]).
        Ok(Message::Binary(data)) => {
            if data.is_empty() {
                continue;
            }
            handle_binary_opcode(&data, &event_tx, &ws_cmd_tx, &dave, role, &ws_sequence).await;
        }
        Ok(Message::Close(frame)) => {
            let reason = match frame {
                Some(cf) => format!(
                    "WebSocket closed by server: code={} reason={}",
                    cf.code, cf.reason
                ),
                None => "WebSocket closed by server (no close frame)".into(),
            };
            warn!("{reason}");
            send_disconnect_once(&event_tx, &disconnect_sent, role, reason).await;
            break;
        }
        Err(e) => {
            send_disconnect_once(
                &event_tx,
                &disconnect_sent,
                role,
                format!("WS read error: {e}"),
            )
            .await;
            break;
        }
        _ => {}
    }
}
info!("Voice WS read loop exited");

}

// Dispatch one decoded JSON voice-gateway opcode. Malformed payloads are
// logged and ignored rather than tearing down the connection. Locks on the
// shared maps and the DAVE manager are scoped so they are never held across
// an `.await`.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)] async fn handle_text_opcode( op: u64, d: &Value, event_tx: &mpsc::Sender, ws_cmd_tx: &mpsc::Sender, dave: &Arc<Mutex<Option>>, ssrc_map: &Arc<Mutex<HashMap<u32, u64>>>, video_ssrc_map: &Arc<Mutex<HashMap<u32, RemoteVideoTrackBinding>>>, current_video_codec: &Arc<Mutex<Option>>, bot_user_id: u64, channel_id: u64, role: TransportRole, _ws_sequence: &Arc, ) { match op { // Heartbeat ACK 6 => { debug!("Voice heartbeat ACK"); } // Speaking state update (OP5) — SSRC map only, speaking detection is audio-driven 5 => { let payload: SpeakingPayload = match serde_json::from_value(d.clone()) { Ok(payload) => payload, Err(error) => { warn!(error = %error, "ignoring malformed speaking payload"); return; } }; let Some(uid) = parse_user_id(&payload.user_id, "speaking") else { return; };

        // Record the audio SSRC -> user mapping for inbound RTP attribution.
        ssrc_map.lock().insert(payload.ssrc, uid);

        let _ = event_tx
            .send(VoiceEvent::SsrcUpdate {
                role,
                ssrc: payload.ssrc,
                user_id: uid,
            })
            .await;
    }
    // Video stream metadata (Discord may send this as OP12 or OP18 depending on path)
    12 | 18 => {
        // Probe the payload shape first: a video-state payload carries
        // "streams"/"video_ssrc"; otherwise OP12 may be a legacy client
        // disconnect, and OP18 without those keys is just logged.
        let has_streams = d.get("streams").is_some();
        let has_video_ssrc = d.get("video_ssrc").is_some();
        let has_audio_ssrc = d.get("audio_ssrc").is_some();
        let has_user_id = d.get("user_id").is_some();
        let payload_keys = json_object_keys(d);

        if has_streams || has_video_ssrc {
            let payload: RemoteVideoStatePayload = match serde_json::from_value(d.clone()) {
                Ok(payload) => payload,
                Err(error) => {
                    warn!(
                        error = %error,
                        op,
                        has_streams,
                        has_video_ssrc,
                        has_audio_ssrc,
                        has_user_id,
                        payload_keys = ?payload_keys,
                        "ignoring malformed video state payload"
                    );
                    return;
                }
            };
            apply_remote_video_state(
                payload,
                event_tx,
                video_ssrc_map,
                current_video_codec,
                role,
            )
            .await;
            return;
        }

        if op == 18 {
            info!(
                has_streams,
                has_video_ssrc,
                has_audio_ssrc,
                has_user_id,
                payload_keys = ?payload_keys,
                "clankvox_voice_ws_unclassified_op18"
            );
            return;
        }

        // Client disconnect (OP13 in current Discord docs, but some servers historically used OP12)
        let payload: UserIdPayload = match serde_json::from_value(d.clone()) {
            Ok(payload) => payload,
            Err(error) => {
                warn!(
                    error = %error,
                    op,
                    has_streams,
                    has_video_ssrc,
                    has_audio_ssrc,
                    has_user_id,
                    payload_keys = ?payload_keys,
                    "ignoring malformed client disconnect payload"
                );
                return;
            }
        };
        let Some(uid) = parse_user_id(&payload.user_id, "client_disconnect") else {
            return;
        };
        ssrc_map.lock().retain(|_, v| *v != uid);
        video_ssrc_map
            .lock()
            .retain(|_, binding| binding.user_id != uid);
        let _ = event_tx
            .send(VoiceEvent::ClientDisconnect { role, user_id: uid })
            .await;
    }
    // OP13: client disconnect — drop the user's audio and video SSRC bindings.
    13 => {
        let payload: UserIdPayload = match serde_json::from_value(d.clone()) {
            Ok(payload) => payload,
            Err(error) => {
                warn!(error = %error, "ignoring malformed client disconnect payload");
                return;
            }
        };
        let Some(uid) = parse_user_id(&payload.user_id, "client_disconnect") else {
            return;
        };
        ssrc_map.lock().retain(|_, v| *v != uid);
        video_ssrc_map
            .lock()
            .retain(|_, binding| binding.user_id != uid);
        let _ = event_tx
            .send(VoiceEvent::ClientDisconnect { role, user_id: uid })
            .await;
    }
    // Session update / codec update
    14 => {
        let payload: SessionUpdatePayload = match serde_json::from_value(d.clone()) {
            Ok(payload) => payload,
            Err(error) => {
                warn!(error = %error, "ignoring malformed session update payload");
                return;
            }
        };
        // Only overwrite the cached codec when the update actually carries one.
        if payload.video_codec.is_some() {
            update_current_video_codec(current_video_codec, payload.video_codec.clone());
        }
        debug!(
            audio_codec = ?payload.audio_codec,
            video_codec = ?payload.video_codec,
            media_session_id = ?payload.media_session_id,
            keyframe_interval = ?payload.keyframe_interval,
            "voice session update"
        );
    }
    // OP21: DavePrepareTransition — a transition is upcoming, respond with OP23
    21 => {
        let payload: TransitionPayload = match serde_json::from_value(d.clone()) {
            Ok(payload) => payload,
            Err(error) => {
                warn!(error = %error, "ignoring malformed DAVE OP21 payload");
                return;
            }
        };
        info!(
            "DAVE OP21: prepare transition id={} pv={}",
            payload.transition_id, payload.protocol_version
        );
        // prepare_transition runs under the lock; the OP23 reply is sent
        // after the lock is dropped.
        let send_ready = {
            let mut guard = dave.lock();
            if let Some(ref mut dm) = *guard {
                dm.prepare_transition(payload.transition_id, payload.protocol_version)
            } else {
                false
            }
        };
        if send_ready {
            send_transition_ready(ws_cmd_tx, payload.transition_id, "prepare").await;
        }
    }
    // OP22: DaveExecuteTransition — finalize the pending transition
    22 => {
        let payload: TransitionPayload = match serde_json::from_value(d.clone()) {
            Ok(payload) => payload,
            Err(error) => {
                warn!(error = %error, "ignoring malformed DAVE OP22 payload");
                return;
            }
        };
        info!(
            "DAVE OP22: execute transition received, transition_id={}",
            payload.transition_id
        );
        let transitioned = {
            let mut guard = dave.lock();
            if let Some(ref mut dm) = *guard {
                dm.execute_transition(payload.transition_id)
            } else {
                false
            }
        };
        if transitioned {
            // Readiness is re-checked under a fresh lock scope before
            // notifying listeners.
            let ready = {
                let guard = dave.lock();
                guard.as_ref().is_some_and(DaveManager::is_ready)
            };
            if ready {
                let _ = event_tx.send(VoiceEvent::DaveReady { role }).await;
            }
        }
    }
    // OP24: DavePrepareEpoch — a new DAVE epoch is upcoming
    24 => {
        let payload: EpochPayload = match serde_json::from_value(d.clone()) {
            Ok(payload) => payload,
            Err(error) => {
                warn!(error = %error, "ignoring malformed DAVE OP24 payload");
                return;
            }
        };
        info!(
            "DAVE OP24: prepare epoch pv={} epoch={}",
            payload.protocol_version, payload.epoch
        );

        if payload.protocol_version > 0 {
            // First epoch creates the manager; later epochs reinit it.
            // Either path yields a fresh MLS key package to send as OP26.
            let pkg_to_send = {
                let mut guard = dave.lock();
                if guard.is_none() {
                    match DaveManager::new(payload.protocol_version, bot_user_id, channel_id) {
                        Ok((dm, pkg)) => {
                            *guard = Some(dm);
                            Some(pkg)
                        }
                        Err(e) => {
                            error!("Failed to create DaveManager: {e}");
                            None
                        }
                    }
                } else {
                    if let Some(ref mut dm) = *guard {
                        match dm.reinit() {
                            Ok(recovery) => Some(recovery.key_package),
                            Err(e) => {
                                error!("Failed to reinit DaveManager for new epoch: {e}");
                                None
                            }
                        }
                    } else {
                        None
                    }
                }
            };

            if let Some(pkg) = pkg_to_send {
                // OP26 frame layout: [opcode byte 26 | key package bytes].
                let mut op26_payload = vec![26u8];
                op26_payload.extend_from_slice(&pkg);
                let _ = ws_cmd_tx.send(WsCommand::SendBinary(op26_payload)).await;
                info!(
                    "OP24: Sent DAVE OP26 KeyPackage to Discord ({} bytes)",
                    pkg.len()
                );
            }
        }
    }
    _ => {
        debug!("Unknown voice WS opcode: {op}");
    }
}

}

// Dispatch one binary voice-gateway frame (DAVE/MLS traffic). The DAVE
// manager lock is always released before any `.await`; recovery actions and
// OP23/OP28 replies are collected under the lock and sent afterwards.
#[allow(clippy::too_many_lines)] async fn handle_binary_opcode( data: &[u8], event_tx: &mpsc::Sender, ws_cmd_tx: &mpsc::Sender, dave: &Arc<Mutex<Option>>, role: TransportRole, ws_sequence: &Arc, ) { // Incoming binary frames from Discord Voice WebSocket have the format: // [ seq (2 bytes, BE) | opcode (1 byte) | payload (N bytes) ] if data.len() < 3 { warn!("Received truncated binary frame (len {})", data.len()); return; }

let seq = u16::from_be_bytes([data[0], data[1]]);
ws_sequence.store(i32::from(seq), Ordering::Relaxed);
let opcode = data[2];
let payload = &data[3..];
info!("Handling binary opcode: {} (seq: {})", opcode, seq);

match opcode {
    // OP25: MLS External Sender Package (server → client)
    25 => {
        info!(
            "DAVE binary OP25: external sender ({} bytes)",
            payload.len()
        );
        let set_sender_ok = {
            let mut guard = dave.lock();
            if let Some(ref mut dm) = *guard {
                if let Err(e) = dm.set_external_sender(payload) {
                    error!("DAVE set_external_sender: {e}");
                    false
                } else {
                    true
                }
            } else {
                false
            }
        };

        // We already sent OP26 when the session/epoch was initialized.
        // Sending a second OP26 here can create an extra transition that drifts
        // decrypt state and yields NoValidCryptorFound on inbound audio.
        if set_sender_ok {
            debug!("DAVE: external sender accepted; skipping duplicate OP26");
        }
    }
    // OP27: MLS Proposals (server → client)
    27 => {
        if payload.is_empty() {
            warn!("DAVE binary OP27: truncated payload");
            return;
        }
        // Payload layout: [optype u8 | proposal bytes]; optype 0 = append,
        // anything else = revoke.
        let optype = payload[0];
        let proposals_payload = &payload[1..];
        info!(
            "DAVE binary OP27: proposals (optype: {}, {} bytes)",
            optype,
            proposals_payload.len()
        );

        let operation = if optype == 0 {
            davey::ProposalsOperationType::APPEND
        } else {
            davey::ProposalsOperationType::REVOKE
        };

        let response = {
            let mut guard = dave.lock();
            if let Some(ref mut dm) = *guard {
                match dm.process_proposals(operation, proposals_payload, None) {
                    Ok(Some(cr)) => Some(cr.data),
                    Ok(None) => {
                        debug!("DAVE: no commit needed for proposals");
                        None
                    }
                    Err(e) => {
                        error!("DAVE process_proposals: {e}");
                        None
                    }
                }
            } else {
                None
            }
        };
        if let Some(commit_data) = response {
            // Reply frame layout: [opcode byte 28 | commit bytes].
            let mut frame = Vec::with_capacity(1 + commit_data.len());
            frame.push(28); // OP28
            frame.extend_from_slice(&commit_data);
            let _ = ws_cmd_tx.send(WsCommand::SendBinary(frame)).await;
            debug!("DAVE: sent commit OP28 ({} bytes)", commit_data.len());
        }
    }
    // OP29: MLS Announce Commit Transition (server → client)
    29 => {
        if payload.len() < 2 {
            warn!("DAVE binary OP29: truncated payload");
            return;
        }
        // Payload layout: [transition_id u16 BE | commit bytes].
        let transition_id = u16::from_be_bytes([payload[0], payload[1]]);
        let commit_payload = &payload[2..];

        info!(
            "DAVE binary OP29: announce commit (transition_id: {}, {} bytes)",
            transition_id,
            commit_payload.len()
        );

        // Process commit under lock, collect any recovery action, then drop lock
        let (ready, success, recovery_action) =
            {
                let mut guard = dave.lock();
                if let Some(ref mut dm) = *guard {
                    match dm.process_commit(commit_payload) {
                        Ok(()) => {
                            dm.store_pending_transition(transition_id);
                            info!(
                                role = role.as_str(),
                                transition_id,
                                known_users = ?dm.known_user_ids(),
                                pv = dm.protocol_version(),
                                ready = dm.is_ready(),
                                "DAVE: commit processed, MLS group members"
                            );
                            (dm.is_ready(), true, None)
                        }
                        Err(e) => {
                            error!("DAVE process_commit: {e}");
                            let recovery = dm.reinit().map_err(|error| {
                            error!(error = %error, "DAVE reinit failed after commit error");
                            error
                        }).ok();
                            (false, false, recovery)
                        }
                    }
                } else {
                    (false, false, None)
                }
            };
        // Lock is dropped — safe to await

        if let Some(recovery) = recovery_action {
            send_recovery_action(ws_cmd_tx, recovery, "failed commit").await;
        }

        // Match discord.js behavior: for non-zero transitions, confirm readiness with OP23.
        if success && transition_id != 0 {
            send_transition_ready(ws_cmd_tx, transition_id, "commit").await;
        }

        if ready {
            let _ = event_tx.send(VoiceEvent::DaveReady { role }).await;
        }
    }
    // OP30: MLS Welcome (server → client)
    30 => {
        if payload.len() < 2 {
            warn!("DAVE binary OP30: truncated payload");
            return;
        }
        // Payload layout: [transition_id u16 BE | welcome bytes].
        let transition_id = u16::from_be_bytes([payload[0], payload[1]]);
        let welcome_payload = &payload[2..];

        info!(
            "DAVE binary OP30: welcome (transition_id: {}, {} bytes)",
            transition_id,
            welcome_payload.len()
        );

        // Process welcome under lock, collect any recovery action, then drop lock
        let (ready, success, recovery_action) = {
            let mut guard = dave.lock();
            if let Some(ref mut dm) = *guard {
                match dm.process_welcome(welcome_payload) {
                    Ok(()) => {
                        dm.store_pending_transition(transition_id);
                        info!(
                            role = role.as_str(),
                            transition_id,
                            known_users = ?dm.known_user_ids(),
                            pv = dm.protocol_version(),
                            ready = dm.is_ready(),
                            "DAVE: welcome processed, MLS group members"
                        );
                        (dm.is_ready(), true, None)
                    }
                    Err(e) => {
                        if is_already_in_group_error(&e) {
                            // AlreadyInGroup is only benign when we already processed
                            // the corresponding OP29 for this transition id.
                            if dm.has_pending_transition_id(transition_id) {
                                debug!(
                                    "DAVE process_welcome: AlreadyInGroup for pending transition {} (expected as committer)",
                                    transition_id
                                );
                                dm.store_pending_transition(transition_id);
                                (dm.is_ready(), true, None)
                            } else {
                                warn!(
                                    "DAVE process_welcome: AlreadyInGroup for non-pending transition {}; ignoring stale welcome",
                                    transition_id
                                );
                                (dm.is_ready(), false, None)
                            }
                        } else {
                            error!("DAVE process_welcome failed: {e}");
                            let recovery = dm.reinit().map_err(|error| {
                                error!(error = %error, "DAVE reinit failed after welcome error");
                                error
                            }).ok();
                            (false, false, recovery)
                        }
                    }
                }
            } else {
                (false, false, None)
            }
        };
        // Lock is dropped — safe to await

        if let Some(recovery) = recovery_action {
            send_recovery_action(ws_cmd_tx, recovery, "failed welcome").await;
        }

        // Match discord.js behavior: for non-zero transitions, confirm readiness with OP23.
        if success && transition_id != 0 {
            send_transition_ready(ws_cmd_tx, transition_id, "welcome").await;
        }

        if ready {
            let _ = event_tx.send(VoiceEvent::DaveReady { role }).await;
        }
    }
    // OP31: MLS Invalid Commit Welcome
    31 => {
        warn!(
            "DAVE binary OP31: invalid commit welcome ({} bytes)",
            payload.len()
        );
    }
    _ => {
        debug!(
            "Unknown binary voice opcode: {} ({} bytes)",
            opcode,
            payload.len()
        );
    }
}

}

async fn send_transition_ready( ws_cmd_tx: &mpsc::Sender, transition_id: u16, reason: &str, ) { let _ = ws_cmd_tx .send(WsCommand::SendJson(json!({ "op": 23, "d": { "transition_id": transition_id } }))) .await; info!( "DAVE: sent OP23 transition ready for {} transition {}", reason, transition_id ); }

// Push a DAVE recovery to the server: OP31 (invalid commit/welcome, frame
// layout [31 | transition_id u16 BE]) followed by a fresh OP26 key package
// ([26 | key package bytes]). Send failures are ignored; `reason` labels
// the log line.
async fn send_recovery_action( ws_cmd_tx: &mpsc::Sender, recovery: crate::dave::RecoveryAction, reason: &str, ) { let mut op31 = vec![31u8]; op31.extend_from_slice(&recovery.transition_id.to_be_bytes()); let _ = ws_cmd_tx.send(WsCommand::SendBinary(op31)).await;

let mut op26 = vec![26u8];
op26.extend_from_slice(&recovery.key_package);
let _ = ws_cmd_tx.send(WsCommand::SendBinary(op26)).await;

warn!("DAVE: recovery from {}, sent OP31 + OP26", reason);

}

// Best-effort DAVE re-initialization: returns the recovery action on
// success, or None when no manager exists yet or reinit fails (the failure
// is logged together with `reason`).
// NOTE(review): the OP29/OP30 handlers inline equivalent reinit logic
// instead of calling this helper, and no caller is visible in this section
// of the file — confirm it is still used elsewhere.
fn try_reinit_dave( dave: &Arc<Mutex<Option>>, reason: &str, ) -> Optioncrate::dave::RecoveryAction { let mut guard = dave.lock(); let dm = guard.as_mut()?;

match dm.reinit() {
    Ok(recovery) => Some(recovery),
    Err(error) => {
        error!(reason, error = %error, "DAVE reinit failed");
        None
    }
}

}

#[derive(Clone)] struct VideoFrameCandidate { frame: Vec, depacketizer_keyframe: bool, used_fallback_payload: bool, }

struct VideoFrameDecryptOutcome { frame: Option<Vec>, depacketizer_keyframe: bool, needs_recovery: bool, /// True only when DAVE successfully decrypted the frame (not passthrough). dave_decrypted: bool, }

/// Build the ordered, de-duplicated list of user ids to try when decrypting
/// an audio payload: the currently-bound user first (unless it is the bot),
/// then every other known user in their original order, skipping the bot,
/// the current user, and duplicates.
fn ordered_audio_candidate_user_ids(
    current_user_id: Option<u64>,
    bot_user_id: u64,
    known_user_ids: &[u64],
) -> Vec<u64> {
    // Seed with the currently-bound user, if any and not the bot itself.
    let mut ordered: Vec<u64> = current_user_id
        .into_iter()
        .filter(|&uid| uid != bot_user_id)
        .collect();

    for &candidate in known_user_ids {
        let skip = candidate == bot_user_id
            || Some(candidate) == current_user_id
            || ordered.contains(&candidate);
        if !skip {
            ordered.push(candidate);
        }
    }

    ordered
}

// Attempt DAVE decryption of an audio payload for a single user. Returns
// the decrypted bytes and a flag that is true when the fallback payload
// (alternate RTP-extension handling) was the one that decrypted; None when
// the manager cannot decrypt for this user or both attempts fail.
fn try_decrypt_audio_payload_for_user( dm: &mut DaveManager, user_id: u64, primary_payload: &[u8], fallback_payload: Option<&[u8]>, ssrc: u32, ) -> Option<(Vec, bool)> { let can_decrypt = dm.is_ready() && (dm.protocol_version() != 0 || dm.can_passthrough(user_id)); if !can_decrypt { return None; }

// Primary payload first; only fall back on failure.
if let Ok(decrypted) = dm.decrypt(user_id, primary_payload) {
    return Some((decrypted, false));
}

if let Some(fallback_payload) = fallback_payload {
    if let Ok(decrypted) = dm.decrypt(user_id, fallback_payload) {
        debug!(
            user_id,
            ssrc, "UDP: DAVE audio decrypt recovered using alternate RTP ext handling"
        );
        return Some((decrypted, true));
    }
}

None

}

// Attempt DAVE video decryption for one user across all frame candidates.
// Each candidate is tried as-is first; for H264, two rewritten Annex-B
// start-code layouts are additionally tried (senders disagree on 3- vs
// 4-byte start codes, which changes the ciphertext framing). Returns the
// decrypted frame and the candidate's keyframe flag, or None if every
// attempt fails.
fn try_decrypt_video_candidate_for_user( dm: &mut DaveManager, user_id: u64, candidates: &[&VideoFrameCandidate], ssrc: u32, codec: VideoCodecKind, ) -> Option<(Vec, bool)> { for candidate in candidates { if let Ok(frame) = dm.decrypt_video(user_id, &candidate.frame) { if candidate.used_fallback_payload { debug!( user_id, ssrc, codec = codec.as_str(), "UDP: DAVE video decrypt recovered using alternate RTP ext handling" ) ; } return Some((frame, candidate.depacketizer_keyframe)); }

    if codec == VideoCodecKind::H264 {
        for (variant_name, variant_frame) in [
            (
                "first_long_rest_short",
                rewrite_h264_annexb_start_codes(&candidate.frame, 4, 3),
            ),
            (
                "all_short",
                rewrite_h264_annexb_start_codes(&candidate.frame, 3, 3),
            ),
        ] {
            let Some(variant_frame) = variant_frame else {
                continue;
            };
            // Skip variants that didn't actually change the bytes.
            if variant_frame == candidate.frame {
                continue;
            }
            if let Ok(frame) = dm.decrypt_video(user_id, &variant_frame) {
                info!(
                    user_id,
                    ssrc,
                    codec = codec.as_str(),
                    variant = variant_name,
                    "UDP: DAVE video decrypt recovered using alternate H264 start-code layout"
                );
                return Some((frame, candidate.depacketizer_keyframe));
            }
        }
    }
}

None

}

/// Resolves a decrypted video frame from up to two depacketized frame
/// candidates (the primary payload and the fallback extension-stripping
/// variant).
///
/// * With no `DaveManager`, or while the manager cannot yet decrypt, the
///   first available candidate is passed through unmodified
///   (`dave_decrypted: false`).
/// * Otherwise each candidate is tried against the currently bound user,
///   then against every other known DAVE user; a cross-user success remaps
///   the ssrc entry in `video_ssrc_map` and updates `binding` in place.
/// * When every attempt fails, trailer/marker diagnostics are logged and
///   `needs_recovery` mirrors the manager's decrypt-failure tracker.
fn decrypt_video_frame_candidates(
    dave: &Arc<Mutex<Option>>,
    video_ssrc_map: &Arc<Mutex<HashMap<u32, RemoteVideoTrackBinding>>>,
    binding: &mut RemoteVideoTrackBinding,
    ssrc: u32,
    codec: VideoCodecKind,
    primary_candidate: Option,
    alternate_candidate: Option,
) -> VideoFrameDecryptOutcome {
// Try the primary candidate first; queue the alternate only when its bytes
// actually differ, so we never attempt the same frame twice.
let mut ordered_candidates = Vec::new();
if let Some(primary_candidate) = primary_candidate.as_ref() {
    ordered_candidates.push(primary_candidate);
}
if let Some(alternate_candidate) = alternate_candidate.as_ref() {
    let duplicate_of_primary = primary_candidate
        .as_ref()
        .is_some_and(|primary| primary.frame == alternate_candidate.frame);
    if !duplicate_of_primary {
        ordered_candidates.push(alternate_candidate);
    }
}

// Candidate emitted unmodified whenever DAVE decryption is unavailable.
let fallback_candidate = primary_candidate.as_ref().or(alternate_candidate.as_ref());
let Some(pass_through_candidate) = fallback_candidate else {
    return VideoFrameDecryptOutcome {
        frame: None,
        depacketizer_keyframe: false,
        needs_recovery: false,
        dave_decrypted: false,
    };
};

let mut guard = dave.lock();
match &mut *guard {
    Some(dm) => {
        dm.maybe_auto_execute_downgrade();
        let current_user_id = binding.user_id;
        let can_decrypt = dm.is_ready()
            && (dm.protocol_version() != 0 || dm.can_passthrough(current_user_id));
        if !can_decrypt {
            return VideoFrameDecryptOutcome {
                frame: Some(pass_through_candidate.frame.clone()),
                depacketizer_keyframe: pass_through_candidate.depacketizer_keyframe,
                needs_recovery: false,
                dave_decrypted: false,
            };
        }

        // First attempt: the user currently bound to this ssrc.
        if let Some((frame, depacketizer_keyframe)) = try_decrypt_video_candidate_for_user(
            dm,
            current_user_id,
            &ordered_candidates,
            ssrc,
            codec,
        ) {
            return VideoFrameDecryptOutcome {
                frame: Some(frame),
                depacketizer_keyframe,
                needs_recovery: false,
                dave_decrypted: true,
            };
        }

        // Second attempt: every other known user (excluding ourselves); a
        // hit means the ssrc→user mapping was stale, so rebind it.
        for candidate_user_id in dm.known_user_ids() {
            if candidate_user_id == current_user_id || candidate_user_id == dm.user_id() {
                continue;
            }
            if let Some((frame, depacketizer_keyframe)) = try_decrypt_video_candidate_for_user(
                dm,
                candidate_user_id,
                &ordered_candidates,
                ssrc,
                codec,
            ) {
                if let Some(remapped_binding) = video_ssrc_map.lock().get_mut(&ssrc) {
                    remapped_binding.user_id = candidate_user_id;
                }
                debug!(
                    ssrc,
                    codec = codec.as_str(),
                    old_user_id = current_user_id,
                    new_user_id = candidate_user_id,
                    "UDP: remapped video ssrc after successful DAVE decrypt"
                );
                binding.user_id = candidate_user_id;
                return VideoFrameDecryptOutcome {
                    frame: Some(frame),
                    depacketizer_keyframe,
                    needs_recovery: false,
                    dave_decrypted: true,
                };
            }
        }

        // All users failed — gather diagnostics before reporting failure.
        let known_users = dm.known_user_ids();
        let candidate_count = ordered_candidates.len();
        let frame_bytes = ordered_candidates
            .first()
            .map(|c| c.frame.len())
            .unwrap_or(0);
        // Check if the frame has DAVE magic marker (last 2 bytes = 0xFA 0xFA)
        let has_dave_marker = ordered_candidates.first().is_some_and(|c| {
            c.frame.len() >= 2
                && c.frame[c.frame.len() - 2] == 0xFA
                && c.frame[c.frame.len() - 1] == 0xFA
        });

        // Extract DAVE trailer details from the first candidate for diagnostics
        let (trailer_supp_size, trailer_hex_tail, frame_hex_head) =
            if let Some(candidate) = ordered_candidates.first() {
                let f = &candidate.frame;
                let supp = if has_dave_marker && f.len() >= 3 {
                    Some(f[f.len() - 3] as usize)
                } else {
                    None
                };
                // Last 24 bytes (or less) in hex for trailer inspection
                let tail_start = f.len().saturating_sub(24);
                let tail_hex: String = f[tail_start..]
                    .iter()
                    .map(|b| format!("{b:02x}"))
                    .collect::<Vec<_>>()
                    .join(" ");
                // First 24 bytes for start-code / NAL header inspection
                let head_len = f.len().min(24);
                let head_hex: String = f[..head_len]
                    .iter()
                    .map(|b| format!("{b:02x}"))
                    .collect::<Vec<_>>()
                    .join(" ");
                (supp, tail_hex, head_hex)
            } else {
                (None, String::new(), String::new())
            };

        // Count internal DAVE markers in the frame body — if > 1 the
        // sender might be encrypting per-NAL or per-packet rather than
        // per-frame, and our depacketized assembly is wrong.
        let internal_marker_count = ordered_candidates
            .first()
            .map(|c| {
                let f = &c.frame;
                if f.len() < 4 {
                    return 0u32;
                }
                let mut count = 0u32;
                for i in 0..f.len() - 1 {
                    if f[i] == 0xFA && f[i + 1] == 0xFA {
                        count += 1;
                    }
                }
                count
            })
            .unwrap_or(0);

        debug!(
            user_id = current_user_id,
            ssrc,
            codec = codec.as_str(),
            frame_bytes,
            has_dave_marker,
            trailer_supp_size,
            internal_marker_count,
            trailer_hex_tail,
            frame_hex_head,
            candidate_count,
            known_users = ?known_users,
            pv = dm.protocol_version(),
            "UDP drop: DAVE video decrypt failed for all candidate users"
        );
        VideoFrameDecryptOutcome {
            frame: None,
            depacketizer_keyframe: false,
            needs_recovery: dm.track_decrypt_failure(),
            dave_decrypted: false,
        }
    }
    None => VideoFrameDecryptOutcome {
        frame: Some(pass_through_candidate.frame.clone()),
        depacketizer_keyframe: pass_through_candidate.depacketizer_keyframe,
        needs_recovery: false,
        dave_decrypted: false,
    },
}

}

fn is_already_in_group_error(error: &anyhow::Error) -> bool { let message = format!("{error:?}"); message.contains("AlreadyInGroup") || message.contains("already") }

/// Drives the write half of the voice gateway WebSocket.
///
/// Two event sources are multiplexed with `select!`:
/// * a heartbeat interval sending op-3 frames, echoing the last observed
///   gateway sequence from `ws_sequence` (-1 means no sequence yet, in
///   which case `seq_ack` is omitted), and
/// * `cmd_rx`, whose queued JSON/binary commands are forwarded verbatim.
///
/// The loop exits when the shutdown flag is set at heartbeat time, the
/// command channel closes, or any send fails; send failures publish a
/// single disconnect event via `send_disconnect_once`.
async fn ws_write_loop(
    mut ws_write: futures_util::stream::SplitSink<WsStream, Message>,
    mut cmd_rx: mpsc::Receiver,
    shutdown: Arc,
    heartbeat_interval_ms: f64,
    role: TransportRole,
    ws_sequence: Arc,
    event_tx: mpsc::Sender,
    disconnect_sent: Arc,
) {
let hb_dur = Duration::from_millis(heartbeat_interval_ms as u64);
let mut hb_interval = time::interval(hb_dur);
// Consume first immediate tick so we don't send a heartbeat instantly.
// Discord expects the first heartbeat after heartbeat_interval * jitter.
hb_interval.tick().await;

loop {
    tokio::select! {
        _ = hb_interval.tick() => {
            if shutdown.load(Ordering::Relaxed) { break; }
            let ts = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;

            // Read the latest sequence from shared state (-1 means no sequence yet).
            let seq = ws_sequence.load(Ordering::Relaxed);

            let hb = if seq >= 0 {
                json!({
                    "op": 3,
                    "d": {
                        "t": ts,
                        "seq_ack": seq
                    }
                })
            } else {
                json!({
                    "op": 3,
                    "d": {
                        "t": ts
                    }
                })
            };
            if let Err(error) = ws_write.send(Message::Text(hb.to_string())).await {
                send_disconnect_once(
                    &event_tx,
                    &disconnect_sent,
                    role,
                    format!("WS heartbeat send failed: {error}"),
                )
                .await;
                break;
            }
        }
        // Forward queued outbound frames; channel close ends the loop.
        cmd = cmd_rx.recv() => {
            match cmd {
                Some(WsCommand::SendJson(v)) => {
                    if let Err(error) = ws_write.send(Message::Text(v.to_string())).await {
                        send_disconnect_once(
                            &event_tx,
                            &disconnect_sent,
                            role,
                            format!("WS command send failed: {error}"),
                        )
                        .await;
                        break;
                    }
                }
                Some(WsCommand::SendBinary(data)) => {
                    if let Err(error) = ws_write.send(Message::Binary(data)).await {
                        send_disconnect_once(
                            &event_tx,
                            &disconnect_sent,
                            role,
                            format!("WS binary send failed: {error}"),
                        )
                        .await;
                        break;
                    }
                }
                None => break,
            }
        }
    }
}
info!("Voice WS write loop exited");

}

/// UDP receive loop: reads, demultiplexes, and decrypts inbound packets
/// from the voice socket until shutdown or a receive error.
///
/// Per packet: parses the RTP header, skips muxed RTCP (payload types
/// 72-76, RFC 5761) and RTX, transport-decrypts, strips padding and header
/// extensions, then routes by payload type:
/// * Opus audio — optional DAVE decrypt (remapping the ssrc→user entry
///   when another known user's keys succeed), emitted as
///   `VoiceEvent::OpusReceived`.
/// * Video (H264/VP8) — depacketized into frames and DAVE-decrypted
///   per-frame, or per-packet when the trailer probe indicates per-packet
///   encryption, then emitted as `VoiceEvent::VideoFrameReceived`.
///
/// Repeated DAVE decrypt failures trigger a re-init/recovery action sent
/// over the voice WS command channel. Several capped diagnostic counters
/// (marker probes, byte dumps, decrypt stats) are logged along the way.
#[allow(clippy::too_many_lines)]
#[allow(clippy::too_many_arguments)]
async fn udp_recv_loop(
    socket: Arc,
    crypto: Arc,
    dave: Arc<Mutex<Option>>,
    ssrc_map: Arc<Mutex<HashMap<u32, u64>>>,
    video_ssrc_map: Arc<Mutex<HashMap<u32, RemoteVideoTrackBinding>>>,
    event_tx: mpsc::Sender,
    ws_cmd_tx: mpsc::Sender,
    shutdown: Arc,
    role: TransportRole,
    disconnect_sent: Arc,
) {
let mut buf = [0u8; 65_536];
let mut video_depacketizers = VideoDepacketizers::default();
let mut fallback_video_depacketizers = VideoDepacketizers::default();
let mut observed_transport_decrypt_failures = HashSet::::new();
let mut video_frame_emit_count: u64 = 0;
let mut video_keyframe_count: u64 = 0;
let mut dave_video_decrypt_ok: u64 = 0;
let mut dave_video_decrypt_fail: u64 = 0;
let mut dave_video_passthrough: u64 = 0;

// ── per-packet DAVE marker diagnostic ────────────────────────────
// Tracks whether individual RTP video payloads carry DAVE 0xFA 0xFA
// markers, which would indicate the sender encrypts per-packet
// rather than per-frame. If true, our depacketize-then-decrypt
// pipeline is fundamentally wrong and we need to decrypt per-packet
// before depacketization.
let mut per_packet_dave_marker_total: u64 = 0;
let mut per_packet_dave_marker_hits: u64 = 0;
let mut per_packet_dave_probe_logged: bool = false;
// Counter for byte-level frame dump logging (capped to avoid log flood)
let mut frame_byte_dump_ok_count: u64 = 0;
let mut frame_byte_dump_fail_count: u64 = 0;
const MAX_FRAME_BYTE_DUMPS: u64 = 10;

loop {
    if shutdown.load(Ordering::Relaxed) {
        break;
    }
    let n = match socket.recv(&mut buf).await {
        Ok(n) => n,
        Err(e) => {
            if shutdown.load(Ordering::Relaxed) {
                break;
            }
            send_disconnect_once(
                &event_tx,
                &disconnect_sent,
                role,
                format!("UDP recv error: {e}"),
            )
            .await;
            break;
        }
    };
    let packet = &buf[..n];

    let Some((sequence, timestamp, ssrc, header_size, marker)) = parse_rtp_header(packet)
    else {
        debug!("UDP drop: failed to parse RTP header");
        continue;
    };

    let payload_type = packet[1] & 0x7F;

    // RTCP packets (SR=200, RR=201, SDES=202, BYE=203, APP=204) share
    // the UDP socket via RTP/RTCP mux (RFC 5761).  Their payload-type
    // byte (masked to 7 bits) falls in 72-76.  We don't process inbound
    // RTCP — just skip before attempting RTP transport decryption, which
    // would fail because RTCP has a different AAD layout.
    if (72..=76).contains(&payload_type) {
        trace!(payload_type, ssrc, "UDP skip: inbound RTCP packet");
        continue;
    }

    if VideoCodecKind::is_rtx_payload_type(payload_type) {
        trace!(
            payload_type,
            ssrc, "UDP drop: RTX payload not yet supported"
        );
        continue;
    }

    let decrypted = match crypto.decrypt(packet, header_size) {
        Ok(p) => p,
        Err(e) => {
            // Log the first transport-decrypt failure per payload type once
            // at info level; subsequent ones only at debug.
            if (payload_type == OPUS_PT
                || VideoCodecKind::from_payload_type(payload_type).is_some())
                && observed_transport_decrypt_failures.insert(payload_type)
            {
                info!(
                    role = role.as_str(),
                    payload_type,
                    header_size,
                    error = %e,
                    "clankvox_transport_decrypt_failed"
                );
            }
            debug!("UDP drop: Transport crypto decrypt failed: {e}");
            continue;
        }
    };

    // Strip RTP padding BEFORE extension stripping / depacketization.
    // Under `rtpsize` AEAD modes the padding is inside the encrypted
    // envelope.  If not stripped, padding bytes corrupt the H264 frame
    // body and cause DAVE AES-GCM tag verification failures (~60% of
    // video frames).
    let decrypted = strip_rtp_padding(packet, decrypted);

    let Some((primary_payload, fallback_payload)) =
        strip_rtp_extension_payload(packet, decrypted)
    else {
        debug!("UDP drop: RTP extension body exceeds decrypted payload");
        continue;
    };

    // ── audio path: Opus payload ─────────────────────────────────
    if payload_type == OPUS_PT {
        let user_id = ssrc_map.lock().get(&ssrc).copied();
        let fallback_payload = fallback_payload.as_deref();

        let (opus_frame_opt, remapped_user_id, needs_recovery) = {
            let mut guard = dave.lock();
            match &mut *guard {
                Some(dm) => {
                    dm.maybe_auto_execute_downgrade();

                    if !dm.is_ready() {
                        (Some(primary_payload.clone()), None, false)
                    } else {
                        let candidate_user_ids = ordered_audio_candidate_user_ids(
                            user_id,
                            dm.user_id(),
                            &dm.known_user_ids(),
                        );
                        let mut recovered: Option<(Vec<u8>, u64)> = None;

                        for candidate_uid in candidate_user_ids {
                            if let Some((decrypted, _used_fallback_payload)) =
                                try_decrypt_audio_payload_for_user(
                                    dm,
                                    candidate_uid,
                                    &primary_payload,
                                    fallback_payload,
                                    ssrc,
                                )
                            {
                                recovered = Some((decrypted, candidate_uid));
                                break;
                            }
                        }

                        if let Some((decrypted, candidate_uid)) = recovered {
                            if user_id != Some(candidate_uid) {
                                ssrc_map.lock().insert(ssrc, candidate_uid);
                                info!(
                                    ssrc,
                                    old_user_id = user_id,
                                    new_user_id = candidate_uid,
                                    "UDP: remapped audio ssrc after successful DAVE decrypt"
                                );
                            }
                            (Some(decrypted), Some(candidate_uid), false)
                        } else if let Some(uid) = user_id {
                            debug!("UDP drop: DAVE audio decrypt failed for {uid}");
                            let recovery = dm.track_decrypt_failure();
                            (None, None, recovery)
                        } else {
                            debug!(
                                ssrc,
                                candidate_user_count = dm.known_user_ids().len(),
                                "UDP drop: DAVE audio decrypt could not resolve user for unmapped ssrc"
                            );
                            let recovery = dm.track_decrypt_failure();
                            (None, None, recovery)
                        }
                    }
                }
                None => (Some(primary_payload.clone()), None, false),
            }
        };

        let Some(opus_frame) = opus_frame_opt else {
            if needs_recovery {
                let recovery = try_reinit_dave(&dave, "udp audio decrypt failures");
                if let Some(recovery) = recovery {
                    send_recovery_action(&ws_cmd_tx, recovery, "udp audio decrypt failures")
                        .await;
                    warn!(
                        "DAVE: recovery initiated from UDP recv after {} failures",
                        crate::dave::FAILURE_TOLERANCE
                    );
                }
            }
            continue;
        };

        if let Some(remapped_user_id) = remapped_user_id.filter(|uid| Some(*uid) != user_id) {
            let _ = event_tx
                .send(VoiceEvent::SsrcUpdate {
                    role,
                    ssrc,
                    user_id: remapped_user_id,
                })
                .await;
        }

        let _ = event_tx
            .send(VoiceEvent::OpusReceived {
                role,
                ssrc,
                opus_frame,
                rtp_sequence: sequence,
            })
            .await;
        continue;
    }

    // ── video path: resolve codec and ssrc binding, then depacketize ──
    let Some(codec) = VideoCodecKind::from_payload_type(payload_type) else {
        trace!(payload_type, ssrc, "UDP drop: unsupported RTP payload type");
        continue;
    };

    let Some(mut binding) = video_ssrc_map.lock().get(&ssrc).cloned() else {
        trace!(
            payload_type,
            ssrc, "UDP drop: video packet from unknown ssrc"
        );
        continue;
    };

    // ── per-packet DAVE marker probe ─────────────────────────────
    // Check if this individual RTP payload has a DAVE trailer. If the
    // sender encrypts per-packet (not per-frame), each payload ends
    // with [tag(8)][nonce(leb128)][ranges…][supp_size(1)][0xFA 0xFA].
    // This probe runs on the first 500 video packets to diagnose
    // whether our depacketize-then-decrypt approach is wrong.
    let has_per_pkt_marker = primary_payload.len() >= 11
        && primary_payload[primary_payload.len() - 2] == 0xFA
        && primary_payload[primary_payload.len() - 1] == 0xFA;
    if per_packet_dave_marker_total < 500 {
        per_packet_dave_marker_total += 1;
        if has_per_pkt_marker {
            per_packet_dave_marker_hits += 1;
        }
        if per_packet_dave_marker_total == 500 && !per_packet_dave_probe_logged {
            per_packet_dave_probe_logged = true;
            let pct = per_packet_dave_marker_hits * 100 / per_packet_dave_marker_total;
            info!(
                per_packet_dave_marker_hits,
                per_packet_dave_marker_total,
                pct,
                "clankvox_per_packet_dave_marker_probe"
            );
        }
    }

    // ── per-packet DAVE decrypt path ─────────────────────────────
    // If the RTP payload carries a DAVE marker AND is a complete
    // single-NAL or STAP-A packet (not an FU-A fragment), the sender
    // may have encrypted at the per-packet level. Try DAVE decrypt on
    // the raw payload before depacketization.
    //
    // We skip FU-A (nal_type 28) because FU-A end fragments happen to
    // end with the DAVE trailer (0xFA 0xFA) from the encrypted frame,
    // but they are just fragments — not independently encrypted.
    let is_fu_a = !primary_payload.is_empty() && (primary_payload[0] & 0x1F) == 28;
    let should_try_per_pkt = has_per_pkt_marker && marker && !is_fu_a;
    let mut per_pkt_dave_ok = false;
    let per_pkt_decrypted_payload;
    let depacketize_payload: &[u8] = if should_try_per_pkt {
        // Try per-packet DAVE decrypt
        let decrypted = {
            let mut guard = dave.lock();
            match &mut *guard {
                Some(dm) if dm.is_ready() && dm.protocol_version() != 0 => {
                    dm.decrypt_video(binding.user_id, &primary_payload).ok()
                }
                _ => None,
            }
        };
        if let Some(d) = decrypted {
            per_pkt_dave_ok = true;
            per_pkt_decrypted_payload = d;
            &per_pkt_decrypted_payload
        } else {
            // Per-packet decrypt failed; fall through to normal path
            &primary_payload
        }
    } else {
        &primary_payload
    };

    let primary_candidate = video_depacketizers
        .push(ssrc, codec, sequence, timestamp, marker, depacketize_payload)
        .map(|(frame, depacketizer_keyframe)| VideoFrameCandidate {
            frame,
            depacketizer_keyframe,
            used_fallback_payload: false,
        });
    let alternate_payload = fallback_payload.as_deref().unwrap_or(&primary_payload);
    let alternate_candidate = fallback_video_depacketizers
        .push(ssrc, codec, sequence, timestamp, marker, alternate_payload)
        .map(|(frame, depacketizer_keyframe)| VideoFrameCandidate {
            frame,
            depacketizer_keyframe,
            used_fallback_payload: fallback_payload.is_some(),
        });

    // Skip DAVE decrypt + frame emit entirely when neither depacketizer
    // produced a complete frame — most RTP packets are mid-frame FU-A
    // fragments and calling decrypt_video_frame_candidates on them just
    // burns a mutex lock for a guaranteed None result.
    if primary_candidate.is_none() && alternate_candidate.is_none() {
        continue;
    }

    // Capture frame bytes for diagnostic dumps before candidates are consumed
    let diag_frame_head = primary_candidate.as_ref().map(|c| {
        let len = c.frame.len().min(32);
        c.frame[..len]
            .iter()
            .map(|b| format!("{b:02x}"))
            .collect::<Vec<_>>()
            .join(" ")
    });
    let diag_frame_tail = primary_candidate.as_ref().map(|c| {
        let start = c.frame.len().saturating_sub(32);
        c.frame[start..]
            .iter()
            .map(|b| format!("{b:02x}"))
            .collect::<Vec<_>>()
            .join(" ")
    });
    let diag_frame_bytes = primary_candidate.as_ref().map(|c| c.frame.len());

    // If per-packet DAVE decrypt already succeeded for every packet in
    // this frame, the assembled frame is already plain codec data and
    // must NOT go through frame-level DAVE decrypt (it has no trailer).
    // Check the assembled frame: if it doesn't have a DAVE marker and
    // per-packet decrypt was active, emit it directly.
    let assembled_has_dave_marker = primary_candidate.as_ref().is_some_and(|c| {
        c.frame.len() >= 2
            && c.frame[c.frame.len() - 2] == 0xFA
            && c.frame[c.frame.len() - 1] == 0xFA
    });
    let (video_frame_opt, depacketizer_keyframe, needs_recovery, dave_decrypted) =
        if per_pkt_dave_ok && !assembled_has_dave_marker {
            // Per-packet decrypt already handled — bypass frame-level DAVE
            let candidate = primary_candidate.unwrap_or_else(|| {
                alternate_candidate.expect("at least one candidate exists")
            });
            (
                Some(candidate.frame),
                candidate.depacketizer_keyframe,
                false,
                true,
            )
        } else {
            // Standard path: frame-level DAVE decrypt
            let VideoFrameDecryptOutcome {
                frame,
                depacketizer_keyframe,
                needs_recovery,
                dave_decrypted,
            } = decrypt_video_frame_candidates(
                &dave,
                &video_ssrc_map,
                &mut binding,
                ssrc,
                codec,
                primary_candidate,
                alternate_candidate,
            );
            (frame, depacketizer_keyframe, needs_recovery, dave_decrypted)
        };

    // Track DAVE video decrypt stats + byte-level dumps
    if video_frame_opt.is_some() {
        if dave_decrypted {
            dave_video_decrypt_ok += 1;
            // Log first N successful frame byte dumps for comparison with failures
            if frame_byte_dump_ok_count < MAX_FRAME_BYTE_DUMPS {
                frame_byte_dump_ok_count += 1;
                if let (Some(head), Some(tail), Some(fbytes)) =
                    (&diag_frame_head, &diag_frame_tail, diag_frame_bytes)
                {
                    info!(
                        ssrc,
                        codec = codec.as_str(),
                        frame_bytes = fbytes,
                        ok_head = head.as_str(),
                        ok_tail = tail.as_str(),
                        "clankvox_dave_video_decrypt_ok_frame_bytes"
                    );
                }
            }
        } else {
            dave_video_passthrough += 1;
        }
    } else {
        dave_video_decrypt_fail += 1;
        // Log first N failed frame byte dumps
        if frame_byte_dump_fail_count < MAX_FRAME_BYTE_DUMPS {
            frame_byte_dump_fail_count += 1;
            if let (Some(head), Some(tail), Some(fbytes)) =
                (&diag_frame_head, &diag_frame_tail, diag_frame_bytes)
            {
                info!(
                    ssrc,
                    codec = codec.as_str(),
                    frame_bytes = fbytes,
                    fail_head = head.as_str(),
                    fail_tail = tail.as_str(),
                    "clankvox_dave_video_decrypt_fail_frame_bytes"
                );
            }
        }
    }
    // Emit stats for the first 5 frames and every 100th thereafter.
    let dave_total = dave_video_decrypt_ok + dave_video_decrypt_fail + dave_video_passthrough;
    if dave_total > 0 && (dave_total <= 5 || dave_total % 100 == 0) {
        let success_pct = if dave_total > 0 {
            dave_video_decrypt_ok * 100 / dave_total
        } else {
            0
        };
        info!(
            dave_video_decrypt_ok,
            dave_video_decrypt_fail,
            dave_video_passthrough,
            dave_total,
            success_pct,
            role = role.as_str(),
            "clankvox_dave_video_decrypt_stats"
        );
        // Also dump the davey-internal per-user decrypt stats
        if let Some(dm) = dave.lock().as_ref() {
            dm.log_decrypt_stats();
        }
    }

    let Some(frame) = video_frame_opt else {
        if needs_recovery {
            let recovery = try_reinit_dave(&dave, "udp video decrypt failures");
            if let Some(recovery) = recovery {
                send_recovery_action(&ws_cmd_tx, recovery, "udp video decrypt failures").await;
            }
        }
        continue;
    };

    // Prepend cached SPS+PPS AFTER DAVE decrypt so the DAVE trailer's
    // unencrypted ranges reference correct offsets in the original frame.
    let frame = if codec == VideoCodecKind::H264 {
        video_depacketizers.prepend_cached_h264_params(ssrc, frame)
    } else {
        frame
    };

    let keyframe = match codec {
        VideoCodecKind::H264 => {
            // Only IDR (NAL type 5) counts as a keyframe for rate-
            // limiting.  SPS+PPS are prepended to every frame after DAVE
            // decrypt so ffmpeg can always decode, but that prepend must
            // NOT cause every frame to bypass the fps gate.
            depacketizer_keyframe || h264_annexb_has_idr_slice(&frame)
        }
        VideoCodecKind::Vp8 => {
            depacketizer_keyframe || frame.first().is_some_and(|byte| byte & 0x01 == 0)
        }
    };

    video_frame_emit_count += 1;
    if keyframe {
        video_keyframe_count += 1;
    }
    // Log NAL types for the first 5 H264 frames and periodically after that
    if codec == VideoCodecKind::H264
        && (video_frame_emit_count <= 5 || video_frame_emit_count % 100 == 0)
    {
        let nal_types = collect_annexb_nal_types(&frame);
        info!(
            ssrc,
            frame_bytes = frame.len(),
            keyframe,
            depacketizer_keyframe,
            video_frame_emit_count,
            video_keyframe_count,
            nal_types = ?nal_types,
            "clankvox_h264_frame_nal_diagnostic"
        );
    }

    let _ = event_tx
        .send(VoiceEvent::VideoFrameReceived {
            role,
            user_id: binding.user_id,
            ssrc,
            codec: codec.as_str().to_string(),
            keyframe,
            frame,
            rtp_timestamp: timestamp,
            stream_type: binding.descriptor.stream_type.clone(),
            dave_decrypted,
            rid: binding.descriptor.rid.clone(),
        })
        .await;
}
info!("UDP recv loop exited");

}

#[cfg(test)] mod tests { use std::collections::HashMap; use std::sync::Arc;

use futures_util::stream;
use parking_lot::Mutex;

use super::{
    HelloPayload, SessionDescriptionPayload, VideoFrameCandidate, VoiceOpcode,
    decrypt_video_frame_candidates, ordered_audio_candidate_user_ids, parse_user_id,
    parse_voice_opcode, recv_ready, recv_session_description,
};
use crate::rtp::VideoCodecKind;
use crate::video::VideoStreamDescriptor;
use crate::video_state::RemoteVideoTrackBinding;
use tokio_tungstenite::tungstenite::Message;

#[test]
fn ordered_audio_candidate_user_ids_tries_known_users_when_ssrc_map_is_missing() {
    // With no existing ssrc mapping, every known user except our own id
    // (999) becomes a candidate, preserving order.
    let known = [999, 42, 43];
    let candidates = ordered_audio_candidate_user_ids(None, 999, &known);
    assert_eq!(candidates, vec![42, 43]);
}

#[test]
fn ordered_audio_candidate_user_ids_prefers_current_mapping_before_other_known_users() {
    // A user already mapped to the ssrc is tried first, followed by the
    // remaining known users (our own id 999 is still excluded).
    let known = [999, 42, 43];
    let candidates = ordered_audio_candidate_user_ids(Some(42), 999, &known);
    assert_eq!(candidates, vec![42, 43]);
}

#[test]
fn parse_voice_opcode_rejects_invalid_secret_key_bytes() {
    // 999 does not fit in a key byte, so deserialization must fail.
    let raw = r#"{"op":4,"d":{"secret_key":[1,999],"dave_protocol_version":1}}"#;
    let result = parse_voice_opcode::<SessionDescriptionPayload>(raw);
    assert!(result.is_err());
}

#[test]
fn parse_voice_opcode_reads_hello_payload() {
    // A well-formed op-8 hello carries the heartbeat interval through.
    let raw = r#"{"op":8,"d":{"heartbeat_interval":2500.0}}"#;
    let hello: VoiceOpcode<HelloPayload> = parse_voice_opcode(raw).expect("hello payload");
    assert_eq!(hello.op, 8);
    assert_eq!(hello.d.heartbeat_interval, Some(2500.0));
}

#[test]
fn parse_user_id_rejects_non_numeric_values() {
    // Decimal strings parse to an id; anything else yields None.
    assert_eq!(parse_user_id("42", "test"), Some(42));
    assert!(parse_user_id("bad", "test").is_none());
}

#[tokio::test]
async fn recv_ready_buffers_non_target_text_frames() {
    // An unrelated op-6 frame precedes the op-2 ready frame; it must end
    // up in `overflow` rather than being discarded.
    let non_target = Message::Text(r#"{"op":6,"d":{}}"#.into());
    let ready_frame = Message::Text(
        r#"{"op":2,"d":{"ssrc":9689,"ip":"104.29.137.71","port":19296,"modes":["aead_aes256_gcm_rtpsize"]}}"#
            .into(),
    );
    let mut ws = stream::iter(vec![Ok(non_target), Ok(ready_frame)]);
    let mut overflow = Vec::new();

    let payload = recv_ready(&mut ws, &mut overflow)
        .await
        .expect("ready payload");

    assert_eq!(payload.ssrc, 9689);
    assert_eq!(payload.ip, "104.29.137.71");
    assert_eq!(payload.port, 19296);
    assert_eq!(payload.modes, vec!["aead_aes256_gcm_rtpsize"]);
    assert_eq!(overflow.len(), 1);
}

#[tokio::test]
async fn recv_session_description_buffers_non_target_text_frames() {
    // An op-18 frame arrives before the op-4 session description and must
    // be buffered into `overflow`, not dropped.
    let non_target = Message::Text(r#"{"op":18,"d":{"streams":[]}}"#.into());
    let target = Message::Text(
        r#"{"op":4,"d":{"secret_key":[1,2,3,4],"dave_protocol_version":1}}"#.into(),
    );
    let mut ws = stream::iter(vec![Ok(non_target), Ok(target)]);
    let mut overflow = Vec::new();

    let payload = recv_session_description(&mut ws, &mut overflow)
        .await
        .expect("session description payload");

    assert_eq!(payload.secret_key, vec![1, 2, 3, 4]);
    assert_eq!(payload.dave_protocol_version, 1);
    assert_eq!(overflow.len(), 1);
}

#[test]
fn decrypt_video_frame_candidates_prefers_primary_candidate_without_dave() {
    // Without a DAVE manager the primary (non-fallback) candidate should
    // be passed through untouched and the ssrc binding left unchanged.
    let stream = VideoStreamDescriptor {
        ssrc: 4201,
        rtx_ssrc: None,
        rid: None,
        quality: None,
        stream_type: Some("screen".into()),
        active: Some(true),
        max_bitrate: None,
        max_framerate: None,
        max_resolution: None,
    };
    let mut binding = RemoteVideoTrackBinding {
        user_id: 42,
        descriptor: stream.clone(),
    };
    let video_ssrc_map = Arc::new(Mutex::new(HashMap::from([(
        stream.ssrc,
        RemoteVideoTrackBinding {
            user_id: 42,
            descriptor: stream,
        },
    )])));
    let dave = Arc::new(Mutex::new(None));

    let primary = VideoFrameCandidate {
        frame: vec![1, 2, 3],
        depacketizer_keyframe: true,
        used_fallback_payload: false,
    };
    let alternate = VideoFrameCandidate {
        frame: vec![9, 9, 9],
        depacketizer_keyframe: false,
        used_fallback_payload: true,
    };

    let result = decrypt_video_frame_candidates(
        &dave,
        &video_ssrc_map,
        &mut binding,
        4201,
        VideoCodecKind::H264,
        Some(primary),
        Some(alternate),
    );

    assert_eq!(result.frame, Some(vec![1, 2, 3]));
    assert!(result.depacketizer_keyframe);
    assert!(!result.needs_recovery);
    assert_eq!(binding.user_id, 42);
}

#[test]
fn decrypt_video_frame_candidates_uses_alternate_candidate_without_dave() {
    // When the primary candidate is absent and no DAVE session is active, the
    // alternate (fallback-payload) candidate must be selected as the frame.
    let descriptor = VideoStreamDescriptor {
        ssrc: 4301,
        rtx_ssrc: None,
        rid: None,
        quality: None,
        stream_type: Some("screen".into()),
        active: Some(true),
        max_bitrate: None,
        max_framerate: None,
        max_resolution: None,
    };
    let mut binding = RemoteVideoTrackBinding {
        user_id: 42,
        descriptor: descriptor.clone(),
    };
    let video_ssrc_map = Arc::new(Mutex::new(HashMap::from([(
        descriptor.ssrc,
        RemoteVideoTrackBinding {
            user_id: 42,
            descriptor,
        },
    )])));
    let dave = Arc::new(Mutex::new(None));

    // Only the alternate candidate exists: a keyframe from the fallback payload.
    let alternate = VideoFrameCandidate {
        frame: vec![7, 8, 9],
        depacketizer_keyframe: true,
        used_fallback_payload: true,
    };

    let outcome = decrypt_video_frame_candidates(
        &dave,
        &video_ssrc_map,
        &mut binding,
        4301,
        VideoCodecKind::Vp8,
        None,
        Some(alternate),
    );

    assert_eq!(outcome.frame, Some(vec![7, 8, 9]));
    assert!(outcome.depacketizer_keyframe);
    assert!(!outcome.needs_recovery);
    // The caller's binding is not disturbed by candidate selection.
    assert_eq!(binding.user_id, 42);
}

}