// src/app_state.rs

use std::collections::HashMap; use std::sync::Arc;

use audiopus::coder::Decoder as OpusDecoder; use crossbeam_channel as crossbeam; use parking_lot::Mutex; use tokio::sync::mpsc; use tokio::time; use tracing::warn;

use crate::audio_pipeline::AudioSendState; use crate::capture::{SpeakingState, UserCaptureState}; use crate::dave::DaveManager; use crate::ipc::{send_error, send_gateway_voice_state_update, send_msg, ErrorCode, OutMsg}; use crate::music::{drain_music_pcm_queue, MusicEvent, MusicState}; use crate::stream_publish::{StreamPublishEvent, StreamPublishFrame, StreamPublishState}; use crate::video::{RemoteVideoState, UserVideoSubscription}; use crate::video_decoder::PersistentVideoDecoder; use crate::voice_conn::{TransportRole, VoiceConnection, VoiceEvent};

pub(crate) fn schedule_reconnect( reconnect_deadline: &mut Optiontime::Instant, reconnect_attempt: &mut u32, guild_id: Option, channel_id: Option, self_mute: bool, reason: &str, ) { let (Some(guild_id), Some(channel_id)) = (guild_id, channel_id) else { warn!(reason = reason, "reconnect skipped: missing guild/channel"); return; };

*reconnect_attempt = reconnect_attempt.saturating_add(1);
let backoff_shift = reconnect_attempt.saturating_sub(1).min(4);
let backoff_ms = 1_000u64 << backoff_shift;
*reconnect_deadline = Some(time::Instant::now() + std::time::Duration::from_millis(backoff_ms));

send_msg(&OutMsg::ConnectionState {
    status: "reconnecting".into(),
});
send_msg(&OutMsg::TransportState {
    role: TransportRole::Voice,
    status: "reconnecting".into(),
    reason: Some(reason.to_string()),
});
send_gateway_voice_state_update(guild_id, channel_id, self_mute);
warn!(
    attempt = *reconnect_attempt,
    backoff_ms = backoff_ms,
    reason = reason,
    "scheduled clankvox voice reconnect"
);

}

/// Voice connection parameters accumulated from gateway events before a
/// connection can be established. The pieces arrive independently, so
/// every field is optional and the struct starts fully empty via `Default`.
// NOTE(review): field types reconstructed after generic parameters were
// stripped — endpoint/token/session_id assumed String, user_id assumed u64
// (matches `parse_user_id_field`); confirm against the gateway event handlers.
#[derive(Default, Clone)]
pub(crate) struct PendingConnection {
    pub(crate) endpoint: Option<String>,
    pub(crate) token: Option<String>,
    pub(crate) session_id: Option<String>,
    pub(crate) user_id: Option<u64>,
}

/// Connection parameters for a stream-watch (Go Live spectate) voice
/// transport, gathered incrementally before connecting. Mirrors
/// `PendingConnection` plus the stream's server id and the channel id used
/// for DAVE (E2EE) session setup.
// NOTE(review): field types reconstructed after generic parameters were
// stripped — strings for endpoint/token/session_id, u64 snowflakes for the
// ids; confirm against the stream-watch connect path.
#[derive(Default, Clone)]
pub(crate) struct PendingStreamWatchConnection {
    pub(crate) endpoint: Option<String>,
    pub(crate) token: Option<String>,
    pub(crate) session_id: Option<String>,
    pub(crate) user_id: Option<u64>,
    pub(crate) server_id: Option<u64>,
    pub(crate) dave_channel_id: Option<u64>,
}

/// Connection parameters for a stream-publish (Go Live broadcast) voice
/// transport, gathered incrementally before connecting. Structurally
/// identical to `PendingStreamWatchConnection`; kept as a separate type so
/// the two pending states cannot be mixed up.
// NOTE(review): field types reconstructed after generic parameters were
// stripped — strings for endpoint/token/session_id, u64 snowflakes for the
// ids; confirm against the stream-publish connect path.
#[derive(Default, Clone)]
pub(crate) struct PendingStreamPublishConnection {
    pub(crate) endpoint: Option<String>,
    pub(crate) token: Option<String>,
    pub(crate) session_id: Option<String>,
    pub(crate) user_id: Option<u64>,
    pub(crate) server_id: Option<u64>,
    pub(crate) dave_channel_id: Option<u64>,
}

pub(crate) fn parse_user_id_field(user_id: &str, context: &str) -> Option { if let Ok(uid) = user_id.parse::() { Some(uid) } else { send_error( ErrorCode::InvalidRequest, format!("{context} requires a numeric user_id, got {user_id:?}"), ); None } }

pub(crate) struct AppState { pub(crate) pending_conn: PendingConnection, pub(crate) guild_id: Option, pub(crate) channel_id: Option, pub(crate) self_mute: bool, pub(crate) reconnect_deadline: Optiontime::Instant, pub(crate) reconnect_attempt: u32, pub(crate) dave: Arc<Mutex<Option>>, pub(crate) voice_conn: Option, pub(crate) stream_watch_pending_conn: PendingStreamWatchConnection, pub(crate) stream_watch_dave: Arc<Mutex<Option>>, pub(crate) stream_watch_conn: Option, pub(crate) stream_publish_pending_conn: PendingStreamPublishConnection, pub(crate) stream_publish_dave: Arc<Mutex<Option>>, pub(crate) stream_publish_conn: Option, pub(crate) voice_event_tx: mpsc::Sender, pub(crate) audio_send_state: Arc<Mutex<Option>>, pub(crate) music_pcm_tx: crossbeam::Sender<Vec>, pub(crate) music_pcm_rx: crossbeam::Receiver<Vec>, pub(crate) music_event_tx: mpsc::Sender, pub(crate) stream_publish_frame_tx: crossbeam::Sender, pub(crate) stream_publish_frame_rx: crossbeam::Receiver, pub(crate) stream_publish_event_tx: crossbeam::Sender, pub(crate) stream_publish_event_rx: crossbeam::Receiver, pub(crate) music: MusicState, pub(crate) stream_publish: StreamPublishState, pub(crate) opus_decoders: HashMap<u32, OpusDecoder>, /// Last RTP sequence number seen per audio SSRC, for gap detection / FEC / PLC. pub(crate) last_rtp_seq: HashMap<u32, u16>, pub(crate) ssrc_map: HashMap<u32, u64>, pub(crate) self_user_id: Option, pub(crate) user_capture_states: HashMap<u64, UserCaptureState>, pub(crate) user_video_subscriptions: HashMap<u64, UserVideoSubscription>, pub(crate) user_video_decoders: HashMap<u64, PersistentVideoDecoder>, pub(crate) remote_video_states: HashMap<u64, RemoteVideoState>, pub(crate) speaking_states: HashMap<u64, SpeakingState>, pub(crate) buffer_depth_tick_counter: u32, pub(crate) buffer_depth_was_nonempty: bool, pub(crate) tts_playback_buffered: bool, pub(crate) stream_publish_frames_sent: u64, }

// Inherent methods for `AppState`.
impl AppState { pub(crate) const BUFFER_DEPTH_REPORT_INTERVAL: u32 = 25;
// ^ Emit a buffer-depth report once every 25 increments of
//   `buffer_depth_tick_counter` (tick cadence is set by the caller; not
//   visible in this file).

/// Build a fresh `AppState` from externally created shared handles and
/// channel endpoints; all connection, map, and counter state starts
/// empty/None/zero.
pub(crate) fn new(
    dave: Arc<Mutex<Option<DaveManager>>>,
    voice_event_tx: mpsc::Sender<VoiceEvent>,
    audio_send_state: Arc<Mutex<Option<AudioSendState>>>,
    music_pcm_tx: crossbeam::Sender<Vec<i16>>,
    music_pcm_rx: crossbeam::Receiver<Vec<i16>>,
    music_event_tx: mpsc::Sender<MusicEvent>,
    stream_publish_frame_tx: crossbeam::Sender<StreamPublishFrame>,
    stream_publish_frame_rx: crossbeam::Receiver<StreamPublishFrame>,
    stream_publish_event_tx: crossbeam::Sender<StreamPublishEvent>,
    stream_publish_event_rx: crossbeam::Receiver<StreamPublishEvent>,
) -> Self {
    Self {
        pending_conn: PendingConnection::default(),
        guild_id: None,
        channel_id: None,
        self_mute: false,
        reconnect_deadline: None,
        reconnect_attempt: 0,
        dave,
        voice_conn: None,
        stream_watch_pending_conn: PendingStreamWatchConnection::default(),
        stream_watch_dave: Arc::new(Mutex::new(None)),
        stream_watch_conn: None,
        stream_publish_pending_conn: PendingStreamPublishConnection::default(),
        stream_publish_dave: Arc::new(Mutex::new(None)),
        stream_publish_conn: None,
        voice_event_tx,
        audio_send_state,
        music_pcm_tx,
        music_pcm_rx,
        music_event_tx,
        stream_publish_frame_tx,
        stream_publish_frame_rx,
        stream_publish_event_tx,
        stream_publish_event_rx,
        music: MusicState::default(),
        stream_publish: StreamPublishState::default(),
        opus_decoders: HashMap::new(),
        last_rtp_seq: HashMap::new(),
        ssrc_map: HashMap::new(),
        self_user_id: None,
        user_capture_states: HashMap::new(),
        user_video_subscriptions: HashMap::new(),
        user_video_decoders: HashMap::new(),
        remote_video_states: HashMap::new(),
        speaking_states: HashMap::new(),
        buffer_depth_tick_counter: 0,
        buffer_depth_was_nonempty: false,
        tts_playback_buffered: false,
        stream_publish_frames_sent: 0,
    }
}

/// Start the music pipeline for `url`, delegating to
/// `crate::music::start_music_pipeline` with this state's player, PCM
/// channel endpoints, event sender, and shared audio send state.
/// `resolved_direct_url` / `clear_output_buffers` are forwarded unchanged
/// (semantics defined in `crate::music`).
pub(crate) fn start_music_pipeline(
    &mut self,
    url: &str,
    resolved_direct_url: bool,
    clear_output_buffers: bool,
) {
    crate::music::start_music_pipeline(
        crate::music::MusicPipelineRequest {
            url,
            resolved_direct_url,
            clear_output_buffers,
        },
        crate::music::MusicPipelineContext {
            music_player: &mut self.music.player,
            music_pcm_rx: &self.music_pcm_rx,
            music_pcm_tx: &self.music_pcm_tx,
            music_event_tx: &self.music_event_tx,
            audio_send_state: &self.audio_send_state,
        },
    );
}

/// Schedule a reconnect attempt, delegating to the module-level
/// `schedule_reconnect` with this state's backoff fields and current
/// guild/channel/mute settings.
pub(crate) fn schedule_reconnect(&mut self, reason: &str) {
    schedule_reconnect(
        &mut self.reconnect_deadline,
        &mut self.reconnect_attempt,
        self.guild_id,
        self.channel_id,
        self.self_mute,
        reason,
    );
}

/// Cancel any pending reconnect and reset the backoff attempt counter.
pub(crate) fn reset_reconnect(&mut self) {
    self.reconnect_deadline = None;
    self.reconnect_attempt = 0;
}

/// Shut down and drop the main voice connection (if any), and clear the
/// shared DAVE manager and audio send state under their locks.
pub(crate) fn clear_voice_connection(&mut self) {
    if let Some(ref conn) = self.voice_conn {
        conn.shutdown();
    }
    self.voice_conn = None;
    *self.dave.lock() = None;
    *self.audio_send_state.lock() = None;
}

/// Shut down and drop the stream-watch connection (if any) and its DAVE
/// manager.
pub(crate) fn clear_stream_watch_connection(&mut self) {
    if let Some(ref conn) = self.stream_watch_conn {
        conn.shutdown();
    }
    self.stream_watch_conn = None;
    *self.stream_watch_dave.lock() = None;
}

/// Shut down and drop the stream-publish connection (if any) and its DAVE
/// manager.
pub(crate) fn clear_stream_publish_connection(&mut self) {
    if let Some(ref conn) = self.stream_publish_conn {
        conn.shutdown();
    }
    self.stream_publish_conn = None;
    *self.stream_publish_dave.lock() = None;
}

/// Connection to use for video traffic: prefer the dedicated stream-watch
/// connection, falling back to the main voice connection.
pub(crate) fn video_conn(&self) -> Option<&VoiceConnection> {
    self.stream_watch_conn.as_ref().or(self.voice_conn.as_ref())
}

/// Emit a `TransportState` IPC message for `role` with the given status
/// and optional human-readable reason.
pub(crate) fn emit_transport_state(
    &self,
    role: TransportRole,
    status: &str,
    reason: Option<&str>,
) {
    send_msg(&OutMsg::TransportState {
        role,
        status: status.to_string(),
        reason: reason.map(ToString::to_string),
    });
}

/// Drop all transport-derived runtime state (SSRC->user mappings, Opus
/// decoders, RTP sequence tracking, speaking states, remote video states,
/// video decoders) and log how much was cleared.
///
/// Deliberately keeps `user_video_subscriptions` and `user_capture_states`
/// so they survive a transport reset (subscription retention is asserted
/// by the unit test below).
pub(crate) fn clear_transport_runtime_state(&mut self, reason: &str) {
    let cleared_audio_ssrcs = self.ssrc_map.len();
    let cleared_decoders = self.opus_decoders.len();
    let cleared_speaking_users = self.speaking_states.len();
    let cleared_video_users = self.remote_video_states.len();
    let cleared_video_decoders = self.user_video_decoders.len();
    self.ssrc_map.clear();
    self.opus_decoders.clear();
    self.last_rtp_seq.clear();
    self.speaking_states.clear();
    self.remote_video_states.clear();
    self.user_video_decoders.clear();

    tracing::info!(
        reason = reason,
        cleared_audio_ssrcs,
        cleared_decoders,
        cleared_speaking_users,
        cleared_video_users,
        cleared_video_decoders,
        "cleared voice transport runtime state"
    );
}

/// Remove all runtime state for a single user (on disconnect):
/// - drops every SSRC mapped to the user, plus that SSRC's decoder and
///   sequence tracking;
/// - emits `SpeakingEnd` / `UserAudioEnd` / `UserVideoEnd` only for
///   streams that were actually active;
/// - always finishes with a `ClientDisconnect` message.
pub(crate) fn remove_user_runtime_state(&mut self, user_id: u64) {
    // Collect first, then retain: we need the removed SSRCs to also purge
    // the per-SSRC decoder and sequence maps.
    let removed_ssrcs = self
        .ssrc_map
        .iter()
        .filter_map(|(&ssrc, &mapped_uid)| (mapped_uid == user_id).then_some(ssrc))
        .collect::<Vec<_>>();
    self.ssrc_map.retain(|_, v| *v != user_id);
    for ssrc in removed_ssrcs {
        self.opus_decoders.remove(&ssrc);
        self.last_rtp_seq.remove(&ssrc);
    }

    let uid_str = user_id.to_string();
    if let Some(ss) = self.speaking_states.remove(&user_id) {
        if ss.is_speaking {
            send_msg(&OutMsg::SpeakingEnd {
                user_id: uid_str.clone(),
            });
        }
    }

    if let Some(state) = self.user_capture_states.remove(&user_id) {
        if state.stream_active {
            send_msg(&OutMsg::UserAudioEnd {
                user_id: uid_str.clone(),
            });
        }
    }

    self.user_video_subscriptions.remove(&user_id);
    self.user_video_decoders.remove(&user_id);
    if let Some(state) = self.remote_video_states.remove(&user_id) {
        // Prefer the explicitly tracked video SSRC; otherwise fall back to
        // the first descriptor's SSRC so the event still identifies a stream.
        let end_ssrc = state
            .video_ssrc
            .or_else(|| state.streams.first().map(|stream| stream.ssrc));
        send_msg(&OutMsg::UserVideoEnd {
            user_id: uid_str.clone(),
            ssrc: end_ssrc,
        });
    }

    send_msg(&OutMsg::ClientDisconnect { user_id: uid_str });
}

/// Full teardown after the voice transport reports a disconnect: report
/// the error and state over IPC, reset music and stream-publish pipelines,
/// drain queued music PCM, tear down the voice and stream-publish
/// connections, clear transport runtime state, then schedule a reconnect.
///
/// Note: the stream-watch connection is intentionally left untouched here.
pub(crate) fn handle_disconnected(&mut self, reason: &str) {
    tracing::warn!("Voice disconnected: {}", reason);
    send_error(ErrorCode::VoiceRuntimeError, reason.to_string());
    send_msg(&OutMsg::ConnectionState {
        status: "disconnected".into(),
    });
    self.emit_transport_state(TransportRole::Voice, "disconnected", Some(reason));
    self.music.reset();
    self.stream_publish.reset();
    drain_music_pcm_queue(&self.music_pcm_rx);
    self.clear_voice_connection();
    self.clear_stream_publish_connection();
    self.clear_transport_runtime_state(reason);

    self.schedule_reconnect(reason);
}

}

/// Outcome of a try-connect operation: already connected, prerequisites
/// missing, newly connected, or failed. (No consumer is visible in this
/// file — presumably returned by connection-establishment code elsewhere.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum TryConnectOutcome { AlreadyConnected, MissingData, Connected, Failed, }

// Unit tests for `AppState` transport-state clearing behavior.
#[cfg(test)] mod tests { use std::sync::Arc;

use crossbeam_channel as crossbeam;
use parking_lot::Mutex;
use tokio::sync::mpsc;

use crate::audio_pipeline::AudioSendState;
use crate::capture::SpeakingState;
use crate::dave::DaveManager;
use crate::music::MusicEvent;
use crate::stream_publish::{StreamPublishEvent, StreamPublishFrame};
use crate::video::{RemoteVideoState, UserVideoSubscription, VideoStreamDescriptor};
use crate::voice_conn::VoiceEvent;

use super::AppState;

/// Build an `AppState` wired to fresh bounded channels and empty shared
/// handles — no real voice connection or DAVE session is created.
fn test_app_state() -> AppState {
    let dave: Arc<Mutex<Option<DaveManager>>> = Arc::new(Mutex::new(None));
    let (voice_event_tx, _voice_event_rx) = mpsc::channel::<VoiceEvent>(4);
    let audio_send_state = Arc::new(Mutex::new(None::<AudioSendState>));
    let (music_pcm_tx, music_pcm_rx) = crossbeam::bounded::<Vec<i16>>(4);
    let (music_event_tx, _music_event_rx) = mpsc::channel::<MusicEvent>(4);
    let (stream_publish_frame_tx, stream_publish_frame_rx) =
        crossbeam::bounded::<StreamPublishFrame>(4);
    let (stream_publish_event_tx, stream_publish_event_rx) =
        crossbeam::bounded::<StreamPublishEvent>(4);

    AppState::new(
        dave,
        voice_event_tx,
        audio_send_state,
        music_pcm_tx,
        music_pcm_rx,
        music_event_tx,
        stream_publish_frame_tx,
        stream_publish_frame_rx,
        stream_publish_event_tx,
        stream_publish_event_rx,
    )
}

/// `clear_transport_runtime_state` must clear SSRC mappings, decoders,
/// speaking states, and remote video states, while leaving user video
/// subscriptions in place so they survive a reconnect.
#[test]
fn clear_transport_runtime_state_drops_transport_state_but_keeps_subscriptions() {
    let mut state = test_app_state();
    // Populate one user's worth of transport-derived state (user id 42).
    state.ssrc_map.insert(111, 42);
    state.speaking_states.insert(
        42,
        SpeakingState {
            last_packet_at: None,
            is_speaking: true,
        },
    );
    state.user_video_subscriptions.insert(
        42,
        UserVideoSubscription::new(15, 80, Some(1_280 * 720), Some("screen".into()), None),
    );
    state.remote_video_states.insert(
        42,
        RemoteVideoState {
            audio_ssrc: Some(211),
            video_ssrc: Some(311),
            codec: Some("h264".into()),
            streams: vec![VideoStreamDescriptor {
                ssrc: 311,
                rtx_ssrc: Some(411),
                rid: Some("f".into()),
                quality: Some(100),
                stream_type: Some("screen".into()),
                active: Some(true),
                max_bitrate: Some(4_000_000),
                max_framerate: Some(30),
                max_resolution: None,
            }],
        },
    );

    state.clear_transport_runtime_state("test");

    assert!(state.ssrc_map.is_empty());
    assert!(state.opus_decoders.is_empty());
    assert!(state.speaking_states.is_empty());
    assert!(state.remote_video_states.is_empty());
    // Subscription intentionally retained across the transport reset.
    assert!(state.user_video_subscriptions.contains_key(&42));
}

}