src/capture_supervisor.rs

use std::collections::hash_map::Entry;
use std::collections::BTreeMap;

use audiopus::coder::Decoder as OpusDecoder;
use audiopus::packet::Packet as OpusPacket;
use audiopus::{Channels, MutSignals, SampleRate};
use base64::Engine as _;
use tokio::time;

use crate::app_state::AppState;
use crate::capture::{
    normalize_sample_rate, normalize_silence_duration_ms, SpeakingState, UserCaptureState,
    SPEAKING_TIMEOUT_MS,
};
use crate::ipc::{send_msg, OutMsg};
use crate::ipc_protocol::CaptureCommand;
use crate::video::{RemoteVideoState, UserVideoSubscription};
use crate::video_decoder::PersistentVideoDecoder;
use crate::voice_conn::{TransportRole, VoiceEvent};

/// Maximum number of lost RTP packets for which we attempt FEC/PLC recovery.
/// Gaps larger than this are likely DTX silence periods or reconnects —
/// concealment would produce garbage.
const MAX_RECOVERABLE_GAP: i16 = 5;

/// Interval between sink-wants reasserts / PLI requests while waiting for
/// the first keyframe on a stream.
const FIRST_KEYFRAME_REASSERT_INTERVAL_MS: u64 = 2_000;

/// Interval between periodic PLI requests after the first keyframe has been
/// received. With DAVE decrypt failures causing ~45-55% frame loss, the
/// H264 reference chain accumulates corruption quickly. Aggressive PLI
/// ensures the decoder resyncs via IDR frames every 2 seconds.
const PERIODIC_KEYFRAME_PLI_INTERVAL_MS: u64 = 2_000;

/// Classification of an incoming RTP sequence number relative to the last
/// accepted packet for the same SSRC.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RtpSeqClass {
    /// First packet for this SSRC — no history yet.
    First,
    /// Next expected sequence — no loss.
    Sequential,
    /// Forward gap: lost_count packets were skipped (1..=MAX_RECOVERABLE_GAP).
    ForwardLoss { lost_count: u16 },
    /// Forward gap too large to recover — likely DTX or reconnect.
    ForwardLarge,
    /// Duplicate of the last accepted packet.
    Duplicate,
    /// Stale / reordered — a packet older than the last accepted.
    Stale,
}

/// Classify an incoming RTP sequence relative to the last accepted one.
///
/// Uses signed distance (i16 cast of wrapping_sub) to correctly handle u16
/// wraparound. Positive distance = forward gap; negative = stale/reordered.
fn classify_rtp_sequence(prev_seq: Option<u16>, incoming: u16) -> RtpSeqClass {
    let Some(prev) = prev_seq else {
        return RtpSeqClass::First;
    };
    let expected = prev.wrapping_add(1);
    if incoming == expected {
        return RtpSeqClass::Sequential;
    }
    if incoming == prev {
        return RtpSeqClass::Duplicate;
    }
    // Signed distance: positive means the incoming packet is ahead of expected,
    // negative means it is behind (stale/reordered).
    let distance = incoming.wrapping_sub(expected) as i16;
    if distance > 0 && distance <= MAX_RECOVERABLE_GAP {
        RtpSeqClass::ForwardLoss {
            lost_count: distance as u16,
        }
    } else if distance > MAX_RECOVERABLE_GAP {
        RtpSeqClass::ForwardLarge
    } else {
        // distance <= 0 (a very large u16 wraps to a negative i16)
        RtpSeqClass::Stale
    }
}
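
// Worked example of the signed-distance arithmetic above (it mirrors the
// wraparound tests at the bottom of this file): with prev = 65534 the
// expected next sequence is 65535, so an incoming 0 gives
// 0u16.wrapping_sub(65535) == 1, still 1 after the i16 cast, i.e. one lost
// packet across the wrap. With prev = 5, an incoming 65535 gives
// 65535u16.wrapping_sub(6) == 65529, which the i16 cast maps to -7, i.e. a
// stale late arrival from before the wrap.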

/// Record an audio packet for `user_id` and report whether this packet
/// starts a new speaking burst. The paired timeout in `on_capture_tick`
/// flips `is_speaking` back off after `SPEAKING_TIMEOUT_MS` of silence.
fn update_speaking_state(
    speaking_states: &mut std::collections::HashMap<u64, SpeakingState>,
    user_id: u64,
    now: time::Instant,
) -> bool {
    let speaking = speaking_states.entry(user_id).or_insert(SpeakingState {
        last_packet_at: None,
        is_speaking: false,
    });
    speaking.last_packet_at = Some(now);
    if speaking.is_speaking {
        false
    } else {
        speaking.is_speaking = true;
        true
    }
}

fn should_reassert_sink_wants_for_waiting_keyframe(
    subscription: &mut UserVideoSubscription,
    keyframe: bool,
    now: time::Instant,
) -> bool {
    if keyframe {
        subscription.last_keyframe_forwarded_at = Some(now);
        subscription.last_sink_wants_reasserted_at = None;
        return false;
    }

    let interval_ms = if subscription.last_keyframe_forwarded_at.is_some() {
        // After first keyframe: request periodically so the per-frame
        // decoder gets fresh independently-decodable keyframes for the
        // vision scanner.
        PERIODIC_KEYFRAME_PLI_INTERVAL_MS
    } else {
        // Before first keyframe: request aggressively at 2s intervals.
        FIRST_KEYFRAME_REASSERT_INTERVAL_MS
    };
    let reassert_interval = std::time::Duration::from_millis(interval_ms);
    match subscription.last_sink_wants_reasserted_at {
        Some(last_reasserted_at) if now.duration_since(last_reasserted_at) < reassert_interval => {
            false
        }
        _ => {
            subscription.last_sink_wants_reasserted_at = Some(now);
            true
        }
    }
}

impl AppState {

fn emit_user_video_state(&self, user_id: u64, state: &RemoteVideoState) {
    let stream_ssrcs = state
        .streams
        .iter()
        .map(|stream| stream.ssrc)
        .collect::<Vec<_>>();
    let active_stream_count = state
        .streams
        .iter()
        .filter(|stream| stream.is_active())
        .count();
    tracing::info!(
        user_id,
        audio_ssrc = state.audio_ssrc,
        video_ssrc = state.video_ssrc,
        codec = ?state.codec.as_deref(),
        stream_count = state.streams.len(),
        active_stream_count,
        stream_ssrcs = ?stream_ssrcs,
        "clankvox_native_video_state_emitted"
    );
    send_msg(&OutMsg::UserVideoState {
        user_id: user_id.to_string(),
        audio_ssrc: state.audio_ssrc,
        video_ssrc: state.video_ssrc,
        codec: state.codec.clone(),
        streams: state.streams.clone(),
    });
}

fn emit_user_video_end(&self, user_id: u64, state: Option<&RemoteVideoState>) {
    let ssrc = state.and_then(|state| {
        state
            .video_ssrc
            .or_else(|| state.streams.first().map(|stream| stream.ssrc))
    });
    tracing::info!(
        user_id,
        ssrc = ssrc,
        had_cached_state = state.is_some(),
        "clankvox_native_video_end_emitted"
    );
    send_msg(&OutMsg::UserVideoEnd {
        user_id: user_id.to_string(),
        ssrc,
    });
}

fn remove_user_video_runtime_state(&mut self, user_id: u64) {
    let ended_state = self.remote_video_states.remove(&user_id);
    self.user_video_decoders.remove(&user_id);
    self.emit_user_video_end(user_id, ended_state.as_ref());
}

fn refresh_video_sink_wants(&self, reason: &str) {
    if self.voice_conn.is_none() && self.stream_watch_conn.is_none() {
        tracing::info!(
            reason = reason,
            subscribed_user_count = self.user_video_subscriptions.len(),
            remote_video_user_count = self.remote_video_states.len(),
            "clankvox_video_sink_wants_skipped_no_connection"
        );
        return;
    }

    // Collect all wants, partitioned by which transport should
    // carry them.  Screen-share SSRCs go to stream_watch (if
    // connected), everything else (webcam / unknown) to voice.
    let mut voice_wants: BTreeMap<u32, u8> = BTreeMap::new();
    let mut voice_pixels: BTreeMap<u32, f64> = BTreeMap::new();
    let mut sw_wants: BTreeMap<u32, u8> = BTreeMap::new();
    let mut sw_pixels: BTreeMap<u32, f64> = BTreeMap::new();
    let have_sw = self.stream_watch_conn.is_some();

    for (&user_id, remote_state) in &self.remote_video_states {
        // Mark all known SSRCs as "quality 0" (= don't send but
        // acknowledge existence) on the appropriate transport.
        for stream in &remote_state.streams {
            let is_screen = stream
                .stream_type
                .as_deref()
                .is_some_and(|t| t.eq_ignore_ascii_case("screen"));
            if is_screen && have_sw {
                sw_wants.entry(stream.ssrc).or_insert(0);
            } else {
                voice_wants.entry(stream.ssrc).or_insert(0);
            }
        }
        if let Some(video_ssrc) = remote_state.video_ssrc {
            voice_wants.entry(video_ssrc).or_insert(0);
        }

        let Some(subscription) = self.user_video_subscriptions.get(&user_id) else {
            continue;
        };

        if let Some(stream) = remote_state.preferred_stream(subscription) {
            let is_screen = stream
                .stream_type
                .as_deref()
                .is_some_and(|t| t.eq_ignore_ascii_case("screen"));
            let (wants, pixels) = if is_screen && have_sw {
                (&mut sw_wants, &mut sw_pixels)
            } else {
                (&mut voice_wants, &mut voice_pixels)
            };
            wants.insert(stream.ssrc, subscription.preferred_quality);
            if let Some(pixel_count) = subscription
                .preferred_pixel_count
                .or_else(|| stream.pixel_count_hint())
            {
                pixels.insert(stream.ssrc, f64::from(pixel_count));
            }
        } else if let Some(video_ssrc) = remote_state.video_ssrc {
            voice_wants.insert(video_ssrc, subscription.preferred_quality);
            if let Some(pixel_count) = subscription.preferred_pixel_count {
                voice_pixels.insert(video_ssrc, f64::from(pixel_count));
            }
        }
    }

    // Send wants to each transport that has entries.
    let voice_wants_vec = voice_wants.into_iter().collect::<Vec<_>>();
    let voice_pixels_vec = voice_pixels.into_iter().collect::<Vec<_>>();
    let sw_wants_vec = sw_wants.into_iter().collect::<Vec<_>>();
    let sw_pixels_vec = sw_pixels.into_iter().collect::<Vec<_>>();

    let total_wanted = voice_wants_vec.len() + sw_wants_vec.len();
    tracing::info!(
        reason = reason,
        subscribed_user_count = self.user_video_subscriptions.len(),
        remote_video_user_count = self.remote_video_states.len(),
        wanted_ssrc_count = total_wanted,
        wanted_streams = ?voice_wants_vec,
        sw_wanted_streams = ?sw_wants_vec,
        pixel_count_overrides = ?voice_pixels_vec,
        "clankvox_video_sink_wants_updated"
    );

    if !voice_wants_vec.is_empty() || !have_sw {
        if let Some(conn) = self.voice_conn.as_ref() {
            if let Err(error) =
                conn.update_media_sink_wants(&voice_wants_vec, &voice_pixels_vec)
            {
                tracing::warn!(reason = reason, error = %error, "failed to update voice media sink wants");
            }
        }
    }
    if !sw_wants_vec.is_empty() {
        if let Some(conn) = self.stream_watch_conn.as_ref() {
            if let Err(error) = conn.update_media_sink_wants(&sw_wants_vec, &sw_pixels_vec) {
                tracing::warn!(reason = reason, error = %error, "failed to update stream_watch media sink wants");
            }
        }
    }
}

pub(crate) fn handle_capture_command(&mut self, msg: CaptureCommand) {
    match msg {
        CaptureCommand::SubscribeUser {
            user_id,
            silence_duration_ms,
            sample_rate,
        } => {
            let Some(user_id) =
                crate::app_state::parse_user_id_field(&user_id, "subscribe_user")
            else {
                return;
            };
            let sample_rate = normalize_sample_rate(sample_rate);
            let silence_duration_ms = normalize_silence_duration_ms(silence_duration_ms);
            let state = self
                .user_capture_states
                .entry(user_id)
                .or_insert_with(|| UserCaptureState::new(sample_rate, silence_duration_ms));
            state.sample_rate = sample_rate;
            state.silence_duration_ms = silence_duration_ms;
        }
        CaptureCommand::UnsubscribeUser { user_id } => {
            let Some(user_id) =
                crate::app_state::parse_user_id_field(&user_id, "unsubscribe_user")
            else {
                return;
            };
            if let Some(state) = self.user_capture_states.remove(&user_id) {
                if state.stream_active {
                    send_msg(&OutMsg::UserAudioEnd {
                        user_id: user_id.to_string(),
                    });
                }
            }
        }
        CaptureCommand::SubscribeUserVideo {
            user_id,
            max_frames_per_second,
            preferred_quality,
            preferred_pixel_count,
            preferred_stream_type,
            jpeg_quality,
        } => {
            let Some(user_id) =
                crate::app_state::parse_user_id_field(&user_id, "subscribe_user_video")
            else {
                return;
            };
            let subscription = UserVideoSubscription::new(
                max_frames_per_second,
                preferred_quality,
                preferred_pixel_count,
                preferred_stream_type,
                jpeg_quality,
            );
            // Update JPEG quality on any existing decoder for this user
            if let Some(decoder) = self.user_video_decoders.get_mut(&user_id) {
                decoder.set_jpeg_quality(subscription.jpeg_quality);
            }
            let had_cached_remote_state = self.remote_video_states.contains_key(&user_id);
            tracing::info!(
                user_id,
                max_frames_per_second = subscription.max_frames_per_second,
                preferred_quality = subscription.preferred_quality,
                preferred_pixel_count = subscription.preferred_pixel_count,
                preferred_stream_type = ?subscription.preferred_stream_type.as_deref(),
                had_cached_remote_state,
                "clankvox_native_video_subscribe_requested"
            );
            self.user_video_subscriptions.insert(user_id, subscription);
            if let Some(state) = self.remote_video_states.get(&user_id) {
                self.emit_user_video_state(user_id, state);
            }
            self.refresh_video_sink_wants("subscribe_user_video");
        }
        CaptureCommand::UnsubscribeUserVideo { user_id } => {
            let Some(user_id) =
                crate::app_state::parse_user_id_field(&user_id, "unsubscribe_user_video")
            else {
                return;
            };
            let had_subscription = self.user_video_subscriptions.remove(&user_id).is_some();
            tracing::info!(
                user_id,
                had_subscription,
                "clankvox_native_video_unsubscribe_requested"
            );
            self.refresh_video_sink_wants("unsubscribe_user_video");
        }
    }
}

pub(crate) fn handle_voice_event(&mut self, event: VoiceEvent) {
    match event {
        VoiceEvent::Ready { role, ssrc } => {
            tracing::info!(role = role.as_str(), ssrc, "Transport ready");
            match role {
                TransportRole::Voice => {
                    self.reset_reconnect();
                    send_msg(&OutMsg::ConnectionState {
                        status: "ready".into(),
                    });
                    self.emit_transport_state(TransportRole::Voice, "ready", None);
                    send_msg(&OutMsg::Ready);

                    match crate::audio_pipeline::AudioSendState::new() {
                        Ok(state) => {
                            *self.audio_send_state.lock() = Some(state);
                            crate::audio_pipeline::emit_playback_armed(
                                "connection_ready",
                                &self.audio_send_state,
                            );
                        }
                        Err(error) => {
                            tracing::error!("Failed to init audio send state: {}", error)
                        }
                    }
                }
                TransportRole::StreamWatch => {
                    self.emit_transport_state(TransportRole::StreamWatch, "ready", None);
                }
                TransportRole::StreamPublish => {
                    self.emit_transport_state(TransportRole::StreamPublish, "ready", None);
                    self.maybe_start_stream_publish_pipeline();
                }
            }
            self.refresh_video_sink_wants(match role {
                TransportRole::Voice => "voice_ready",
                TransportRole::StreamWatch => "stream_watch_ready",
                TransportRole::StreamPublish => "stream_publish_ready",
            });
        }
        VoiceEvent::SsrcUpdate {
            role,
            ssrc,
            user_id,
        } => {
            if role == TransportRole::Voice
                && self.ssrc_map.insert(ssrc, user_id) != Some(user_id)
            {
                self.opus_decoders.remove(&ssrc);
                self.last_rtp_seq.remove(&ssrc);
            }
        }
        VoiceEvent::VideoStateUpdate {
            role,
            user_id,
            audio_ssrc,
            video_ssrc,
            codec,
            streams,
        } => {
            // When a stream_watch transport exists, screen-share video
            // state arrives on that transport.  Voice-transport video
            // state updates for screen shares are redundant — but
            // webcam ("video") streams only appear on the voice
            // transport and must be allowed through.
            if role == TransportRole::Voice && self.stream_watch_conn.is_some() {
                let has_non_screen_stream = streams.iter().any(|s| {
                    s.stream_type
                        .as_deref()
                        .is_some_and(|t| !t.eq_ignore_ascii_case("screen"))
                });
                if !has_non_screen_stream {
                    return;
                }
            }
            if self.self_user_id == Some(user_id) {
                return;
            }

            let previous = self.remote_video_states.get(&user_id).cloned();
            let clear_video_state = video_ssrc.is_none() && streams.is_empty();
            let incoming_stream_ssrcs =
                streams.iter().map(|stream| stream.ssrc).collect::<Vec<_>>();
            let incoming_active_stream_count =
                streams.iter().filter(|stream| stream.is_active()).count();
            let previous_stream_count = previous
                .as_ref()
                .map(|state| state.streams.len())
                .unwrap_or_default();
            tracing::info!(
                user_id,
                clear_video_state,
                audio_ssrc = audio_ssrc,
                video_ssrc = video_ssrc,
                codec = ?codec.as_deref(),
                incoming_stream_count = streams.len(),
                incoming_active_stream_count,
                incoming_stream_ssrcs = ?incoming_stream_ssrcs,
                previous_stream_count,
                "clankvox_native_video_state_received"
            );
            let state = RemoteVideoState {
                audio_ssrc: if clear_video_state {
                    None
                } else {
                    audio_ssrc.or_else(|| previous.as_ref().and_then(|state| state.audio_ssrc))
                },
                video_ssrc: if clear_video_state {
                    None
                } else {
                    video_ssrc.or_else(|| previous.as_ref().and_then(|state| state.video_ssrc))
                },
                codec: if clear_video_state {
                    None
                } else {
                    codec.or_else(|| previous.as_ref().and_then(|state| state.codec.clone()))
                },
                streams: if clear_video_state {
                    Vec::new()
                } else if streams.is_empty() {
                    previous
                        .as_ref()
                        .map(|state| state.streams.clone())
                        .unwrap_or_default()
                } else {
                    streams
                },
            };

            if state.has_streams() {
                self.remote_video_states.insert(user_id, state.clone());
                self.emit_user_video_state(user_id, &state);
            } else {
                let ended_state = self.remote_video_states.remove(&user_id).or(previous);
                self.emit_user_video_end(user_id, ended_state.as_ref());
            }

            self.refresh_video_sink_wants(match role {
                TransportRole::Voice => "video_state_update",
                TransportRole::StreamWatch => "stream_watch_video_state_update",
                TransportRole::StreamPublish => "stream_publish_video_state_update",
            });
        }
        VoiceEvent::ClientDisconnect { role, user_id } => {
            if self.self_user_id != Some(user_id) {
                match role {
                    TransportRole::Voice => self.remove_user_runtime_state(user_id),
                    TransportRole::StreamWatch => self.remove_user_video_runtime_state(user_id),
                    TransportRole::StreamPublish => {}
                }
                self.refresh_video_sink_wants(match role {
                    TransportRole::Voice => "client_disconnect",
                    TransportRole::StreamWatch => "stream_watch_client_disconnect",
                    TransportRole::StreamPublish => "stream_publish_client_disconnect",
                });
            }
        }
        VoiceEvent::OpusReceived {
            role,
            ssrc,
            opus_frame,
            rtp_sequence,
        } => {
            if role != TransportRole::Voice {
                return;
            }
            let Some(&user_id) = self.ssrc_map.get(&ssrc) else {
                tracing::debug!("Dropped Opus frame from unknown ssrc: {ssrc}");
                return;
            };
            if self.self_user_id == Some(user_id) {
                return;
            }

            // --- RTP sequence classification ---
            let seq_class =
                classify_rtp_sequence(self.last_rtp_seq.get(&ssrc).copied(), rtp_sequence);

            // Drop stale and duplicate packets — feeding them to the
            // decoder would corrupt its internal state and produce
            // out-of-order or doubled audio.  Speaking state is NOT
            // updated for these packets so that duplicates/reorders
            // cannot artificially stretch SpeakingStart/SpeakingEnd timing.
            match seq_class {
                RtpSeqClass::Duplicate => {
                    tracing::debug!(ssrc, rtp_sequence, "Dropped duplicate RTP packet");
                    return;
                }
                RtpSeqClass::Stale => {
                    tracing::debug!(ssrc, rtp_sequence, "Dropped stale/reordered RTP packet");
                    return;
                }
                _ => {}
            }

            // Speaking state is updated only after duplicate/stale
            // filtering so that discarded packets cannot stretch
            // speaking activity.  This fires BEFORE the user_capture_states
            // gate so that the initial SpeakingStart reaches TypeScript and
            // triggers subscribe_user (bootstrap).
            if update_speaking_state(&mut self.speaking_states, user_id, time::Instant::now()) {
                send_msg(&OutMsg::SpeakingStart {
                    user_id: user_id.to_string(),
                });
            }

            // Gate audio decode/forwarding on subscription — only users
            // that TypeScript has subscribed via subscribe_user get their
            // Opus decoded and forwarded as UserAudio PCM.
            let Some(state) = self.user_capture_states.get(&user_id) else {
                return;
            };
            let target_sample_rate = state.sample_rate;

            // Ensure an Opus decoder exists for this SSRC.
            if let Entry::Vacant(entry) = self.opus_decoders.entry(ssrc) {
                let decoder = match OpusDecoder::new(SampleRate::Hz48000, Channels::Stereo) {
                    Ok(decoder) => decoder,
                    Err(error) => {
                        tracing::error!(
                            "failed to init Opus decoder for ssrc={}: {:?}",
                            ssrc,
                            error
                        );
                        return;
                    }
                };
                entry.insert(decoder);
            }

            let decoder = self
                .opus_decoders
                .get_mut(&ssrc)
                .expect("decoder inserted above");

            // Helper: convert decoded stereo PCM to LLM-ready output.
            let convert_frame = |decoded: &[i16], target_sample_rate: u32| {
                crate::audio_pipeline::convert_decoded_to_llm(decoded, target_sample_rate)
            };

            // --- FEC / PLC for forward packet loss ---
            // Recovery frames are buffered (not emitted) until the
            // current anchor packet decodes successfully. If the anchor
            // fails, the recovery audio is discarded so we never emit
            // orphaned concealment frames without the real packet that
            // anchors them.
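            // Accounting: for a gap of N lost packets we synthesize N-1 PLC
            // (packet loss concealment) frames for the oldest losses, then
            // recover the newest lost frame from the current packet's
            // in-band FEC data, so all N missing slots are filled in order.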
            let mut recovery_frames: Vec<(Vec<u8>, u16, usize, usize)> = Vec::new();
            if let RtpSeqClass::ForwardLoss { lost_count } = seq_class {
                let plc_count = lost_count.saturating_sub(1) as usize;
                if plc_count > 0 {
                    tracing::debug!(
                        ssrc,
                        lost_count,
                        plc_count,
                        "Opus PLC: synthesizing {plc_count} concealment frame(s)"
                    );
                }
                for _ in 0..plc_count {
                    let mut plc_buf = vec![0i16; 5760];
                    let plc_signals = MutSignals::try_from(plc_buf.as_mut_slice())
                        .expect("non-empty signal buffer");
                    if let Ok(plc_samples) = decoder.decode(None, plc_signals, false) {
                        let total = plc_samples * 2;
                        recovery_frames
                            .push(convert_frame(&plc_buf[..total], target_sample_rate));
                    }
                }

                // Recover the frame immediately before the current packet
                // using in-band FEC.
                let mut fec_buf = vec![0i16; 5760];
                let fec_packet = match OpusPacket::try_from(opus_frame.as_slice()) {
                    Ok(p) => p,
                    Err(error) => {
                        tracing::debug!("Invalid Opus packet (FEC) ssrc={}: {:?}", ssrc, error);
                        return;
                    }
                };
                let fec_signals = MutSignals::try_from(fec_buf.as_mut_slice())
                    .expect("non-empty signal buffer");
                if let Ok(fec_samples) = decoder.decode(Some(fec_packet), fec_signals, true) {
                    let total = fec_samples * 2;
                    recovery_frames.push(convert_frame(&fec_buf[..total], target_sample_rate));
                    tracing::debug!(ssrc, lost_count, "Opus FEC: recovered prior frame");
                }
            }

            // --- Normal decode of the current packet ---
            let mut pcm_stereo = vec![0i16; 5760];
            let decode_result = {
                let packet = match OpusPacket::try_from(opus_frame.as_slice()) {
                    Ok(packet) => packet,
                    Err(error) => {
                        tracing::debug!("Invalid Opus packet for ssrc={}: {:?}", ssrc, error);
                        return;
                    }
                };
                let signals = MutSignals::try_from(pcm_stereo.as_mut_slice())
                    .expect("non-empty signal buffer");
                decoder.decode(Some(packet), signals, false)
            };

            match decode_result {
                Ok(samples_per_channel) => {
                    // Anchor decode succeeded — emit any buffered recovery
                    // frames first (in chronological order), then the
                    // current packet.
                    for (pcm, peak, active, total) in recovery_frames {
                        if !pcm.is_empty() {
                            send_msg(&OutMsg::UserAudio {
                                user_id: user_id.to_string(),
                                pcm,
                                signal_peak_abs: peak,
                                signal_active_sample_count: active,
                                signal_sample_count: total,
                            });
                        }
                    }

                    let total_samples = samples_per_channel * 2;
                    let (llm_pcm, peak, active, total) =
                        convert_frame(&pcm_stereo[..total_samples], target_sample_rate);
                    if !llm_pcm.is_empty() {
                        send_msg(&OutMsg::UserAudio {
                            user_id: user_id.to_string(),
                            pcm: llm_pcm,
                            signal_peak_abs: peak,
                            signal_active_sample_count: active,
                            signal_sample_count: total,
                        });
                    }

                    // Only advance the sequence tracker after a successful
                    // decode — failed decodes should not corrupt gap detection.
                    self.last_rtp_seq.insert(ssrc, rtp_sequence);

                    if let Some(state) = self.user_capture_states.get_mut(&user_id) {
                        state.touch_audio(time::Instant::now());
                    }
                }
                Err(error) => {
                    // Anchor decode failed — discard buffered recovery
                    // frames (they were decoded into the Opus decoder's
                    // state but we do not emit them without a valid anchor).
                    if !recovery_frames.is_empty() {
                        tracing::debug!(
                            ssrc,
                            rtp_sequence,
                            recovery_count = recovery_frames.len(),
                            "Opus anchor decode failed; discarding {count} recovery frame(s)",
                            count = recovery_frames.len()
                        );
                    }
                    tracing::debug!("Opus decode error for ssrc={}: {:?}", ssrc, error);
                }
            }
        }
        VoiceEvent::VideoFrameReceived {
            role,
            user_id,
            ssrc,
            codec,
            keyframe,
            frame,
            rtp_timestamp,
            stream_type,
            rid,
            dave_decrypted,
        } => {
            // When a stream_watch transport is active, screen-share
            // video frames arrive on that transport.  Voice-transport
            // frames for screen shares would be duplicates.  But
            // webcam frames only arrive on the voice transport —
            // allow them through based on stream_type.
            if role == TransportRole::Voice && self.stream_watch_conn.is_some() {
                let is_screen = stream_type
                    .as_deref()
                    .is_some_and(|t| t.eq_ignore_ascii_case("screen"));
                // Also allow through when stream_type is unknown (None)
                // since webcam streams sometimes lack a type tag.
                if is_screen {
                    return;
                }
            }
            if self.self_user_id == Some(user_id) {
                return;
            }

            if !self.user_video_subscriptions.contains_key(&user_id) {
                return;
            }

            let is_h264 = codec.eq_ignore_ascii_case("h264");

            if is_h264 {
                // ── Persistent H264 decode path ──
                //
                // Feed EVERY frame to the decoder so it maintains full
                // reference-frame state.  Only rate-limit the JPEG
                // emission over IPC.

                // Read subscription values before taking mutable borrows
                // on other AppState fields (avoids borrow conflicts).
                let max_fps = self.user_video_subscriptions[&user_id].max_frames_per_second;
                let sub_jpeg_quality = self.user_video_subscriptions[&user_id].jpeg_quality;

                if let Some(subscription) = self.user_video_subscriptions.get_mut(&user_id) {
                    subscription.forwarded_frame_count =
                        subscription.forwarded_frame_count.saturating_add(1);
                    if subscription.forwarded_frame_count == 1 {
                        tracing::info!(
                            user_id,
                            ssrc,
                            codec = %codec,
                            keyframe,
                            frame_bytes = frame.len(),
                            rtp_timestamp,
                            stream_type = ?stream_type.as_deref(),
                            rid = ?rid.as_deref(),
                            max_frames_per_second = max_fps,
                            "clankvox_first_video_frame_forwarded"
                        );
                    }
                }

                // Lazily create or retrieve the decoder.  If init fails,
                // skip H264 decode for this user instead of panicking.
                //
                // Decode + extract scalar state inside a scoped block so
                // the mutable borrow on `user_video_decoders` is released
                // before we call `self.refresh_video_sink_wants()` etc.
                let (decoded, needs_pli, frames_decoded) = {
                    let decoder = match self.user_video_decoders.entry(user_id) {
                        std::collections::hash_map::Entry::Occupied(entry) => entry.into_mut(),
                        std::collections::hash_map::Entry::Vacant(entry) => {
                            match PersistentVideoDecoder::new() {
                                Ok(mut d) => {
                                    d.set_jpeg_quality(sub_jpeg_quality);
                                    tracing::info!(
                                        user_id,
                                        ssrc,
                                        jpeg_quality = sub_jpeg_quality,
                                        "clankvox_persistent_h264_decoder_created"
                                    );
                                    entry.insert(d)
                                }
                                Err(e) => {
                                    tracing::error!(
                                        user_id,
                                        error = %e,
                                        "clankvox_persistent_h264_decoder_init_failed"
                                    );
                                    return;
                                }
                            }
                        }
                    };

                    let decoded = decoder.decode_frame(&frame);
                    let needs_pli = decoder.take_pending_pli();
                    let frames_decoded = decoder.frames_decoded();
                    (decoded, needs_pli, frames_decoded)
                };

                // If the decoder was reset after sustained errors, it
                // needs a fresh keyframe.  Send PLI once.
                if needs_pli {
                    tracing::info!(user_id, ssrc, "clankvox_decoder_reset_requesting_pli");
                    self.refresh_video_sink_wants("decoder_reset_pli");
                    if let Some(conn) = self.video_conn() {
                        if let Err(error) = conn.send_rtcp_pli(ssrc) {
                            tracing::warn!(
                                ssrc,
                                error = %error,
                                "clankvox_decoder_reset_pli_failed"
                            );
                        }
                    }
                }

                // Rate-limit JPEG emission — only send over IPC at the
                // configured FPS.  The decoder still ingested every frame
                // above so inter-frame state is intact.
                let Some(decoded) = decoded else {
                    return;
                };

                let now = time::Instant::now();
                let min_gap =
                    std::time::Duration::from_secs_f64(1.0 / f64::from(max_fps.max(1)));
                let should_emit =
                    if let Some(subscription) = self.user_video_subscriptions.get(&user_id) {
                        match subscription.last_frame_sent_at {
                            Some(last) => now.duration_since(last) >= min_gap,
                            None => true,
                        }
                    } else {
                        return;
                    };
                if !should_emit {
                    return;
                }
                if let Some(subscription) = self.user_video_subscriptions.get_mut(&user_id) {
                    subscription.last_frame_sent_at = Some(now);
                }

                if frames_decoded == 1 {
                    tracing::info!(
                        user_id,
                        ssrc,
                        width = decoded.width,
                        height = decoded.height,
                        jpeg_bytes = decoded.jpeg_data.len(),
                        change_score = decoded.change_score,
                        "clankvox_first_h264_frame_decoded"
                    );
                }

                // Periodic change-score logging for threshold tuning
                // (every 60th frame ≈ 30 s at 2 fps).
                if frames_decoded % 60 == 0 {
                    tracing::debug!(
                        user_id,
                        ssrc,
                        frames_decoded,
                        change_score = %format!("{:.4}", decoded.change_score),
                        ema_change_score = %format!("{:.4}", decoded.ema_change_score),
                        is_scene_cut = decoded.is_scene_cut,
                        "clankvox_frame_diff_periodic"
                    );
                }

                let jpeg_base64 =
                    base64::engine::general_purpose::STANDARD.encode(&decoded.jpeg_data);
                send_msg(&OutMsg::DecodedVideoFrame {
                    user_id: user_id.to_string(),
                    ssrc,
                    width: decoded.width,
                    height: decoded.height,
                    jpeg_base64,
                    rtp_timestamp,
                    stream_type,
                    rid,
                    change_score: decoded.change_score,
                    ema_change_score: decoded.ema_change_score,
                    is_scene_cut: decoded.is_scene_cut,
                });
            } else {
                // ── Non-H264 (VP8): forward raw frame for TS-side ffmpeg decode ──
                let Some(subscription) = self.user_video_subscriptions.get_mut(&user_id) else {
                    return;
                };

                let now = time::Instant::now();
                let min_gap = std::time::Duration::from_secs_f64(
                    1.0 / f64::from(subscription.max_frames_per_second.max(1)),
                );
                if let Some(last_frame_sent_at) = subscription.last_frame_sent_at {
                    if now.duration_since(last_frame_sent_at) < min_gap && !keyframe {
                        return;
                    }
                }
                subscription.last_frame_sent_at = Some(now);

                subscription.forwarded_frame_count =
                    subscription.forwarded_frame_count.saturating_add(1);
                if subscription.forwarded_frame_count == 1 {
                    tracing::info!(
                        user_id,
                        ssrc,
                        codec = %codec,
                        keyframe,
                        frame_bytes = frame.len(),
                        rtp_timestamp,
                        stream_type = ?stream_type.as_deref(),
                        rid = ?rid.as_deref(),
                        max_frames_per_second = subscription.max_frames_per_second,
                        "clankvox_first_video_frame_forwarded"
                    );
                }

                let should_reassert_sink_wants =
                    should_reassert_sink_wants_for_waiting_keyframe(
                        subscription,
                        keyframe,
                        now,
                    );

                let frame_base64 = base64::engine::general_purpose::STANDARD.encode(frame);
                send_msg(&OutMsg::UserVideoFrame {
                    user_id: user_id.to_string(),
                    ssrc,
                    codec,
                    keyframe,
                    frame_base64,
                    rtp_timestamp,
                    stream_type: stream_type.clone(),
                    rid: rid.clone(),
                    dave_decrypted,
                });
                if should_reassert_sink_wants {
                    tracing::info!(
                        user_id,
                        ssrc,
                        forwarded_frame_count = subscription.forwarded_frame_count,
                        "clankvox_waiting_for_first_keyframe_reasserting_sink_wants"
                    );
                    self.refresh_video_sink_wants("waiting_for_first_keyframe");
                    if let Some(conn) = self.video_conn() {
                        if let Err(error) = conn.send_rtcp_pli(ssrc) {
                            tracing::warn!(
                                ssrc,
                                error = %error,
                                "clankvox_rtcp_pli_failed"
                            );
                        }
                    }
                }
            }
        }
        VoiceEvent::DaveReady { role } => {
            tracing::info!(role = role.as_str(), "DAVE E2EE session is ready");
            // For stream watch: the initial keyframe burst from Discord
            // often arrives before the DAVE session is ready, so those
            // frames fail decrypt and are lost.  Immediately request a
            // fresh keyframe now that we can actually decrypt.
            if role == TransportRole::StreamWatch || role == TransportRole::Voice {
                if let Some(conn) = self.video_conn() {
                    for remote_state in self.remote_video_states.values() {
                        for stream in &remote_state.streams {
                            tracing::info!(
                                role = role.as_str(),
                                ssrc = stream.ssrc,
                                "clankvox_dave_ready_pli_requesting_keyframe"
                            );
                            if let Err(error) = conn.send_rtcp_pli(stream.ssrc) {
                                tracing::warn!(
                                    ssrc = stream.ssrc,
                                    error = %error,
                                    "clankvox_dave_ready_pli_failed"
                                );
                            }
                        }
                        if let Some(video_ssrc) = remote_state.video_ssrc {
                            if !remote_state.streams.iter().any(|s| s.ssrc == video_ssrc) {
                                tracing::info!(
                                    role = role.as_str(),
                                    ssrc = video_ssrc,
                                    "clankvox_dave_ready_pli_requesting_keyframe"
                                );
                                if let Err(error) = conn.send_rtcp_pli(video_ssrc) {
                                    tracing::warn!(
                                        ssrc = video_ssrc,
                                        error = %error,
                                        "clankvox_dave_ready_pli_failed"
                                    );
                                }
                            }
                        }
                    }
                }
            }
        }
        VoiceEvent::Disconnected { role, reason } => match role {
            TransportRole::Voice => self.handle_disconnected(&reason),
            TransportRole::StreamWatch => {
                tracing::warn!(reason = %reason, "Stream watch transport disconnected");
                self.clear_stream_watch_connection();
                self.emit_transport_state(
                    TransportRole::StreamWatch,
                    "disconnected",
                    Some(&reason),
                );
                self.refresh_video_sink_wants("stream_watch_disconnected");
            }
            TransportRole::StreamPublish => {
                tracing::warn!(reason = %reason, "Stream publish transport disconnected");
                self.stop_stream_publish_runtime("stream_publish_transport_disconnected");
                self.clear_stream_publish_connection();
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "disconnected",
                    Some(&reason),
                );
            }
        },
    }
}

pub(crate) fn on_capture_tick(&mut self, now: time::Instant) {
    let mut speaking_ended_users: Vec<u64> = Vec::new();
    for (&user_id, state) in &mut self.speaking_states {
        if !state.is_speaking {
            continue;
        }
        if let Some(last_at) = state.last_packet_at {
            let silent_ms = now.duration_since(last_at).as_millis() as u64;
            if silent_ms >= SPEAKING_TIMEOUT_MS {
                state.is_speaking = false;
                speaking_ended_users.push(user_id);
            }
        }
    }
    for user_id in speaking_ended_users {
        send_msg(&OutMsg::SpeakingEnd {
            user_id: user_id.to_string(),
        });
    }

    for (user_id, state) in &mut self.user_capture_states {
        if !state.stream_active {
            continue;
        }
        let Some(last_audio_at) = state.last_audio_at else {
            state.last_audio_at = Some(now);
            continue;
        };
        let silent_for_ms = now.duration_since(last_audio_at).as_millis() as u64;
        if silent_for_ms >= u64::from(state.silence_duration_ms) {
            state.stream_active = false;
            state.last_audio_at = None;
            send_msg(&OutMsg::UserAudioEnd {
                user_id: user_id.to_string(),
            });
        }
    }
}

}

#[cfg(test)]
mod tests {

use std::collections::HashMap;
use std::time::Duration;

use tokio::time;

use crate::capture::SpeakingState;
use crate::video::UserVideoSubscription;

use super::{
    classify_rtp_sequence, should_reassert_sink_wants_for_waiting_keyframe,
    update_speaking_state, RtpSeqClass,
};

#[test]
fn update_speaking_state_only_triggers_on_first_packet_of_burst() {
    let mut speaking_states: HashMap<u64, SpeakingState> = HashMap::new();
    let first_packet_at = time::Instant::now();

    assert!(update_speaking_state(
        &mut speaking_states,
        42,
        first_packet_at
    ));
    assert!(!update_speaking_state(
        &mut speaking_states,
        42,
        first_packet_at + Duration::from_millis(20)
    ));

    let state = speaking_states.get(&42).expect("speaking state inserted");
    assert!(state.is_speaking);
    assert_eq!(
        state.last_packet_at,
        Some(first_packet_at + Duration::from_millis(20))
    );
}
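
// Resume-path sketch: once `on_capture_tick` flips `is_speaking` back to
// false after SPEAKING_TIMEOUT_MS of silence, the next packet must report
// a fresh SpeakingStart. The flag is flipped manually here to mirror what
// the tick loop does.
#[test]
fn update_speaking_state_retriggers_after_speaking_end() {
    let mut speaking_states: HashMap<u64, SpeakingState> = HashMap::new();
    let first_packet_at = time::Instant::now();

    assert!(update_speaking_state(&mut speaking_states, 7, first_packet_at));
    // Simulate the silence timeout firing in on_capture_tick.
    speaking_states
        .get_mut(&7)
        .expect("speaking state inserted")
        .is_speaking = false;
    assert!(update_speaking_state(
        &mut speaking_states,
        7,
        first_packet_at + Duration::from_secs(2)
    ));
}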

#[test]
fn waiting_for_first_keyframe_reasserts_sink_wants_until_keyframe_arrives() {
    let mut subscription =
        UserVideoSubscription::new(2, 100, Some(921_600), Some("screen".into()), None);
    let started_at = time::Instant::now();

    assert!(should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        false,
        started_at
    ));
    assert!(!should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        false,
        started_at + Duration::from_millis(500)
    ));
    assert!(should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        false,
        started_at + Duration::from_secs(2)
    ));
    assert!(!should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        true,
        started_at + Duration::from_secs(3)
    ));
    assert_eq!(
        subscription.last_keyframe_forwarded_at,
        Some(started_at + Duration::from_secs(3))
    );
    assert_eq!(subscription.last_sink_wants_reasserted_at, None);
}
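
// Periodic-PLI sketch: after the first keyframe arrives, a delta frame
// should reassert immediately once (the keyframe cleared the reassert
// timestamp) and then throttle at PERIODIC_KEYFRAME_PLI_INTERVAL_MS.
#[test]
fn periodic_pli_continues_after_first_keyframe() {
    let mut subscription =
        UserVideoSubscription::new(2, 100, Some(921_600), Some("screen".into()), None);
    let started_at = time::Instant::now();

    // Keyframe: records last_keyframe_forwarded_at, clears the reassert
    // timestamp, and never reasserts itself.
    assert!(!should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        true,
        started_at
    ));
    // First delta frame after the keyframe reasserts immediately.
    assert!(should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        false,
        started_at + Duration::from_millis(100)
    ));
    // Within the 2 s periodic interval: throttled.
    assert!(!should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        false,
        started_at + Duration::from_millis(600)
    ));
    // Past the interval (measured from the last reassert): fires again.
    assert!(should_reassert_sink_wants_for_waiting_keyframe(
        &mut subscription,
        false,
        started_at + Duration::from_millis(2_200)
    ));
}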

// --- RTP sequence classification tests ---

#[test]
fn rtp_seq_first_packet_returns_first() {
    assert_eq!(classify_rtp_sequence(None, 100), RtpSeqClass::First);
    assert_eq!(classify_rtp_sequence(None, 0), RtpSeqClass::First);
    assert_eq!(classify_rtp_sequence(None, u16::MAX), RtpSeqClass::First);
}

#[test]
fn rtp_seq_sequential_packet() {
    assert_eq!(
        classify_rtp_sequence(Some(100), 101),
        RtpSeqClass::Sequential
    );
    assert_eq!(classify_rtp_sequence(Some(0), 1), RtpSeqClass::Sequential);
}

#[test]
fn rtp_seq_sequential_wraps_u16() {
    assert_eq!(
        classify_rtp_sequence(Some(u16::MAX), 0),
        RtpSeqClass::Sequential
    );
    assert_eq!(
        classify_rtp_sequence(Some(65534), 65535),
        RtpSeqClass::Sequential
    );
}

#[test]
fn rtp_seq_duplicate_detected() {
    assert_eq!(
        classify_rtp_sequence(Some(100), 100),
        RtpSeqClass::Duplicate
    );
    assert_eq!(classify_rtp_sequence(Some(0), 0), RtpSeqClass::Duplicate);
    assert_eq!(
        classify_rtp_sequence(Some(u16::MAX), u16::MAX),
        RtpSeqClass::Duplicate
    );
}

#[test]
fn rtp_seq_forward_loss_small_gaps() {
    // Gap of 1 lost packet: prev=100, expected=101, got 102
    assert_eq!(
        classify_rtp_sequence(Some(100), 102),
        RtpSeqClass::ForwardLoss { lost_count: 1 }
    );
    // Gap of 3 lost packets
    assert_eq!(
        classify_rtp_sequence(Some(100), 104),
        RtpSeqClass::ForwardLoss { lost_count: 3 }
    );
    // Gap of exactly MAX_RECOVERABLE_GAP (5)
    assert_eq!(
        classify_rtp_sequence(Some(100), 106),
        RtpSeqClass::ForwardLoss { lost_count: 5 }
    );
}

#[test]
fn rtp_seq_forward_loss_across_wraparound() {
    // prev=65534, expected=65535, got 0 → gap of 1 lost packet
    assert_eq!(
        classify_rtp_sequence(Some(65534), 0),
        RtpSeqClass::ForwardLoss { lost_count: 1 }
    );
    // prev=65533, expected=65534, got 0 → gap of 2
    assert_eq!(
        classify_rtp_sequence(Some(65533), 0),
        RtpSeqClass::ForwardLoss { lost_count: 2 }
    );
}
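
// Coverage sketch: a gap wider than MAX_RECOVERABLE_GAP that also crosses
// the u16 wrap must still classify as ForwardLarge, not Stale.
#[test]
fn rtp_seq_forward_large_gap_across_wraparound() {
    // prev=65530, expected=65531, got 10 → distance 15 (> MAX_RECOVERABLE_GAP)
    assert_eq!(
        classify_rtp_sequence(Some(65530), 10),
        RtpSeqClass::ForwardLarge
    );
}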

#[test]
fn rtp_seq_forward_large_gap() {
    // Gap of 6 (> MAX_RECOVERABLE_GAP): prev=100, expected=101, got 107
    assert_eq!(
        classify_rtp_sequence(Some(100), 107),
        RtpSeqClass::ForwardLarge
    );
    assert_eq!(
        classify_rtp_sequence(Some(100), 200),
        RtpSeqClass::ForwardLarge
    );
    assert_eq!(
        classify_rtp_sequence(Some(100), 1000),
        RtpSeqClass::ForwardLarge
    );
}

#[test]
fn rtp_seq_stale_reordered_packet() {
    assert_eq!(classify_rtp_sequence(Some(100), 99), RtpSeqClass::Stale);
    assert_eq!(classify_rtp_sequence(Some(100), 98), RtpSeqClass::Stale);
    assert_eq!(classify_rtp_sequence(Some(100), 50), RtpSeqClass::Stale);
}

#[test]
fn rtp_seq_stale_across_wraparound() {
    // prev=5, expected=6, got 65535 → stale (late arrival from before wrap)
    assert_eq!(classify_rtp_sequence(Some(5), 65535), RtpSeqClass::Stale);
    assert_eq!(classify_rtp_sequence(Some(5), 65534), RtpSeqClass::Stale);
}

#[test]
fn rtp_seq_large_forward_near_half_u16_is_forward() {
    // Distance ~32000 (positive i16) → ForwardLarge
    assert_eq!(
        classify_rtp_sequence(Some(0), 32000),
        RtpSeqClass::ForwardLarge
    );
}

#[test]
fn rtp_seq_large_backward_near_half_u16_is_stale() {
    // prev=32000, expected=32001, got 0 → wrapping_sub maps to negative i16 → stale
    assert_eq!(classify_rtp_sequence(Some(32000), 0), RtpSeqClass::Stale);
}
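
// Boundary sketch: a forward distance of exactly 32768 wraps to i16::MIN
// under the cast, so the classifier resolves the ambiguous half-range
// midpoint as Stale rather than ForwardLarge.
#[test]
fn rtp_seq_exact_half_range_distance_is_stale() {
    // prev=100, expected=101, got 32869 → 32869.wrapping_sub(101) == 32768,
    // which is i16::MIN (negative) after the cast.
    assert_eq!(classify_rtp_sequence(Some(100), 32869), RtpSeqClass::Stale);
}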

}