src/music.rs

use std::collections::VecDeque; use std::io::{self, BufRead}; use std::process::Stdio; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

use crossbeam_channel as crossbeam; use parking_lot::Mutex; use tokio::sync::mpsc; use tokio::time; use tracing::{info, warn};

use crate::audio_pipeline::{AudioSendState, clear_audio_send_buffer};

const MUSIC_PIPELINE_STDERR_TAIL_LINES: usize = 24;

#[derive(Debug)] pub(crate) enum MusicEvent { Idle, Error(String), FirstPcm { startup_ms: u64, resolved_direct_url: bool, }, }

pub(crate) fn drain_music_pcm_queue(music_pcm_rx: &crossbeam::Receiver<Vec>) { while music_pcm_rx.try_recv().is_ok() {} }

pub(crate) fn is_music_output_drained( music_pcm_rx: &crossbeam::Receiver<Vec>, audio_send_state: &Arc<Mutex<Option>>, ) -> bool { if !music_pcm_rx.is_empty() { return false; } let guard = audio_send_state.lock(); guard .as_ref() .is_none_or(|state| state.music_buffer_samples() == 0) }

#[derive(Clone, Copy)] pub(crate) struct MusicPipelineRequest<'a> { pub(crate) url: &'a str, pub(crate) resolved_direct_url: bool, pub(crate) clear_output_buffers: bool, }

pub(crate) struct MusicPipelineContext<'a> { pub(crate) music_player: &'a mut Option, pub(crate) music_pcm_rx: &'a crossbeam::Receiver<Vec>, pub(crate) music_pcm_tx: &'a crossbeam::Sender<Vec>, pub(crate) music_event_tx: &'a mpsc::Sender, pub(crate) audio_send_state: &'a Arc<Mutex<Option>>, }

pub(crate) fn start_music_pipeline( request: MusicPipelineRequest<'>, context: MusicPipelineContext<'>, ) { let MusicPipelineRequest { url, resolved_direct_url, clear_output_buffers, } = request; let MusicPipelineContext { music_player, music_pcm_rx, music_pcm_tx, music_event_tx, audio_send_state, } = context;

if let Some(player) = music_player {
    player.stop();
}
*music_player = None;
drain_music_pcm_queue(music_pcm_rx);
if clear_output_buffers {
    clear_audio_send_buffer(audio_send_state);
}
*music_player = Some(MusicPlayer::start(
    url,
    music_pcm_tx.clone(),
    music_event_tx.clone(),
    resolved_direct_url,
));

}

pub(crate) struct MusicPlayer { stop: Arc, paused: Arc, child_pid: Arc, thread: Option<std::thread::JoinHandle<()>>, }

use crate::process_compat::{self, ProcessSignal};

impl MusicPlayer { #[allow(clippy::too_many_lines)] fn start( url: &str, pcm_tx: crossbeam::Sender<Vec>, music_event_tx: mpsc::Sender, resolved_direct_url: bool, ) -> Self { let stop = Arc::new(AtomicBool::new(false)); let stop_clone = stop.clone(); let paused = Arc::new(AtomicBool::new(false)); let paused_thread = paused.clone(); let child_pid = Arc::new(AtomicU32::new(0)); let child_pid_thread = child_pid.clone(); let url = url.to_string();

    let thread = std::thread::spawn(move || {
        let pipeline_command = build_music_pipeline_command(&url, resolved_direct_url);
        let pipeline_started_at = time::Instant::now();
        let child = {
            let mut cmd = process_compat::shell_command(&pipeline_command);
            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
            cmd.spawn()
        };

        let mut child = match child {
            Ok(c) => c,
            Err(e) => {
                let _ = music_event_tx.blocking_send(MusicEvent::Error(format!(
                    "yt-dlp/ffmpeg spawn failed: {e}"
                )));
                return;
            }
        };
        child_pid_thread.store(child.id(), Ordering::SeqCst);

        let stderr_tail = Arc::new(Mutex::new(VecDeque::<String>::new()));
        let mut stderr_thread = child.stderr.take().map(|stderr| {
            let stderr_tail = stderr_tail.clone();
            std::thread::spawn(move || {
                let reader = io::BufReader::new(stderr);
                for line_result in reader.lines() {
                    let line = match line_result {
                        Ok(value) => value.trim().to_string(),
                        Err(_) => break,
                    };
                    if line.is_empty() {
                        continue;
                    }
                    let mut tail = stderr_tail.lock();
                    if tail.len() >= MUSIC_PIPELINE_STDERR_TAIL_LINES {
                        tail.pop_front();
                    }
                    tail.push_back(line);
                }
            })
        });

        let Some(stdout) = child.stdout.take() else {
            let _ = music_event_tx.blocking_send(MusicEvent::Error(
                "music pipeline missing stdout".to_string(),
            ));
        process_compat::terminate_child(&mut child, "music");
            let _ = child.wait();
            if let Some(handle) = stderr_thread.take() {
                let _ = handle.join();
            }
            child_pid_thread.store(0, Ordering::SeqCst);
            return;
        };

        let mut reader = io::BufReader::with_capacity(48000 * 2, stdout);
        let mut chunk = vec![0u8; 960 * 2];
        let mut first_pcm_reported = false;

        loop {
            if stop_clone.load(Ordering::Relaxed) {
                break;
            }
            match io::Read::read_exact(&mut reader, &mut chunk) {
                Ok(()) => {
                    if !first_pcm_reported {
                        first_pcm_reported = true;
                        let startup_ms = pipeline_started_at.elapsed().as_millis() as u64;
                        info!(
                            "music pipeline first pcm startup_ms={} direct={}",
                            startup_ms, resolved_direct_url
                        );
                        let _ = music_event_tx.blocking_send(MusicEvent::FirstPcm {
                            startup_ms,
                            resolved_direct_url,
                        });
                    }
                    let mut samples = Vec::with_capacity(960);
                    for i in 0..960 {
                        samples.push(i16::from_le_bytes([chunk[i * 2], chunk[i * 2 + 1]]));
                    }
                    if pcm_tx.send(samples).is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }

        process_compat::terminate_child(&mut child, "music");
        let wait_result = child.wait();
        if let Some(handle) = stderr_thread.take() {
            let _ = handle.join();
        }
        child_pid_thread.store(0, Ordering::SeqCst);
        paused_thread.store(false, Ordering::SeqCst);

        let stderr_summary = {
            let tail = stderr_tail.lock();
            if tail.is_empty() {
                String::new()
            } else {
                format!(
                    " | stderr tail: {}",
                    tail.iter().cloned().collect::<Vec<_>>().join(" || ")
                )
            }
        };

        if !stop_clone.load(Ordering::Relaxed) {
            match wait_result {
                Ok(status) if status.success() => {
                    let _ = music_event_tx.blocking_send(MusicEvent::Idle);
                }
                Ok(status) => {
                    let _ = music_event_tx.blocking_send(MusicEvent::Error(format!(
                        "music pipeline exited with status {status}{stderr_summary}"
                    )));
                }
                Err(error) => {
                    let _ = music_event_tx.blocking_send(MusicEvent::Error(format!(
                        "music pipeline wait failed: {error}{stderr_summary}"
                    )));
                }
            }
        }
    });

    MusicPlayer {
        stop,
        paused,
        child_pid,
        thread: Some(thread),
    }
}

pub(crate) fn is_alive(&self) -> bool {
    self.child_pid.load(Ordering::SeqCst) != 0
}

pub(crate) fn pause(&self) -> bool {
    if self.paused.load(Ordering::SeqCst) {
        return self.is_alive();
    }
    let pid = self.child_pid.load(Ordering::SeqCst);
    if pid == 0 {
        return false;
    }
    match process_compat::signal_process_group(pid, ProcessSignal::Suspend) {
        Ok(()) => {
            self.paused.store(true, Ordering::SeqCst);
            true
        }
        Err(error) => {
            if error.kind() != io::ErrorKind::NotFound {
                warn!(pid, error = %error, "failed to pause music process group");
            }
            false
        }
    }
}

pub(crate) fn resume(&self) -> bool {
    if !self.paused.load(Ordering::SeqCst) {
        return self.is_alive();
    }
    let pid = self.child_pid.load(Ordering::SeqCst);
    if pid == 0 {
        self.paused.store(false, Ordering::SeqCst);
        return false;
    }
    match process_compat::signal_process_group(pid, ProcessSignal::Resume) {
        Ok(()) => {
            self.paused.store(false, Ordering::SeqCst);
            true
        }
        Err(error) => {
            if error.kind() != io::ErrorKind::NotFound {
                warn!(pid, error = %error, "failed to resume music process group");
            }
            false
        }
    }
}

pub(crate) fn stop(&mut self) {
    self.stop.store(true, Ordering::SeqCst);
    let was_paused = self.paused.swap(false, Ordering::SeqCst);
    if let Some(thread) = self.thread.take() {
        if !thread.is_finished() {
            let pid = self.child_pid.load(Ordering::SeqCst);
            // A suspended process won't handle termination until resumed.
            if was_paused {
                let _ = process_compat::signal_process_group(pid, ProcessSignal::Resume);
            }
            if let Err(error) = process_compat::signal_process_group(pid, ProcessSignal::Terminate) {
                if error.kind() != io::ErrorKind::NotFound {
                    warn!(pid, error = %error, "failed to stop music process group");
                }
            }
        }
        if thread.is_finished() {
            let _ = thread.join();
        } else {
            std::thread::spawn(move || {
                let _ = thread.join();
            });
        }
    }
}

}

impl Drop for MusicPlayer { fn drop(&mut self) { self.stop(); } }

#[derive(Default)] #[allow(clippy::struct_excessive_bools)] // Music state machine flags are inherently boolean. pub(crate) struct MusicState { pub(crate) player: Option, pub(crate) active: bool, pub(crate) paused: bool, pub(crate) finishing: bool, pub(crate) active_url: Option, pub(crate) active_resolved_direct_url: bool, pub(crate) pending_url: Option, pub(crate) pending_received_at: Optiontime::Instant, pub(crate) pending_audio_seen: bool, pub(crate) pending_last_audio_at: Optiontime::Instant, pub(crate) pending_waiting_for_drain: bool, pub(crate) pending_drain_started_at: Optiontime::Instant, pub(crate) pending_first_pcm_at: Optiontime::Instant, pub(crate) pending_resolved_direct_url: bool, pub(crate) pending_stop: bool, }

impl MusicState { pub(crate) fn stop_player(&mut self) { if let Some(ref mut player) = self.player { player.stop(); } self.player = None; }

pub(crate) fn clear_pending_start(&mut self) {
    self.pending_url = None;
    self.pending_received_at = None;
    self.pending_audio_seen = false;
    self.pending_last_audio_at = None;
    self.pending_waiting_for_drain = false;
    self.pending_drain_started_at = None;
    self.pending_first_pcm_at = None;
    self.pending_resolved_direct_url = false;
}

pub(crate) fn reset(&mut self) {
    self.stop_player();
    self.active = false;
    self.paused = false;
    self.finishing = false;
    self.active_url = None;
    self.active_resolved_direct_url = false;
    self.pending_stop = false;
    self.clear_pending_start();
}

pub(crate) fn queue_pending_start(&mut self, url: String, resolved_direct_url: bool) {
    self.stop_player();
    self.active = false;
    self.paused = false;
    self.finishing = false;
    self.pending_stop = false;
    self.active_url = Some(url.clone());
    self.active_resolved_direct_url = resolved_direct_url;
    self.pending_url = Some(url);
    self.pending_received_at = Some(time::Instant::now());
    self.pending_audio_seen = false;
    self.pending_last_audio_at = None;
    self.pending_waiting_for_drain = false;
    self.pending_drain_started_at = None;
    self.pending_first_pcm_at = None;
    self.pending_resolved_direct_url = resolved_direct_url;
}

}

pub(crate) fn build_music_pipeline_command(url: &str, resolved_direct_url: bool) -> String { let quoted_url = process_compat::shell_quote(url); if resolved_direct_url { format!("ffmpeg -nostdin -loglevel error -i {quoted_url} -f s16le -ar 48000 -ac 1 pipe:1") } else { let yt_arg = process_compat::shell_quote_arg("youtube:player_client=android"); format!( "yt-dlp --no-warnings --quiet --no-playlist --extractor-args {yt_arg} -f bestaudio/best -o - {quoted_url} | ffmpeg -nostdin -loglevel error -i pipe:0 -f s16le -ar 48000 -ac 1 pipe:1" ) } }

#[cfg(test)] mod tests { use std::sync::Arc;

use crossbeam_channel as crossbeam;
use parking_lot::Mutex;

use super::{build_music_pipeline_command, is_music_output_drained};
use crate::audio_pipeline::AudioSendState;

#[test]
fn music_output_not_drained_while_pcm_queue_has_chunks() {
    let (music_pcm_tx, music_pcm_rx) = crossbeam::bounded::<Vec<i16>>(4);
    let audio_send_state = Arc::new(Mutex::new(Some(
        AudioSendState::new().expect("audio state"),
    )));

    music_pcm_tx.send(vec![0; 960]).expect("queue chunk");

    assert!(!is_music_output_drained(&music_pcm_rx, &audio_send_state));
}

#[test]
fn music_output_not_drained_while_mixer_buffer_has_music() {
    let (_music_pcm_tx, music_pcm_rx) = crossbeam::bounded::<Vec<i16>>(4);
    let audio_send_state = Arc::new(Mutex::new(Some(
        AudioSendState::new().expect("audio state"),
    )));
    {
        let mut guard = audio_send_state.lock();
        let state = guard.as_mut().expect("state");
        state.push_music_pcm(vec![0; 960]);
    }

    assert!(!is_music_output_drained(&music_pcm_rx, &audio_send_state));
}

#[test]
fn music_output_drained_when_queue_and_mixer_are_empty() {
    let (_music_pcm_tx, music_pcm_rx) = crossbeam::bounded::<Vec<i16>>(4);
    let audio_send_state = Arc::new(Mutex::new(Some(
        AudioSendState::new().expect("audio state"),
    )));

    assert!(is_music_output_drained(&music_pcm_rx, &audio_send_state));
}

#[test]
fn direct_music_pipeline_command_skips_ytdlp() {
    let command = build_music_pipeline_command("https://cdn.example.com/audio.m4a", true);
    assert!(command.starts_with("ffmpeg "));
    assert!(!command.contains("yt-dlp"));
}

#[test]
fn unresolved_music_pipeline_command_uses_ytdlp() {
    let command = build_music_pipeline_command("https://www.youtube.com/watch?v=abc123", false);
    assert!(command.contains("yt-dlp"));
    assert!(command.contains("| ffmpeg "));
}

}