src/stream_publish.rs

use std::collections::VecDeque; use std::io::{self, BufRead, Read, Write}; use std::process::Stdio; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};

use crossbeam_channel as crossbeam; use tracing::{info, warn};

use crate::app_state::AppState; use crate::ipc_protocol::StreamPublishCommand; use crate::voice_conn::TransportRole;

const STREAM_PUBLISH_STDERR_TAIL_LINES: usize = 24; pub(crate) const STREAM_PUBLISH_TARGET_FPS: u32 = 30; const STREAM_PUBLISH_TARGET_WIDTH: u32 = 1280; const STREAM_PUBLISH_TARGET_HEIGHT: u32 = 720; const STREAM_PUBLISH_VIDEO_BITRATE_KBPS: u32 = 2_500; const STREAM_PUBLISH_BROWSER_FRAME_MAX_BYTES: usize = 6 * 1024 * 1024;

#[derive(Debug, Clone)] pub(crate) struct StreamPublishFrame { pub(crate) access_unit: Vec, pub(crate) timestamp_increment: u32, }

#[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum StreamPublishSource { Url { url: String, resolved_direct_url: bool, }, Visualizer { url: String, resolved_direct_url: bool, visualizer_mode: String, }, BrowserFrames { mime_type: String, }, }

#[derive(Debug, Clone)] pub(crate) enum StreamPublishEvent { Idle, Error(String), FirstFrame { startup_ms: u64, fps: u32 }, }

#[derive(Default)] pub(crate) struct StreamPublishState { pub(crate) player: Option, pub(crate) pending_source: Option, pub(crate) active_source: Option, pub(crate) active: bool, pub(crate) paused: bool, }

impl StreamPublishState { pub(crate) fn queue_pending_start(&mut self, source: StreamPublishSource) { self.pending_source = Some(source); }

pub(crate) fn clear_pending_start(&mut self) {
    self.pending_source = None;
}

pub(crate) fn stop_player(&mut self) {
    if let Some(player) = self.player.take() {
        player.stop();
    }
}

pub(crate) fn reset(&mut self) {
    self.stop_player();
    self.pending_source = None;
    self.active_source = None;
    self.active = false;
    self.paused = false;
}

}

enum StreamPublishPlayerMode { Url, BrowserFrames { mime_type: String, stdin: Arc<parking_lot::Mutex<Optionstd::process::ChildStdin>>, timestamp_increments: Arc<parking_lot::Mutex<VecDeque>>, last_captured_at_ms: Arc, }, }

pub(crate) struct StreamPublishPlayer { stop: Arc, paused: Arc, child_pid: Arc, thread: Option<std::thread::JoinHandle<()>>, mode: StreamPublishPlayerMode, }

use crate::process_compat::{self, ProcessSignal};

use crate::h264::find_next_start_code;

fn find_next_aud_start(data: &[u8], from: usize) -> Option { let mut search_from = from; while let Some((index, start_code_len)) = find_next_start_code(data, search_from) { let nal_start = index + start_code_len; if data.get(nal_start).is_some_and(|byte| (byte & 0x1f) == 9) { return Some(index); } search_from = nal_start; } None }

fn drain_h264_access_units(buffer: &mut Vec, flush_tail: bool) -> Vec<Vec> { let Some(first_aud) = find_next_aud_start(buffer, 0) else { return Vec::new(); }; if first_aud > 0 { buffer.drain(..first_aud); }

let mut out = Vec::new();
while let Some(next_aud) = find_next_aud_start(buffer, 4) {
    if next_aud == 0 {
        break;
    }
    let access_unit = buffer.drain(..next_aud).collect::<Vec<_>>();
    if !access_unit.is_empty() {
        out.push(access_unit);
    }
}

if flush_tail && !buffer.is_empty() {
    out.push(std::mem::take(buffer));
}

out

}

pub(crate) fn build_stream_publish_pipeline_command( url: &str, resolved_direct_url: bool, ) -> String { let quoted_url = process_compat::shell_quote(url); let ffmpeg_tail = format!( "ffmpeg -nostdin -loglevel error -re -i {{input}} -an -sn -dn -vf "scale=w={STREAM_PUBLISH_TARGET_WIDTH}:h={STREAM_PUBLISH_TARGET_HEIGHT}:force_original_aspect_ratio=decrease:flags=lanczos,pad={STREAM_PUBLISH_TARGET_WIDTH}:{STREAM_PUBLISH_TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2:black,fps={STREAM_PUBLISH_TARGET_FPS}" -c:v libx264 -preset veryfast -tune zerolatency -pix_fmt yuv420p -profile:v baseline -level 3.1 -g {STREAM_PUBLISH_TARGET_FPS} -keyint_min {STREAM_PUBLISH_TARGET_FPS} -sc_threshold 0 -b:v {STREAM_PUBLISH_VIDEO_BITRATE_KBPS}k -maxrate {STREAM_PUBLISH_VIDEO_BITRATE_KBPS}k -bufsize {}k -f h264 -bsf:v h264_metadata=aud=insert pipe:1", STREAM_PUBLISH_VIDEO_BITRATE_KBPS * 2 );

if resolved_direct_url {
    ffmpeg_tail.replace("{input}", &quoted_url)
} else {
    let yt_arg = process_compat::shell_quote_arg("youtube:player_client=android");
    format!(
        "yt-dlp --no-warnings --quiet --no-playlist --extractor-args {yt_arg} -f \"bestvideo[ext=mp4][vcodec*=avc1]/bestvideo[vcodec*=avc1]/bestvideo/best\" -o - {quoted_url} | {}",
        ffmpeg_tail.replace("{input}", "pipe:0")
    )
}

}

pub(crate) fn build_stream_publish_visualizer_pipeline_command( url: &str, resolved_direct_url: bool, visualizer_mode: &str, ) -> String { let quoted_url = process_compat::shell_quote(url); let visualizer_filter = match visualizer_mode { "spectrum" => format!( "showspectrum=s={STREAM_PUBLISH_TARGET_WIDTH}x{STREAM_PUBLISH_TARGET_HEIGHT}:mode=combined:slide=scroll:color=intensity" ), "waves" => format!( "showwaves=s={STREAM_PUBLISH_TARGET_WIDTH}x{STREAM_PUBLISH_TARGET_HEIGHT}:mode=cline:rate={STREAM_PUBLISH_TARGET_FPS}:colors=0x00FFAA|0x00AAFF" ), "vectorscope" => format!( "avectorscope=s={STREAM_PUBLISH_TARGET_WIDTH}x{STREAM_PUBLISH_TARGET_HEIGHT}:mode=lissajous:rate={STREAM_PUBLISH_TARGET_FPS}:draw=line" ), // "cqt" and everything else _ => format!( "showcqt=s={STREAM_PUBLISH_TARGET_WIDTH}x{STREAM_PUBLISH_TARGET_HEIGHT}:fps={STREAM_PUBLISH_TARGET_FPS}:count=6:bar_g=6" ), }; let ffmpeg_tail = format!( "ffmpeg -nostdin -loglevel error -re -i {{input}} -vn -filter_complex "{visualizer_filter},format=yuv420p" -c:v libx264 -preset veryfast -tune zerolatency -pix_fmt yuv420p -profile:v baseline -level 3.1 -g {STREAM_PUBLISH_TARGET_FPS} -keyint_min {STREAM_PUBLISH_TARGET_FPS} -sc_threshold 0 -b:v {STREAM_PUBLISH_VIDEO_BITRATE_KBPS}k -maxrate {STREAM_PUBLISH_VIDEO_BITRATE_KBPS}k -bufsize {}k -f h264 -bsf:v h264_metadata=aud=insert pipe:1", STREAM_PUBLISH_VIDEO_BITRATE_KBPS * 2 );

if resolved_direct_url {
    ffmpeg_tail.replace("{input}", &quoted_url)
} else {
    let yt_arg = process_compat::shell_quote_arg("youtube:player_client=android");
    let audio_arg = process_compat::shell_quote_arg("bestaudio/best");
    format!(
        "yt-dlp --no-warnings --quiet --no-playlist --extractor-args {yt_arg} -f {audio_arg} -o - {quoted_url} | {}",
        ffmpeg_tail.replace("{input}", "pipe:0")
    )
}

}

pub(crate) fn build_stream_publish_browser_pipeline_command(mime_type: &str) -> String { let codec = match normalize_browser_frame_mime_type(mime_type) { Some("image/png") => "png", _ => "png", }; format!( "ffmpeg -nostdin -loglevel error -f image2pipe -codec:v {codec} -framerate {STREAM_PUBLISH_TARGET_FPS} -i pipe:0 -an -sn -dn -vf "scale=w={STREAM_PUBLISH_TARGET_WIDTH}:h={STREAM_PUBLISH_TARGET_HEIGHT}:force_original_aspect_ratio=decrease:flags=lanczos,pad={STREAM_PUBLISH_TARGET_WIDTH}:{STREAM_PUBLISH_TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2:black" -c:v libx264 -preset veryfast -tune zerolatency -pix_fmt yuv420p -profile:v baseline -level 3.1 -g {STREAM_PUBLISH_TARGET_FPS} -keyint_min {STREAM_PUBLISH_TARGET_FPS} -sc_threshold 0 -b:v {STREAM_PUBLISH_VIDEO_BITRATE_KBPS}k -maxrate {STREAM_PUBLISH_VIDEO_BITRATE_KBPS}k -bufsize {}k -f h264 -bsf:v h264_metadata=aud=insert pipe:1", STREAM_PUBLISH_VIDEO_BITRATE_KBPS * 2 ) }

fn normalize_browser_frame_mime_type(mime_type: &str) -> Option<&'static str> { match mime_type.trim().to_ascii_lowercase().as_str() { "image/png" => Some("image/png"), _ => None, } }

fn decode_stream_publish_browser_frame(frame_base64: &str) -> Result<Vec, String> { let normalized = frame_base64.trim(); if normalized.is_empty() { return Err("stream_publish_browser_frame_missing_bytes".to_string()); } let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, normalized) .map_err(|error| format!("stream_publish_browser_frame_invalid_base64: {error}"))?; if decoded.is_empty() { return Err("stream_publish_browser_frame_empty".to_string()); } if decoded.len() > STREAM_PUBLISH_BROWSER_FRAME_MAX_BYTES { return Err(format!( "stream_publish_browser_frame_too_large:{}", decoded.len() )); } Ok(decoded) }

fn compute_browser_frame_timestamp_increment( last_captured_at_ms: &AtomicU64, captured_at_ms: u64, ) -> u32 { let default_increment = 90_000 / STREAM_PUBLISH_TARGET_FPS; if captured_at_ms == 0 { return default_increment; } let previous = last_captured_at_ms.swap(captured_at_ms, Ordering::SeqCst); if previous == 0 || captured_at_ms <= previous { return default_increment; } let delta_ms = captured_at_ms.saturating_sub(previous).clamp(1, 5_000); let increment = ((delta_ms as u128) * 90_000u128) / 1_000u128; increment .clamp(1, u128::from(u32::MAX)) .try_into() .unwrap_or(default_increment) }

impl StreamPublishPlayer { pub(crate) fn start_url( url: &str, frame_tx: crossbeam::Sender, event_tx: crossbeam::Sender, resolved_direct_url: bool, ) -> Self { let stop = Arc::new(AtomicBool::new(false)); let stop_clone = stop.clone(); let paused = Arc::new(AtomicBool::new(false)); let paused_thread = paused.clone(); let child_pid = Arc::new(AtomicU32::new(0)); let child_pid_thread = child_pid.clone(); let url = url.to_string();

    let thread = std::thread::spawn(move || {
        let pipeline_command = build_stream_publish_pipeline_command(&url, resolved_direct_url);
        let pipeline_started_at = tokio::time::Instant::now();
        let child = {
            let mut cmd = process_compat::shell_command(&pipeline_command);
            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
            cmd.spawn()
        };

        let mut child = match child {
            Ok(child) => child,
            Err(error) => {
                let _ = event_tx.send(StreamPublishEvent::Error(format!(
                    "stream publish yt-dlp/ffmpeg spawn failed: {error}"
                )));
                return;
            }
        };
        child_pid_thread.store(child.id(), Ordering::SeqCst);

        let stderr_tail = Arc::new(parking_lot::Mutex::new(VecDeque::<String>::new()));
        let mut stderr_thread = child.stderr.take().map(|stderr| {
            let stderr_tail = stderr_tail.clone();
            std::thread::spawn(move || {
                let reader = io::BufReader::new(stderr);
                for line_result in reader.lines() {
                    let line = match line_result {
                        Ok(value) => value.trim().to_string(),
                        Err(_) => break,
                    };
                    if line.is_empty() {
                        continue;
                    }
                    let mut tail = stderr_tail.lock();
                    if tail.len() >= STREAM_PUBLISH_STDERR_TAIL_LINES {
                        tail.pop_front();
                    }
                    tail.push_back(line);
                }
            })
        });

        let Some(mut stdout) = child.stdout.take() else {
            let _ = event_tx.send(StreamPublishEvent::Error(
                "stream publish pipeline missing stdout".to_string(),
            ));
            process_compat::terminate_child(&mut child, "stream_publish");
            let _ = child.wait();
            if let Some(handle) = stderr_thread.take() {
                let _ = handle.join();
            }
            child_pid_thread.store(0, Ordering::SeqCst);
            return;
        };

        let mut first_frame_reported = false;
        let mut read_buffer = [0u8; 16 * 1024];
        let mut h264_buffer = Vec::<u8>::with_capacity(256 * 1024);

        loop {
            if stop_clone.load(Ordering::Relaxed) {
                break;
            }

            match stdout.read(&mut read_buffer) {
                Ok(0) => break,
                Ok(bytes_read) => {
                    h264_buffer.extend_from_slice(&read_buffer[..bytes_read]);
                    for access_unit in drain_h264_access_units(&mut h264_buffer, false) {
                        if !first_frame_reported {
                            first_frame_reported = true;
                            let startup_ms = pipeline_started_at.elapsed().as_millis() as u64;
                            info!(
                                startup_ms,
                                fps = STREAM_PUBLISH_TARGET_FPS,
                                resolved_direct_url,
                                "stream publish produced first video frame"
                            );
                            let _ = event_tx.send(StreamPublishEvent::FirstFrame {
                                startup_ms,
                                fps: STREAM_PUBLISH_TARGET_FPS,
                            });
                        }
                        if frame_tx
                            .send(StreamPublishFrame {
                                access_unit,
                                timestamp_increment: 90_000 / STREAM_PUBLISH_TARGET_FPS,
                            })
                            .is_err()
                        {
                            break;
                        }
                    }
                }
                Err(error) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish stdout read failed: {error}"
                    )));
                    break;
                }
            }
        }

        if !stop_clone.load(Ordering::Relaxed) {
            for access_unit in drain_h264_access_units(&mut h264_buffer, true) {
                let _ = frame_tx.send(StreamPublishFrame {
                    access_unit,
                    timestamp_increment: 90_000 / STREAM_PUBLISH_TARGET_FPS,
                });
            }
        }

        process_compat::terminate_child(&mut child, "stream_publish");
        let wait_result = child.wait();
        if let Some(handle) = stderr_thread.take() {
            let _ = handle.join();
        }
        child_pid_thread.store(0, Ordering::SeqCst);
        paused_thread.store(false, Ordering::SeqCst);

        let stderr_summary = {
            let tail = stderr_tail.lock();
            if tail.is_empty() {
                String::new()
            } else {
                format!(
                    " | stderr tail: {}",
                    tail.iter().cloned().collect::<Vec<_>>().join(" || ")
                )
            }
        };

        if !stop_clone.load(Ordering::Relaxed) {
            match wait_result {
                Ok(status) if status.success() => {
                    let _ = event_tx.send(StreamPublishEvent::Idle);
                }
                Ok(status) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish pipeline exited with status {status}{stderr_summary}"
                    )));
                }
                Err(error) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish pipeline wait failed: {error}{stderr_summary}"
                    )));
                }
            }
        }
    });

    Self {
        stop,
        paused,
        child_pid,
        thread: Some(thread),
        mode: StreamPublishPlayerMode::Url,
    }
}

pub(crate) fn start_visualizer(
    url: &str,
    frame_tx: crossbeam::Sender<StreamPublishFrame>,
    event_tx: crossbeam::Sender<StreamPublishEvent>,
    resolved_direct_url: bool,
    visualizer_mode: &str,
) -> Self {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_clone = stop.clone();
    let paused = Arc::new(AtomicBool::new(false));
    let paused_thread = paused.clone();
    let child_pid = Arc::new(AtomicU32::new(0));
    let child_pid_thread = child_pid.clone();
    let url = url.to_string();
    let visualizer_mode = visualizer_mode.to_string();

    let thread = std::thread::spawn(move || {
        let pipeline_command = build_stream_publish_visualizer_pipeline_command(
            &url,
            resolved_direct_url,
            &visualizer_mode,
        );
        let pipeline_started_at = tokio::time::Instant::now();
        let child = {
            let mut cmd = process_compat::shell_command(&pipeline_command);
            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
            cmd.spawn()
        };

        let mut child = match child {
            Ok(child) => child,
            Err(error) => {
                let _ = event_tx.send(StreamPublishEvent::Error(format!(
                    "stream publish visualizer spawn failed: {error}"
                )));
                return;
            }
        };
        child_pid_thread.store(child.id(), Ordering::SeqCst);

        let stderr_tail = Arc::new(parking_lot::Mutex::new(VecDeque::<String>::new()));
        let mut stderr_thread = child.stderr.take().map(|stderr| {
            let stderr_tail = stderr_tail.clone();
            std::thread::spawn(move || {
                let reader = io::BufReader::new(stderr);
                for line_result in reader.lines() {
                    let line = match line_result {
                        Ok(value) => value.trim().to_string(),
                        Err(_) => break,
                    };
                    if line.is_empty() {
                        continue;
                    }
                    let mut tail = stderr_tail.lock();
                    if tail.len() >= STREAM_PUBLISH_STDERR_TAIL_LINES {
                        tail.pop_front();
                    }
                    tail.push_back(line);
                }
            })
        });

        let Some(mut stdout) = child.stdout.take() else {
            let _ = event_tx.send(StreamPublishEvent::Error(
                "stream publish visualizer missing stdout".to_string(),
            ));
            process_compat::terminate_child(&mut child, "stream_publish");
            let _ = child.wait();
            if let Some(handle) = stderr_thread.take() {
                let _ = handle.join();
            }
            child_pid_thread.store(0, Ordering::SeqCst);
            return;
        };

        let mut first_frame_reported = false;
        let mut read_buffer = [0u8; 16 * 1024];
        let mut h264_buffer = Vec::<u8>::with_capacity(256 * 1024);

        loop {
            if stop_clone.load(Ordering::Relaxed) {
                break;
            }

            match stdout.read(&mut read_buffer) {
                Ok(0) => break,
                Ok(bytes_read) => {
                    h264_buffer.extend_from_slice(&read_buffer[..bytes_read]);
                    for access_unit in drain_h264_access_units(&mut h264_buffer, false) {
                        if !first_frame_reported {
                            first_frame_reported = true;
                            let startup_ms = pipeline_started_at.elapsed().as_millis() as u64;
                            info!(
                                startup_ms,
                                fps = STREAM_PUBLISH_TARGET_FPS,
                                visualizer_mode = %visualizer_mode,
                                "stream publish visualizer produced first video frame"
                            );
                            let _ = event_tx.send(StreamPublishEvent::FirstFrame {
                                startup_ms,
                                fps: STREAM_PUBLISH_TARGET_FPS,
                            });
                        }
                        if frame_tx
                            .send(StreamPublishFrame {
                                access_unit,
                                timestamp_increment: 90_000 / STREAM_PUBLISH_TARGET_FPS,
                            })
                            .is_err()
                        {
                            break;
                        }
                    }
                }
                Err(error) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish visualizer stdout read failed: {error}"
                    )));
                    break;
                }
            }
        }

        if !stop_clone.load(Ordering::Relaxed) {
            for access_unit in drain_h264_access_units(&mut h264_buffer, true) {
                let _ = frame_tx.send(StreamPublishFrame {
                    access_unit,
                    timestamp_increment: 90_000 / STREAM_PUBLISH_TARGET_FPS,
                });
            }
        }

        process_compat::terminate_child(&mut child, "stream_publish");
        let wait_result = child.wait();
        if let Some(handle) = stderr_thread.take() {
            let _ = handle.join();
        }
        child_pid_thread.store(0, Ordering::SeqCst);
        paused_thread.store(false, Ordering::SeqCst);

        let stderr_summary = {
            let tail = stderr_tail.lock();
            if tail.is_empty() {
                String::new()
            } else {
                format!(
                    " | stderr tail: {}",
                    tail.iter().cloned().collect::<Vec<_>>().join(" || ")
                )
            }
        };

        if !stop_clone.load(Ordering::Relaxed) {
            match wait_result {
                Ok(status) if status.success() => {
                    let _ = event_tx.send(StreamPublishEvent::Idle);
                }
                Ok(status) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish visualizer exited with status {status}{stderr_summary}"
                    )));
                }
                Err(error) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish visualizer wait failed: {error}{stderr_summary}"
                    )));
                }
            }
        }
    });

    Self {
        stop,
        paused,
        child_pid,
        thread: Some(thread),
        mode: StreamPublishPlayerMode::Url,
    }
}

pub(crate) fn start_browser_frames(
    mime_type: &str,
    frame_tx: crossbeam::Sender<StreamPublishFrame>,
    event_tx: crossbeam::Sender<StreamPublishEvent>,
) -> Self {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_clone = stop.clone();
    let paused = Arc::new(AtomicBool::new(false));
    let paused_thread = paused.clone();
    let child_pid = Arc::new(AtomicU32::new(0));
    let child_pid_thread = child_pid.clone();
    let mime_type = normalize_browser_frame_mime_type(mime_type)
        .unwrap_or("image/png")
        .to_string();
    let stdin = Arc::new(parking_lot::Mutex::new(None));
    let stdin_thread = stdin.clone();
    let timestamp_increments = Arc::new(parking_lot::Mutex::new(VecDeque::<u32>::new()));
    let timestamp_increments_thread = timestamp_increments.clone();
    let last_captured_at_ms = Arc::new(AtomicU64::new(0));
    let browser_mime_type = mime_type.clone();

    let thread = std::thread::spawn(move || {
        let pipeline_command =
            build_stream_publish_browser_pipeline_command(&browser_mime_type);
        let pipeline_started_at = tokio::time::Instant::now();
        let child = {
            let mut cmd = process_compat::shell_command(&pipeline_command);
            cmd.stdin(Stdio::piped()).stdout(Stdio::piped()).stderr(Stdio::piped());
            cmd.spawn()
        };

        let mut child = match child {
            Ok(child) => child,
            Err(error) => {
                let _ = event_tx.send(StreamPublishEvent::Error(format!(
                    "stream publish browser ffmpeg spawn failed: {error}"
                )));
                return;
            }
        };
        child_pid_thread.store(child.id(), Ordering::SeqCst);
        *stdin_thread.lock() = child.stdin.take();

        let stderr_tail = Arc::new(parking_lot::Mutex::new(VecDeque::<String>::new()));
        let mut stderr_thread = child.stderr.take().map(|stderr| {
            let stderr_tail = stderr_tail.clone();
            std::thread::spawn(move || {
                let reader = io::BufReader::new(stderr);
                for line_result in reader.lines() {
                    let line = match line_result {
                        Ok(value) => value.trim().to_string(),
                        Err(_) => break,
                    };
                    if line.is_empty() {
                        continue;
                    }
                    let mut tail = stderr_tail.lock();
                    if tail.len() >= STREAM_PUBLISH_STDERR_TAIL_LINES {
                        tail.pop_front();
                    }
                    tail.push_back(line);
                }
            })
        });

        let Some(mut stdout) = child.stdout.take() else {
            let _ = event_tx.send(StreamPublishEvent::Error(
                "stream publish browser pipeline missing stdout".to_string(),
            ));
            *stdin_thread.lock() = None;
            process_compat::terminate_child(&mut child, "stream_publish");
            let _ = child.wait();
            if let Some(handle) = stderr_thread.take() {
                let _ = handle.join();
            }
            child_pid_thread.store(0, Ordering::SeqCst);
            return;
        };

        let mut first_frame_reported = false;
        let mut read_buffer = [0u8; 16 * 1024];
        let mut h264_buffer = Vec::<u8>::with_capacity(256 * 1024);

        loop {
            if stop_clone.load(Ordering::Relaxed) {
                break;
            }

            match stdout.read(&mut read_buffer) {
                Ok(0) => break,
                Ok(bytes_read) => {
                    h264_buffer.extend_from_slice(&read_buffer[..bytes_read]);
                    for access_unit in drain_h264_access_units(&mut h264_buffer, false) {
                        if !first_frame_reported {
                            first_frame_reported = true;
                            let startup_ms = pipeline_started_at.elapsed().as_millis() as u64;
                            info!(
                                startup_ms,
                                fps = STREAM_PUBLISH_TARGET_FPS,
                                mime_type = %browser_mime_type,
                                "stream publish produced first browser video frame"
                            );
                            let _ = event_tx.send(StreamPublishEvent::FirstFrame {
                                startup_ms,
                                fps: STREAM_PUBLISH_TARGET_FPS,
                            });
                        }
                        let timestamp_increment = timestamp_increments_thread
                            .lock()
                            .pop_front()
                            .unwrap_or(90_000 / STREAM_PUBLISH_TARGET_FPS);
                        if frame_tx
                            .send(StreamPublishFrame {
                                access_unit,
                                timestamp_increment,
                            })
                            .is_err()
                        {
                            break;
                        }
                    }
                }
                Err(error) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish browser stdout read failed: {error}"
                    )));
                    break;
                }
            }
        }

        if !stop_clone.load(Ordering::Relaxed) {
            for access_unit in drain_h264_access_units(&mut h264_buffer, true) {
                let timestamp_increment = timestamp_increments_thread
                    .lock()
                    .pop_front()
                    .unwrap_or(90_000 / STREAM_PUBLISH_TARGET_FPS);
                let _ = frame_tx.send(StreamPublishFrame {
                    access_unit,
                    timestamp_increment,
                });
            }
        }

        *stdin_thread.lock() = None;
        process_compat::terminate_child(&mut child, "stream_publish");
        let wait_result = child.wait();
        if let Some(handle) = stderr_thread.take() {
            let _ = handle.join();
        }
        child_pid_thread.store(0, Ordering::SeqCst);
        paused_thread.store(false, Ordering::SeqCst);

        let stderr_summary = {
            let tail = stderr_tail.lock();
            if tail.is_empty() {
                String::new()
            } else {
                format!(
                    " | stderr tail: {}",
                    tail.iter().cloned().collect::<Vec<_>>().join(" || ")
                )
            }
        };

        if !stop_clone.load(Ordering::Relaxed) {
            match wait_result {
                Ok(status) if status.success() => {
                    let _ = event_tx.send(StreamPublishEvent::Idle);
                }
                Ok(status) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish browser pipeline exited with status {status}{stderr_summary}"
                    )));
                }
                Err(error) => {
                    let _ = event_tx.send(StreamPublishEvent::Error(format!(
                        "stream publish browser pipeline wait failed: {error}{stderr_summary}"
                    )));
                }
            }
        }
    });

    Self {
        stop,
        paused,
        child_pid,
        thread: Some(thread),
        mode: StreamPublishPlayerMode::BrowserFrames {
            mime_type,
            stdin,
            timestamp_increments,
            last_captured_at_ms,
        },
    }
}

pub(crate) fn push_browser_frame(
    &self,
    mime_type: &str,
    frame_bytes: &[u8],
    captured_at_ms: u64,
) -> io::Result<()> {
    let StreamPublishPlayerMode::BrowserFrames {
        mime_type: active_mime_type,
        stdin,
        timestamp_increments,
        last_captured_at_ms,
    } = &self.mode
    else {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "stream publish player is not a browser frame source",
        ));
    };

    if active_mime_type != mime_type {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "stream publish browser frame mime type mismatch",
        ));
    }
    if frame_bytes.is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "stream publish browser frame was empty",
        ));
    }
    if !self.is_alive() {
        return Err(io::Error::new(
            io::ErrorKind::BrokenPipe,
            "stream publish browser pipeline is not running",
        ));
    }

    let timestamp_increment =
        compute_browser_frame_timestamp_increment(last_captured_at_ms, captured_at_ms);
    timestamp_increments.lock().push_back(timestamp_increment);

    let mut guard = stdin.lock();
    let Some(writer) = guard.as_mut() else {
        return Err(io::Error::new(
            io::ErrorKind::BrokenPipe,
            "stream publish browser pipeline stdin unavailable",
        ));
    };
    if let Err(error) = writer.write_all(frame_bytes) {
        let _ = timestamp_increments.lock().pop_back();
        return Err(error);
    }
    writer.flush()
}

pub(crate) fn is_alive(&self) -> bool {
    self.child_pid.load(Ordering::SeqCst) != 0
}

pub(crate) fn pause(&self) -> bool {
    if self.paused.load(Ordering::SeqCst) {
        return self.is_alive();
    }
    let pid = self.child_pid.load(Ordering::SeqCst);
    if pid == 0 {
        return false;
    }
    match process_compat::signal_process_group(pid, ProcessSignal::Suspend) {
        Ok(()) => {
            self.paused.store(true, Ordering::SeqCst);
            true
        }
        Err(error) => {
            if error.kind() != io::ErrorKind::NotFound {
                warn!(pid, error = %error, "failed to pause stream publish process group");
            }
            false
        }
    }
}

pub(crate) fn resume(&self) -> bool {
    if !self.paused.load(Ordering::SeqCst) {
        return self.is_alive();
    }
    let pid = self.child_pid.load(Ordering::SeqCst);
    if pid == 0 {
        return false;
    }
    match process_compat::signal_process_group(pid, ProcessSignal::Resume) {
        Ok(()) => {
            self.paused.store(false, Ordering::SeqCst);
            true
        }
        Err(error) => {
            if error.kind() != io::ErrorKind::NotFound {
                warn!(
                    pid,
                    error = %error,
                    "failed to resume stream publish process group"
                );
            }
            false
        }
    }
}

pub(crate) fn stop(mut self) {
    self.stop.store(true, Ordering::SeqCst);
    if let StreamPublishPlayerMode::BrowserFrames { stdin, .. } = &self.mode {
        *stdin.lock() = None;
    }
    if let Some(handle) = self.thread.take() {
        let pid = self.child_pid.load(Ordering::SeqCst);
        if let Err(error) = process_compat::signal_process_group(pid, ProcessSignal::Terminate) {
            if error.kind() != io::ErrorKind::NotFound {
                warn!(
                    pid,
                    error = %error,
                    "failed to stop stream publish process group"
                );
            }
        }
        let _ = handle.join();
    }
}

}

impl AppState { fn clear_stream_publish_runtime_buffers(&self) { while self.stream_publish_frame_rx.try_recv().is_ok() {} while self.stream_publish_event_rx.try_recv().is_ok() {} }

pub(crate) fn stop_stream_publish_runtime(&mut self, reason: &str) {
    if let Some(conn) = self.stream_publish_conn.as_ref() {
        if let Err(error) = conn.set_stream_publish_speaking(false) {
            warn!(reason = reason, error = %error, "failed to disable stream publish speaking");
        }
        if let Err(error) = conn.set_stream_publish_video_active(false) {
            warn!(reason = reason, error = %error, "failed to disable stream publish video state");
        }
    }
    self.stream_publish.stop_player();
    self.clear_stream_publish_runtime_buffers();
    self.stream_publish.active = false;
    self.stream_publish.paused = false;
    self.stream_publish.active_source = None;
    self.stream_publish_frames_sent = 0;
}

pub(crate) fn maybe_start_stream_publish_pipeline(&mut self) {
    if self.stream_publish_conn.is_none()
        || self.stream_publish.active
        || self.stream_publish.paused
    {
        return;
    }
    let Some(source) = self.stream_publish.pending_source.clone() else {
        return;
    };

    self.stop_stream_publish_runtime("restart_before_publish_start");
    self.clear_stream_publish_runtime_buffers();

    self.stream_publish.player = Some(match &source {
        StreamPublishSource::Url {
            url,
            resolved_direct_url,
        } => StreamPublishPlayer::start_url(
            url,
            self.stream_publish_frame_tx.clone(),
            self.stream_publish_event_tx.clone(),
            *resolved_direct_url,
        ),
        StreamPublishSource::Visualizer {
            url,
            resolved_direct_url,
            visualizer_mode,
        } => StreamPublishPlayer::start_visualizer(
            url,
            self.stream_publish_frame_tx.clone(),
            self.stream_publish_event_tx.clone(),
            *resolved_direct_url,
            visualizer_mode,
        ),
        StreamPublishSource::BrowserFrames { mime_type } => {
            StreamPublishPlayer::start_browser_frames(
                mime_type,
                self.stream_publish_frame_tx.clone(),
                self.stream_publish_event_tx.clone(),
            )
        }
    });
    self.stream_publish.active = true;
    self.stream_publish.paused = false;
    self.stream_publish.active_source = Some(source.clone());
    self.stream_publish.clear_pending_start();

    if let Some(conn) = self.stream_publish_conn.as_ref() {
        if let Err(error) = conn.set_stream_publish_video_active(true) {
            warn!(error = %error, "failed to announce active stream publish video state");
        }
        if let Err(error) = conn.set_stream_publish_speaking(true) {
            warn!(error = %error, "failed to enable stream publish speaking state");
        }
    }
    self.emit_transport_state(TransportRole::StreamPublish, "playing", None);
    match source {
        StreamPublishSource::Url {
            url,
            resolved_direct_url,
        } => {
            info!(
                url = %url,
                resolved_direct_url,
                "started stream publish pipeline"
            );
        }
        StreamPublishSource::Visualizer {
            url,
            resolved_direct_url,
            visualizer_mode,
        } => {
            info!(
                url = %url,
                resolved_direct_url,
                visualizer_mode = %visualizer_mode,
                "started stream publish visualizer pipeline"
            );
        }
        StreamPublishSource::BrowserFrames { mime_type } => {
            info!(mime_type = %mime_type, "started browser stream publish pipeline");
        }
    }
}

pub(crate) fn handle_stream_publish_command(&mut self, msg: StreamPublishCommand) {
    match msg {
        StreamPublishCommand::Play {
            url,
            resolved_direct_url,
        } => {
            let normalized_url = url.trim().to_string();
            if normalized_url.is_empty() {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "failed",
                    Some("stream_publish_play_missing_url"),
                );
                return;
            }
            let source = StreamPublishSource::Url {
                url: normalized_url,
                resolved_direct_url,
            };
            if self.stream_publish.active
                && !self.stream_publish.paused
                && self.stream_publish.active_source.as_ref() == Some(&source)
            {
                self.emit_transport_state(TransportRole::StreamPublish, "playing", None);
                return;
            }
            if self.stream_publish.active_source.as_ref() != Some(&source) {
                self.stop_stream_publish_runtime("stream_publish_source_switch");
            }
            self.stream_publish.queue_pending_start(source);
            if self.stream_publish_conn.is_some() {
                self.maybe_start_stream_publish_pipeline();
            } else {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "waiting_for_transport",
                    None,
                );
            }
        }
        StreamPublishCommand::PlayVisualizer {
            url,
            resolved_direct_url,
            visualizer_mode,
        } => {
            let normalized_url = url.trim().to_string();
            if normalized_url.is_empty() {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "failed",
                    Some("stream_publish_play_visualizer_missing_url"),
                );
                return;
            }
            let normalized_mode = match visualizer_mode.trim() {
                "spectrum" => "spectrum",
                "waves" => "waves",
                "vectorscope" => "vectorscope",
                _ => "cqt",
            }
            .to_string();
            let source = StreamPublishSource::Visualizer {
                url: normalized_url,
                resolved_direct_url,
                visualizer_mode: normalized_mode,
            };
            if self.stream_publish.active
                && !self.stream_publish.paused
                && self.stream_publish.active_source.as_ref() == Some(&source)
            {
                self.emit_transport_state(TransportRole::StreamPublish, "playing", None);
                return;
            }
            if self.stream_publish.active_source.as_ref() != Some(&source) {
                self.stop_stream_publish_runtime("stream_publish_source_switch");
            }
            self.stream_publish.queue_pending_start(source);
            if self.stream_publish_conn.is_some() {
                self.maybe_start_stream_publish_pipeline();
            } else {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "waiting_for_transport",
                    None,
                );
            }
        }
        StreamPublishCommand::BrowserStart { mime_type } => {
            let Some(normalized_mime_type) =
                normalize_browser_frame_mime_type(&mime_type).map(str::to_string)
            else {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "failed",
                    Some("stream_publish_browser_start_unsupported_mime_type"),
                );
                return;
            };
            let source = StreamPublishSource::BrowserFrames {
                mime_type: normalized_mime_type,
            };
            if self.stream_publish.active
                && !self.stream_publish.paused
                && self.stream_publish.active_source.as_ref() == Some(&source)
            {
                self.emit_transport_state(TransportRole::StreamPublish, "playing", None);
                return;
            }
            if self.stream_publish.active_source.as_ref() != Some(&source) {
                self.stop_stream_publish_runtime("stream_publish_source_switch");
            }
            self.stream_publish.queue_pending_start(source);
            if self.stream_publish_conn.is_some() {
                self.maybe_start_stream_publish_pipeline();
            } else {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "waiting_for_transport",
                    None,
                );
            }
        }
        StreamPublishCommand::BrowserFrame {
            mime_type,
            frame_base64,
            captured_at_ms,
        } => {
            let Some(normalized_mime_type) =
                normalize_browser_frame_mime_type(&mime_type).map(str::to_string)
            else {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "failed",
                    Some("stream_publish_browser_frame_unsupported_mime_type"),
                );
                return;
            };
            let frame_bytes = match decode_stream_publish_browser_frame(&frame_base64) {
                Ok(bytes) => bytes,
                Err(error) => {
                    self.emit_transport_state(
                        TransportRole::StreamPublish,
                        "failed",
                        Some(&error),
                    );
                    return;
                }
            };

            if self.stream_publish_conn.is_some()
                && !self.stream_publish.active
                && matches!(
                    self.stream_publish.pending_source.as_ref(),
                    Some(StreamPublishSource::BrowserFrames { mime_type })
                        if mime_type == &normalized_mime_type
                )
            {
                self.maybe_start_stream_publish_pipeline();
            }

            let Some(player) = self.stream_publish.player.as_ref() else {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "waiting_for_transport",
                    Some("stream_publish_browser_source_not_started"),
                );
                return;
            };
            if !matches!(
                self.stream_publish.active_source.as_ref(),
                Some(StreamPublishSource::BrowserFrames { mime_type })
                    if mime_type == &normalized_mime_type
            ) {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "failed",
                    Some("stream_publish_browser_frame_source_mismatch"),
                );
                return;
            }
            if let Err(error) =
                player.push_browser_frame(&normalized_mime_type, &frame_bytes, captured_at_ms)
            {
                self.emit_transport_state(
                    TransportRole::StreamPublish,
                    "failed",
                    Some(&format!(
                        "stream_publish_browser_frame_write_failed: {error}"
                    )),
                );
            }
        }
        StreamPublishCommand::Stop => {
            self.stop_stream_publish_runtime("stream_publish_stop");
            self.stream_publish.clear_pending_start();
            self.emit_transport_state(
                TransportRole::StreamPublish,
                "ready",
                Some("stream_publish_stopped"),
            );
        }
        StreamPublishCommand::Pause => {
            self.stream_publish.paused = true;
            if let Some(player) = self.stream_publish.player.as_ref() {
                let _ = player.pause();
            }
            if let Some(conn) = self.stream_publish_conn.as_ref() {
                let _ = conn.set_stream_publish_speaking(false);
                let _ = conn.set_stream_publish_video_active(false);
            }
            self.emit_transport_state(TransportRole::StreamPublish, "paused", None);
        }
        StreamPublishCommand::Resume => {
            self.stream_publish.paused = false;
            if let Some(player) = self.stream_publish.player.as_ref() {
                if player.resume() {
                    if let Some(conn) = self.stream_publish_conn.as_ref() {
                        let _ = conn.set_stream_publish_video_active(true);
                        let _ = conn.set_stream_publish_speaking(true);
                    }
                    self.emit_transport_state(TransportRole::StreamPublish, "playing", None);
                    return;
                }
            }
            if let Some(active_source) = self.stream_publish.active_source.clone() {
                self.stream_publish.queue_pending_start(active_source);
                self.stream_publish.active = false;
                self.stream_publish.active_source = None;
            }
            self.maybe_start_stream_publish_pipeline();
        }
    }
}

fn handle_stream_publish_event(&mut self, event: StreamPublishEvent) {
    match event {
        StreamPublishEvent::Idle => {
            self.stop_stream_publish_runtime("stream_publish_idle");
            self.emit_transport_state(
                TransportRole::StreamPublish,
                "ready",
                Some("stream_publish_idle"),
            );
        }
        StreamPublishEvent::Error(message) => {
            self.stop_stream_publish_runtime("stream_publish_error");
            self.emit_transport_state(TransportRole::StreamPublish, "failed", Some(&message));
        }
        StreamPublishEvent::FirstFrame { startup_ms, fps } => {
            info!(startup_ms, fps, "stream publish first frame observed");
        }
    }
}

pub(crate) fn drain_stream_publish_runtime_events(&mut self) {
    while let Ok(event) = self.stream_publish_event_rx.try_recv() {
        self.handle_stream_publish_event(event);
    }
}

pub(crate) async fn send_pending_stream_publish_frame(&mut self) {
    if !self.stream_publish.active || self.stream_publish.paused {
        return;
    }

    // Drain all available frames this tick rather than just one.
    // ffmpeg with -re paces output at ~30fps, but read() can deliver
    // multiple access units in a single stdout chunk.  Sending only
    // one per 20ms tick can fall behind, causing the viewer to see a
    // choppy slideshow as frames queue up with stale RTP timestamps.
    //
    // Cap at 4 frames per tick to avoid monopolising the event loop
    // if the queue is deeply backed up (e.g. after unpause).
    const MAX_FRAMES_PER_TICK: usize = 4;
    let mut frames_this_tick = 0;

    while let Ok(frame) = self.stream_publish_frame_rx.try_recv() {
        frames_this_tick += 1;
        self.stream_publish_frames_sent += 1;
        let queue_depth = self.stream_publish_frame_rx.len();
        if self.stream_publish_frames_sent <= 5
            || self.stream_publish_frames_sent % 150 == 0
            || queue_depth > 10
        {
            info!(
                frame_number = self.stream_publish_frames_sent,
                frame_bytes = frame.access_unit.len(),
                queue_depth,
                frames_this_tick,
                timestamp_increment = frame.timestamp_increment,
                "clankvox_stream_publish_frame_sent"
            );
        }

        let encrypted_frame = {
            let mut guard = self.stream_publish_dave.lock();
            match *guard {
                Some(ref mut dave_manager) if dave_manager.is_ready() => dave_manager
                    .encrypt_video(&frame.access_unit)
                    .unwrap_or_else(|error| {
                        warn!(error = %error, "stream publish DAVE encrypt fallback to unencrypted");
                        frame.access_unit.clone()
                    }),
                _ => {
                    if self.stream_publish_frames_sent <= 3 {
                        warn!(
                            frame_number = self.stream_publish_frames_sent,
                            "stream publish frame sent without DAVE (not ready)"
                        );
                    }
                    frame.access_unit.clone()
                }
            }
        };

        if let Some(conn) = self.stream_publish_conn.as_ref() {
            if let Err(error) = conn
                .send_h264_frame(&encrypted_frame, frame.timestamp_increment)
                .await
            {
                warn!(error = %error, "failed to send stream publish video frame");
            }
        }

        if frames_this_tick >= MAX_FRAMES_PER_TICK {
            break;
        }
    }
}

}

#[cfg(test)] mod tests { use super::{ STREAM_PUBLISH_TARGET_FPS, build_stream_publish_browser_pipeline_command, build_stream_publish_pipeline_command, drain_h264_access_units, };

#[test]
fn build_stream_publish_pipeline_command_uses_direct_ffmpeg_for_direct_urls() {
    let command =
        build_stream_publish_pipeline_command("https://cdn.example.com/video.mp4", true);
    assert!(command.contains("ffmpeg"));
    assert!(!command.contains("yt-dlp"));
    assert!(command.contains(&format!("fps={STREAM_PUBLISH_TARGET_FPS}")));
}

#[test]
fn build_stream_publish_pipeline_command_uses_ytdlp_for_indirect_urls() {
    let command =
        build_stream_publish_pipeline_command("https://www.youtube.com/watch?v=abc123", false);
    assert!(command.contains("yt-dlp"));
    assert!(command.contains("h264_metadata=aud=insert"));
}

#[test]
fn build_stream_publish_browser_pipeline_command_uses_image2pipe_png_input() {
    let command = build_stream_publish_browser_pipeline_command("image/png");
    assert!(command.contains("-f image2pipe"));
    assert!(command.contains("-codec:v png"));
    assert!(command.contains("h264_metadata=aud=insert"));
}

#[test]
fn drain_h264_access_units_splits_on_aud_boundaries() {
    let mut buffer = vec![
        0, 0, 0, 1, 0x09, 0xf0, 0, 0, 0, 1, 0x67, 0x01, 0x02, 0, 0, 0, 1, 0x09, 0xf0, 0, 0, 0,
        1, 0x65, 0xaa,
    ];
    let frames = drain_h264_access_units(&mut buffer, false);
    assert_eq!(frames.len(), 1);
    assert!(frames[0].starts_with(&[0, 0, 0, 1, 0x09]));
    assert!(buffer.starts_with(&[0, 0, 0, 1, 0x09]));
}

}