src/playback_supervisor.rs

use base64::Engine as _; use tokio::time; use tracing::{info, warn};

use crate::app_state::AppState; use crate::audio_pipeline::{ clear_audio_send_buffer, clear_music_send_buffer, clear_tts_send_buffer, convert_llm_to_48k_mono, emit_playback_armed, has_buffered_music_output, resume_music_output, suppress_music_output, }; use crate::ipc::{OutMsg, send_buffer_depth, send_msg, send_tts_playback_state}; use crate::ipc_protocol::PlaybackCommand; use crate::music::{MusicEvent, drain_music_pcm_queue, is_music_output_drained};

impl AppState { pub(crate) fn handle_playback_command(&mut self, msg: PlaybackCommand) -> bool { match msg { PlaybackCommand::Audio { pcm_base64, sample_rate, } => { let now = time::Instant::now(); if self.music.pending_url.is_some() { self.music.pending_audio_seen = true; self.music.pending_last_audio_at = Some(now); }

            if self.music.active && !self.music.paused {
                let is_ducked = {
                    let guard = self.audio_send_state.lock();
                    guard
                        .as_ref()
                        .is_some_and(crate::audio_pipeline::AudioSendState::is_music_ducked)
                };
                if !is_ducked {
                    return false;
                }
            }

            let engine = base64::engine::general_purpose::STANDARD;
            if let Ok(raw) = engine.decode(&pcm_base64) {
                let samples = convert_llm_to_48k_mono(&raw, sample_rate);
                if !samples.is_empty() {
                    let mut emit_tts_buffered = false;
                    {
                        let mut guard = self.audio_send_state.lock();
                        if let Some(ref mut state) = *guard {
                            state.push_pcm(samples);
                            if state.tts_buffer_samples() > 0 && !self.tts_playback_buffered {
                                self.tts_playback_buffered = true;
                                emit_tts_buffered = true;
                            }
                        }
                    }
                    if emit_tts_buffered {
                        send_tts_playback_state("buffered", "tts_pcm_enqueued");
                    }
                }
            }
            false
        }
        PlaybackCommand::StopPlayback => {
            self.music.reset();
            drain_music_pcm_queue(&self.music_pcm_rx);
            clear_audio_send_buffer(&self.audio_send_state);
            if self.tts_playback_buffered {
                self.tts_playback_buffered = false;
                send_tts_playback_state("idle", "stop_playback");
            }
            send_msg(&OutMsg::PlayerState {
                status: "idle".into(),
            });
            emit_playback_armed("stop_playback", &self.audio_send_state);
            false
        }
        PlaybackCommand::StopTtsPlayback => {
            clear_tts_send_buffer(&self.audio_send_state);
            if self.tts_playback_buffered {
                self.tts_playback_buffered = false;
                send_tts_playback_state("idle", "stop_tts_playback");
            }
            false
        }
        PlaybackCommand::MusicPlay {
            url,
            resolved_direct_url,
        } => {
            let normalized_url = url.trim().to_string();
            if normalized_url.is_empty() {
                send_msg(&OutMsg::MusicError {
                    message: "music_play missing url".to_string(),
                });
                return false;
            }

            clear_music_send_buffer(&self.audio_send_state);
            self.music
                .queue_pending_start(normalized_url.clone(), resolved_direct_url);
            self.start_music_pipeline(&normalized_url, resolved_direct_url, false);
            tracing::info!(
                "music_play queued pending start url={} direct={} (waiting for announcement drain)",
                normalized_url,
                resolved_direct_url
            );
            false
        }
        PlaybackCommand::MusicStop => {
            if self.music.player.is_some() && self.music.active {
                let mut guard = self.audio_send_state.lock();
                if let Some(ref mut state) = *guard {
                    let _ = state.set_music_gain(0.0, 300);
                }
                self.music.pending_stop = true;
            } else {
                self.music.reset();
                drain_music_pcm_queue(&self.music_pcm_rx);
                clear_audio_send_buffer(&self.audio_send_state);
                if self.tts_playback_buffered {
                    self.tts_playback_buffered = false;
                    send_tts_playback_state("idle", "music_stop");
                }
                send_msg(&OutMsg::PlayerState {
                    status: "idle".into(),
                });
                send_msg(&OutMsg::MusicIdle);
                emit_playback_armed("music_stop", &self.audio_send_state);
            }
            false
        }
        PlaybackCommand::MusicPause => {
            self.music.clear_pending_start();
            self.music.pending_stop = false;
            let was_finishing = self.music.finishing;
            let player_alive = self
                .music
                .player
                .as_ref()
                .is_some_and(crate::music::MusicPlayer::is_alive);
            let buffered_music_output = !self.music_pcm_rx.is_empty()
                || has_buffered_music_output(&self.audio_send_state);
            if player_alive || self.music.active || was_finishing {
                info!(
                    "music_pause: player_alive={} active={} was_finishing={} buffered_output={}",
                    player_alive, self.music.active, was_finishing, buffered_music_output
                );
                if player_alive
                    && !self
                        .music
                        .player
                        .as_ref()
                        .is_some_and(crate::music::MusicPlayer::pause)
                {
                    warn!("music_pause: failed to pause music process group");
                }
                self.music.paused = true;
                self.music.active = false;
                self.music.finishing =
                    was_finishing || (!player_alive && buffered_music_output);
                suppress_music_output(&self.audio_send_state);
                send_msg(&OutMsg::PlayerState {
                    status: "paused".into(),
                });
                emit_playback_armed("music_pause", &self.audio_send_state);
            }
            false
        }
        PlaybackCommand::MusicResume => {
            self.music.clear_pending_start();
            self.music.pending_stop = false;
            self.music.finishing = false;

            let player_alive = self
                .music
                .player
                .as_ref()
                .is_some_and(crate::music::MusicPlayer::is_alive);
            let resumed_in_place = player_alive
                && self
                    .music
                    .player
                    .as_ref()
                    .is_some_and(crate::music::MusicPlayer::resume);
            let buffered_music_output = !self.music_pcm_rx.is_empty()
                || has_buffered_music_output(&self.audio_send_state);

            if resumed_in_place {
                info!("music_resume: player alive, resuming from position");
                resume_music_output(&self.audio_send_state);
                self.music.paused = false;
                self.music.active = true;
                send_msg(&OutMsg::PlayerState {
                    status: "playing".into(),
                });
            } else if !player_alive && buffered_music_output {
                info!(
                    "music_resume: player dead, resuming buffered output from current position"
                );
                resume_music_output(&self.audio_send_state);
                self.music.paused = false;
                self.music.active = true;
                self.music.finishing = true;
                send_msg(&OutMsg::PlayerState {
                    status: "playing".into(),
                });
            } else if let Some(url) = self.music.active_url.clone() {
                self.music.stop_player();
                warn!(
                    "music_resume: player dead, restarting pipeline from url={}",
                    url
                );
                self.start_music_pipeline(&url, self.music.active_resolved_direct_url, true);
                self.music.paused = false;
                self.music.active = true;
                send_msg(&OutMsg::PlayerState {
                    status: "playing".into(),
                });
            } else {
                warn!("music_resume: no player and no url, cannot resume");
            }
            false
        }
        PlaybackCommand::MusicSetGain { target, fade_ms } => {
            let clamped = target.clamp(0.0, 1.0);
            let mut guard = self.audio_send_state.lock();
            if let Some(ref mut state) = *guard {
                if let Some(reached) = state.set_music_gain(clamped, fade_ms) {
                    drop(guard);
                    send_msg(&OutMsg::MusicGainReached { gain: reached });
                }
            }
            false
        }
        PlaybackCommand::Destroy => {
            self.music.stop_player();
            self.stream_publish.reset();
            self.clear_voice_connection();
            self.clear_stream_publish_connection();
            true
        }
    }
}

pub(crate) fn handle_music_event(&mut self, event: MusicEvent) {
    match event {
        MusicEvent::Idle => {
            info!(
                "music_event_idle: active={} paused={} finishing={}",
                self.music.active, self.music.paused, self.music.finishing
            );
            self.music.stop_player();
            self.music.paused = false;
            self.music.finishing = self.music.active;
            self.music.pending_stop = false;
            self.music.clear_pending_start();
            if !self.music.finishing {
                self.music.active_url = None;
                self.music.active_resolved_direct_url = false;
                send_msg(&OutMsg::PlayerState {
                    status: "idle".into(),
                });
                send_msg(&OutMsg::MusicIdle);
                emit_playback_armed("music_idle", &self.audio_send_state);
            }
        }
        MusicEvent::Error(message) => {
            self.music.reset();
            drain_music_pcm_queue(&self.music_pcm_rx);
            send_msg(&OutMsg::MusicError { message });
            send_msg(&OutMsg::PlayerState {
                status: "idle".into(),
            });
            emit_playback_armed("music_error", &self.audio_send_state);
        }
        MusicEvent::FirstPcm {
            startup_ms,
            resolved_direct_url,
        } => {
            let now = time::Instant::now();
            self.music.pending_first_pcm_at = Some(now);
            if let Some(received_at) = self.music.pending_received_at {
                tracing::info!(
                    "music_play prepared url={} direct={} startupMs={} requestToFirstPcmMs={}",
                    self.music.pending_url.as_deref().unwrap_or("unknown"),
                    resolved_direct_url,
                    startup_ms,
                    now.duration_since(received_at).as_millis() as u64
                );
            } else {
                tracing::info!(
                    "music_play prepared url={} direct={} startupMs={}",
                    self.music.pending_url.as_deref().unwrap_or("unknown"),
                    resolved_direct_url,
                    startup_ms
                );
            }
        }
    }
}

/// Evaluate the pending-music-start state machine: wait for announcement
/// audio to arrive, then for a gap in TTS audio, then for the TTS buffer to
/// drain, before committing the music start.  Various safety/timeout paths
/// ensure music eventually starts even if the announcement never arrives or
/// the buffer never drains.
fn tick_pending_music_start(&mut self, now: time::Instant) {
    let Some(url) = self.music.pending_url.clone() else {
        return;
    };

    let mut start_music = false;
    let mut reason = "pending_unknown";

    if let Some(received_at) = self.music.pending_received_at {
        let elapsed_ms = now.duration_since(received_at).as_millis() as u64;
        if elapsed_ms > 15_000 {
            start_music = true;
            reason = "pending_safety_timeout";
        } else if !self.music.pending_audio_seen {
            if elapsed_ms > 5_000 {
                start_music = true;
                reason = "pending_no_announcement_audio";
            }
        } else {
            let last_audio_at = self.music.pending_last_audio_at.unwrap_or(received_at);
            let gap_ms = now.duration_since(last_audio_at).as_millis() as u64;
            if !self.music.pending_waiting_for_drain && gap_ms > 500 {
                self.music.pending_waiting_for_drain = true;
                self.music.pending_drain_started_at = Some(now);
            }
            if self.music.pending_waiting_for_drain {
                let audio_buffer_empty = {
                    let guard = self.audio_send_state.lock();
                    guard
                        .as_ref()
                        .is_none_or(crate::audio_pipeline::AudioSendState::tts_is_empty)
                };
                let drain_elapsed_ms = self
                    .music
                    .pending_drain_started_at
                    .map_or(0, |started| now.duration_since(started).as_millis() as u64);
                if audio_buffer_empty {
                    start_music = true;
                    reason = "pending_announcement_drain_complete";
                } else if drain_elapsed_ms > 5_000 {
                    start_music = true;
                    reason = "pending_drain_timeout";
                }
            }
        }
    } else {
        start_music = true;
        reason = "pending_missing_timestamp";
    }

    if start_music {
        let total_wait_ms = self.music.pending_received_at.map_or(0, |received_at| {
            now.duration_since(received_at).as_millis() as u64
        });
        let prepared_lead_ms = self.music.pending_first_pcm_at.map_or(0, |first_pcm_at| {
            now.duration_since(first_pcm_at).as_millis() as u64
        });
        let committed_direct_url = self.music.pending_resolved_direct_url;
        self.music.clear_pending_start();
        self.music.finishing = false;
        self.music.active_url = Some(url.clone());
        self.music.active_resolved_direct_url = committed_direct_url;
        self.music.active = true;
        self.music.paused = false;

        {
            let mut guard = self.audio_send_state.lock();
            if let Some(ref mut state) = *guard {
                state.begin_music_fade_in(1500);
            }
        }

        send_msg(&OutMsg::PlayerState {
            status: "playing".into(),
        });
        tracing::info!(
            "music_play committed url={} reason={} totalWaitMs={} preparedLeadMs={} direct={}",
            url,
            reason,
            total_wait_ms,
            prepared_lead_ms,
            committed_direct_url
        );
    }
}

/// Complete a pending stop once the music gain fade-out finishes: reset
/// music state, drain queues, clear audio buffers, and notify the TS side.
fn tick_pending_stop(&mut self) {
    if !self.music.pending_stop {
        return;
    }
    let fade_done = {
        let guard = self.audio_send_state.lock();
        guard
            .as_ref()
            .is_none_or(crate::audio_pipeline::AudioSendState::is_music_fade_out_complete)
    };
    if fade_done {
        self.music.reset();
        drain_music_pcm_queue(&self.music_pcm_rx);
        clear_audio_send_buffer(&self.audio_send_state);
        if self.tts_playback_buffered {
            self.tts_playback_buffered = false;
            send_tts_playback_state("idle", "music_track_finished");
        }
        send_msg(&OutMsg::PlayerState {
            status: "idle".into(),
        });
        send_msg(&OutMsg::MusicIdle);
        emit_playback_armed("music_stop", &self.audio_send_state);
    }
}

/// Periodically report TTS/music buffer depth to the TS side and
/// synchronise the `tts_playback_buffered` flag with actual buffer state.
fn tick_buffer_depth_report(&mut self) {
    self.buffer_depth_tick_counter += 1;
    if self.buffer_depth_tick_counter < Self::BUFFER_DEPTH_REPORT_INTERVAL {
        return;
    }
    self.buffer_depth_tick_counter = 0;

    let guard = self.audio_send_state.lock();
    if let Some(ref state) = *guard {
        let tts = state.tts_buffer_samples();
        let music = state.music_buffer_samples();
        if tts > 0 || music > 0 {
            self.buffer_depth_was_nonempty = true;
            drop(guard);
            send_buffer_depth(tts, music, "periodic_nonempty");
            if tts > 0 && !self.tts_playback_buffered {
                self.tts_playback_buffered = true;
                send_tts_playback_state("buffered", "periodic_nonempty");
            }
        } else if self.buffer_depth_was_nonempty {
            self.buffer_depth_was_nonempty = false;
            drop(guard);
            send_buffer_depth(0, 0, "periodic_drained");
            if self.tts_playback_buffered {
                self.tts_playback_buffered = false;
                send_tts_playback_state("idle", "periodic_drained");
            }
        }
    } else if self.buffer_depth_was_nonempty {
        self.buffer_depth_was_nonempty = false;
        drop(guard);
        send_buffer_depth(0, 0, "audio_send_state_missing");
        if self.tts_playback_buffered {
            self.tts_playback_buffered = false;
            send_tts_playback_state("idle", "audio_send_state_missing");
        }
    }
}

pub(crate) async fn on_audio_tick(&mut self) {
    let now = time::Instant::now();
    self.on_capture_tick(now);
    self.drain_stream_publish_runtime_events();

    self.tick_pending_music_start(now);

    if self.music.active && !self.music.paused {
        let mut guard = self.audio_send_state.lock();
        if let Some(ref mut state) = *guard {
            while state.can_accept_music_chunk() {
                let Ok(chunk) = self.music_pcm_rx.try_recv() else {
                    break;
                };
                state.push_music_pcm(chunk);
            }
        }
    }

    if self.music.finishing
        && is_music_output_drained(&self.music_pcm_rx, &self.audio_send_state)
    {
        self.music.finishing = false;
        self.music.active = false;
        self.music.paused = false;
        self.music.active_url = None;
        self.music.active_resolved_direct_url = false;
        send_msg(&OutMsg::PlayerState {
            status: "idle".into(),
        });
        send_msg(&OutMsg::MusicIdle);
        emit_playback_armed("music_idle", &self.audio_send_state);
    }

    {
        let mut guard = self.audio_send_state.lock();
        if let Some(ref mut state) = *guard {
            if let Some(reached) = state.maybe_take_music_gain_reached() {
                drop(guard);
                send_msg(&OutMsg::MusicGainReached { gain: reached });
            }
        }
    }

    self.tick_pending_stop();
    self.tick_buffer_depth_report();

    let (opus_frame, tts_just_drained) = {
        let mut guard = self.audio_send_state.lock();
        match *guard {
            Some(ref mut state) => {
                let frame = state.next_opus_frame();
                let drained = frame.is_none() && state.tts_just_drained();
                (frame, drained)
            }
            None => (None, false),
        }
    };

    // Emit an immediate drain notification so the TS side learns the TTS
    // buffer is empty without waiting up to 500ms for the periodic report.
    // Only send tts_playback_state — do not emit a synthetic buffer_depth
    // event here because it would need to lie about musicSamples (which
    // may still be non-zero if music is playing).  The periodic report
    // will send accurate combined depths on its normal cadence.
    if tts_just_drained && self.tts_playback_buffered {
        self.tts_playback_buffered = false;
        send_tts_playback_state("idle", "tts_drained");
    }

    if let Some(opus) = opus_frame {
        let encrypted = {
            let mut guard = self.dave.lock();
            match *guard {
                Some(ref mut dave_manager) if dave_manager.is_ready() => {
                    dave_manager.encrypt_opus(&opus).unwrap_or_else(|error| {
                        tracing::debug!("DAVE encrypt fallback: {}", error);
                        opus.clone()
                    })
                }
                _ => opus,
            }
        };

        if let Some(ref conn) = self.voice_conn {
            if let Err(error) = conn.send_rtp_frame(&encrypted).await {
                tracing::debug!("RTP send error: {}", error);
            }
        }
    }

    self.send_pending_stream_publish_frame().await;
}

}

#[cfg(test)] mod tests { use std::sync::Arc;

use crossbeam_channel as crossbeam;
use parking_lot::Mutex;
use tokio::sync::mpsc;

use super::AppState;
use crate::audio_pipeline::{AUDIO_FRAME_SAMPLES, AudioSendState, MAX_MUSIC_BUFFER_SAMPLES};
use crate::ipc_protocol::PlaybackCommand;
use crate::stream_publish::{StreamPublishEvent, StreamPublishFrame};

fn make_app_state_with_music_queue_capacity(queue_capacity: usize) -> AppState {
    let (_voice_event_tx, voice_event_rx) = mpsc::channel(4);
    drop(voice_event_rx);
    let (music_event_tx, music_event_rx) = mpsc::channel(4);
    drop(music_event_rx);

    let audio_send_state = Arc::new(Mutex::new(Some(
        AudioSendState::new().expect("audio state"),
    )));
    let (music_pcm_tx, music_pcm_rx) = crossbeam::bounded::<Vec<i16>>(queue_capacity);
    let (stream_publish_frame_tx, stream_publish_frame_rx) =
        crossbeam::bounded::<StreamPublishFrame>(4);
    let (stream_publish_event_tx, stream_publish_event_rx) =
        crossbeam::bounded::<StreamPublishEvent>(4);

    AppState::new(
        Arc::new(Mutex::new(None)),
        _voice_event_tx,
        audio_send_state,
        music_pcm_tx,
        music_pcm_rx,
        music_event_tx,
        stream_publish_frame_tx,
        stream_publish_frame_rx,
        stream_publish_event_tx,
        stream_publish_event_rx,
    )
}

fn make_app_state() -> AppState {
    make_app_state_with_music_queue_capacity(4)
}

#[test]
fn wake_word_pause_preserves_buffered_music_tail_when_player_is_dead() {
    let mut state = make_app_state();
    {
        let mut guard = state.audio_send_state.lock();
        let audio_state = guard.as_mut().expect("audio state");
        audio_state.push_music_pcm(vec![123; 960]);
    }
    state.music.active = true;
    state.music.finishing = true;
    state.music.active_url = Some("https://cdn.example.com/track.mp4".to_string());
    state.music.active_resolved_direct_url = true;

    assert!(!state.handle_playback_command(PlaybackCommand::MusicPause));
    assert!(state.music.paused);
    assert!(!state.music.active);
    assert!(state.music.finishing);
    {
        let guard = state.audio_send_state.lock();
        let audio_state = guard.as_ref().expect("audio state");
        assert_eq!(audio_state.music_buffer_samples(), 960);
        assert!(audio_state.is_music_output_suppressed());
    }

    assert!(!state.handle_playback_command(PlaybackCommand::MusicResume));
    assert!(
        state.music.player.is_none(),
        "buffered resume should not restart the pipeline"
    );
    assert!(!state.music.paused);
    assert!(state.music.active);
    assert!(state.music.finishing);
    {
        let mut guard = state.audio_send_state.lock();
        let audio_state = guard.as_mut().expect("audio state");
        assert!(!audio_state.is_music_output_suppressed());
        let frame = audio_state
            .next_opus_frame()
            .expect("buffered music should resume from preserved PCM");
        assert!(!frame.is_empty());
        assert_eq!(audio_state.music_buffer_samples(), 0);
    }
}

#[tokio::test]
async fn on_audio_tick_caps_music_prefetch_to_live_window() {
    let mut state = make_app_state_with_music_queue_capacity(128);
    state.music.active = true;

    for _ in 0..128 {
        state
            .music_pcm_tx
            .send(vec![321; AUDIO_FRAME_SAMPLES])
            .expect("queue music chunk");
    }

    state.on_audio_tick().await;

    {
        let guard = state.audio_send_state.lock();
        let audio_state = guard.as_ref().expect("audio state");
        assert_eq!(
            audio_state.music_buffer_samples(),
            MAX_MUSIC_BUFFER_SAMPLES - AUDIO_FRAME_SAMPLES
        );
    }
    assert!(
        state.music_pcm_rx.len() > 0,
        "backpressure should leave upstream music PCM queued instead of draining the full track"
    );
}

}