src/connection_supervisor.rs
use crate::app_state::{AppState, TryConnectOutcome}; use crate::ipc::{ErrorCode, send_error}; use crate::ipc_protocol::ConnectionCommand; use crate::voice_conn::{TransportRole, VoiceConnection, VoiceConnectionParams};
impl AppState { async fn maybe_try_connect(&mut self, failure_reason: &str, source: &str) { if self.reconnect_deadline.is_some() { tracing::info!( source = source, "Reconnect already scheduled; deferring immediate voice connect" ); return; }
let outcome = self.try_connect().await;
self.apply_connect_outcome(outcome, failure_reason);
}
pub(crate) fn apply_connect_outcome(
&mut self,
outcome: TryConnectOutcome,
failure_reason: &str,
) {
match outcome {
TryConnectOutcome::Connected => self.reset_reconnect(),
TryConnectOutcome::Failed => self.schedule_reconnect(failure_reason),
TryConnectOutcome::AlreadyConnected | TryConnectOutcome::MissingData => {}
}
}
pub(crate) async fn try_connect(&mut self) -> TryConnectOutcome {
if self.voice_conn.is_some() {
return TryConnectOutcome::AlreadyConnected;
}
let Some(guild_id) = self.guild_id else {
return TryConnectOutcome::MissingData;
};
let Some(channel_id) = self.channel_id else {
return TryConnectOutcome::MissingData;
};
let Some(user_id) = self.pending_conn.user_id else {
return TryConnectOutcome::MissingData;
};
let Some(endpoint) = self.pending_conn.endpoint.as_deref() else {
return TryConnectOutcome::MissingData;
};
let Some(session_id) = self.pending_conn.session_id.as_deref() else {
return TryConnectOutcome::MissingData;
};
let Some(token) = self.pending_conn.token.as_deref() else {
return TryConnectOutcome::MissingData;
};
tracing::info!(
endpoint = ?self.pending_conn.endpoint,
guild_id,
channel_id,
user_id,
"Connecting primary voice transport"
);
match VoiceConnection::connect(
VoiceConnectionParams {
endpoint,
server_id: guild_id,
user_id,
session_id,
token,
dave_channel_id: channel_id,
role: TransportRole::Voice,
},
self.voice_event_tx.clone(),
self.dave.clone(),
)
.await
{
Ok(conn) => {
self.voice_conn = Some(conn);
TryConnectOutcome::Connected
}
Err(error) => {
tracing::error!("Voice connection failed: {error}");
send_error(
ErrorCode::VoiceConnectFailed,
format!("Voice connect failed: {error}"),
);
TryConnectOutcome::Failed
}
}
}
async fn try_connect_stream_watch(&mut self) -> TryConnectOutcome {
if self.stream_watch_conn.is_some() {
return TryConnectOutcome::AlreadyConnected;
}
let Some(server_id) = self.stream_watch_pending_conn.server_id else {
return TryConnectOutcome::MissingData;
};
let Some(dave_channel_id) = self.stream_watch_pending_conn.dave_channel_id else {
return TryConnectOutcome::MissingData;
};
let Some(user_id) = self.stream_watch_pending_conn.user_id else {
return TryConnectOutcome::MissingData;
};
let Some(endpoint) = self.stream_watch_pending_conn.endpoint.as_deref() else {
return TryConnectOutcome::MissingData;
};
let Some(session_id) = self.stream_watch_pending_conn.session_id.as_deref() else {
return TryConnectOutcome::MissingData;
};
let Some(token) = self.stream_watch_pending_conn.token.as_deref() else {
return TryConnectOutcome::MissingData;
};
tracing::info!(
endpoint = ?self.stream_watch_pending_conn.endpoint,
server_id,
dave_channel_id,
user_id,
"Connecting stream watch transport"
);
match VoiceConnection::connect(
VoiceConnectionParams {
endpoint,
server_id,
user_id,
session_id,
token,
dave_channel_id,
role: TransportRole::StreamWatch,
},
self.voice_event_tx.clone(),
self.stream_watch_dave.clone(),
)
.await
{
Ok(conn) => {
self.stream_watch_conn = Some(conn);
TryConnectOutcome::Connected
}
Err(error) => {
tracing::error!("Stream watch connection failed: {error}");
send_error(
ErrorCode::StreamWatchConnectFailed,
format!("Stream watch connect failed: {error}"),
);
self.emit_transport_state(
TransportRole::StreamWatch,
"failed",
Some(&error.to_string()),
);
TryConnectOutcome::Failed
}
}
}
async fn try_connect_stream_publish(&mut self) -> TryConnectOutcome {
if self.stream_publish_conn.is_some() {
return TryConnectOutcome::AlreadyConnected;
}
let Some(server_id) = self.stream_publish_pending_conn.server_id else {
return TryConnectOutcome::MissingData;
};
let Some(dave_channel_id) = self.stream_publish_pending_conn.dave_channel_id else {
return TryConnectOutcome::MissingData;
};
let Some(user_id) = self.stream_publish_pending_conn.user_id else {
return TryConnectOutcome::MissingData;
};
let Some(endpoint) = self.stream_publish_pending_conn.endpoint.as_deref() else {
return TryConnectOutcome::MissingData;
};
let Some(session_id) = self.stream_publish_pending_conn.session_id.as_deref() else {
return TryConnectOutcome::MissingData;
};
let Some(token) = self.stream_publish_pending_conn.token.as_deref() else {
return TryConnectOutcome::MissingData;
};
tracing::info!(
endpoint = ?self.stream_publish_pending_conn.endpoint,
server_id,
dave_channel_id,
user_id,
"Connecting stream publish transport"
);
match VoiceConnection::connect(
VoiceConnectionParams {
endpoint,
server_id,
user_id,
session_id,
token,
dave_channel_id,
role: TransportRole::StreamPublish,
},
self.voice_event_tx.clone(),
self.stream_publish_dave.clone(),
)
.await
{
Ok(conn) => {
self.stream_publish_conn = Some(conn);
TryConnectOutcome::Connected
}
Err(error) => {
tracing::error!("Stream publish connection failed: {error}");
send_error(
ErrorCode::StreamPublishConnectFailed,
format!("Stream publish connect failed: {error}"),
);
self.emit_transport_state(
TransportRole::StreamPublish,
"failed",
Some(&error.to_string()),
);
TryConnectOutcome::Failed
}
}
}
pub(crate) async fn handle_reconnect_timer(&mut self) {
self.reconnect_deadline = None;
let outcome = self.try_connect().await;
match outcome {
TryConnectOutcome::Connected | TryConnectOutcome::AlreadyConnected => {
self.reconnect_attempt = 0;
}
TryConnectOutcome::Failed | TryConnectOutcome::MissingData => {
self.schedule_reconnect("reconnect_retry");
}
}
}
pub(crate) async fn handle_connection_command(&mut self, msg: ConnectionCommand) {
match msg {
ConnectionCommand::Join {
guild_id,
channel_id,
self_mute,
} => {
let Ok(guild_id) = guild_id.parse::<u64>() else {
send_error(
ErrorCode::InvalidRequest,
format!("join requires a numeric guild_id, got {guild_id:?}"),
);
return;
};
let Ok(channel_id) = channel_id.parse::<u64>() else {
send_error(
ErrorCode::InvalidRequest,
format!("join requires a numeric channel_id, got {channel_id:?}"),
);
return;
};
self.guild_id = Some(guild_id);
self.channel_id = Some(channel_id);
self.self_mute = self_mute;
self.reset_reconnect();
crate::ipc::send_gateway_voice_state_update(guild_id, channel_id, self_mute);
tracing::info!(
guild_id,
channel_id,
"Join requested; forwarded OP4 voice state update"
);
}
ConnectionCommand::VoiceServer { data } => {
let endpoint = data.endpoint.clone();
let has_token = data.token.is_some();
tracing::info!(
endpoint = ?endpoint,
has_token,
connected = self.voice_conn.is_some(),
"IPC voice_server received"
);
if let Some(ref endpoint) = endpoint {
self.pending_conn.endpoint = Some(endpoint.clone());
}
if let Some(token) = data.token.as_deref() {
self.pending_conn.token = Some(token.to_string());
}
self.maybe_try_connect("voice_server_connect_failed", "voice_server")
.await;
}
ConnectionCommand::VoiceState { data } => {
let new_session_id = data.session_id.clone();
let old_session_id = self.pending_conn.session_id.clone();
let new_user_id = match data.user_id.as_deref() {
Some(user_id) => crate::app_state::parse_user_id_field(user_id, "voice_state"),
None => None,
};
tracing::info!(
session_id = ?new_session_id,
prev_session_id = ?old_session_id,
channel_id = ?data.channel_id,
user_id = ?new_user_id,
connected = self.voice_conn.is_some(),
stream_watch_connected = self.stream_watch_conn.is_some(),
"IPC voice_state received"
);
if let Some(ref session_id) = new_session_id {
if self.voice_conn.is_some()
&& old_session_id.as_deref() != Some(session_id.as_str())
{
tracing::warn!(
previous = ?old_session_id,
current = ?new_session_id,
"Voice session id changed while connected; tearing down for reconnect"
);
self.clear_voice_connection();
self.clear_transport_runtime_state("session_id_changed");
}
if self.stream_watch_conn.is_some()
&& self.stream_watch_pending_conn.session_id.as_deref()
!= Some(session_id.as_str())
{
tracing::warn!(
previous = ?self.stream_watch_pending_conn.session_id,
current = ?new_session_id,
"Stream watch session id changed while connected; closing stream transport"
);
self.clear_stream_watch_connection();
self.emit_transport_state(
TransportRole::StreamWatch,
"disconnected",
Some("session_id_changed"),
);
}
if self.stream_publish_conn.is_some()
&& self.stream_publish_pending_conn.session_id.as_deref()
!= Some(session_id.as_str())
{
tracing::warn!(
previous = ?self.stream_publish_pending_conn.session_id,
current = ?new_session_id,
"Stream publish session id changed while connected; closing stream publish transport"
);
self.clear_stream_publish_connection();
self.emit_transport_state(
TransportRole::StreamPublish,
"disconnected",
Some("session_id_changed"),
);
}
self.pending_conn.session_id = Some(session_id.clone());
self.stream_watch_pending_conn.session_id = Some(session_id.clone());
self.stream_publish_pending_conn.session_id = Some(session_id.clone());
}
if let Some(user_id) = new_user_id {
self.pending_conn.user_id = Some(user_id);
self.self_user_id = Some(user_id);
if self.stream_watch_pending_conn.user_id.is_none() {
self.stream_watch_pending_conn.user_id = Some(user_id);
}
if self.stream_publish_pending_conn.user_id.is_none() {
self.stream_publish_pending_conn.user_id = Some(user_id);
}
}
self.maybe_try_connect("voice_state_connect_failed", "voice_state")
.await;
}
ConnectionCommand::StreamWatchConnect {
endpoint,
token,
server_id,
session_id,
user_id,
dave_channel_id,
} => {
let Some(user_id) =
crate::app_state::parse_user_id_field(&user_id, "stream_watch_connect.user_id")
else {
return;
};
let Some(server_id) = crate::app_state::parse_user_id_field(
&server_id,
"stream_watch_connect.server_id",
) else {
return;
};
let Some(dave_channel_id) = crate::app_state::parse_user_id_field(
&dave_channel_id,
"stream_watch_connect.dave_channel_id",
) else {
return;
};
tracing::info!(
endpoint = %endpoint,
server_id,
dave_channel_id,
user_id,
"IPC stream_watch_connect received"
);
self.clear_stream_watch_connection();
self.stream_watch_pending_conn.endpoint = Some(endpoint);
self.stream_watch_pending_conn.token = Some(token);
self.stream_watch_pending_conn.server_id = Some(server_id);
self.stream_watch_pending_conn.session_id = Some(session_id);
self.stream_watch_pending_conn.user_id = Some(user_id);
self.stream_watch_pending_conn.dave_channel_id = Some(dave_channel_id);
self.emit_transport_state(TransportRole::StreamWatch, "connecting", None);
match self.try_connect_stream_watch().await {
TryConnectOutcome::Connected | TryConnectOutcome::AlreadyConnected => {}
TryConnectOutcome::MissingData => {
self.emit_transport_state(
TransportRole::StreamWatch,
"failed",
Some("missing_stream_watch_credentials"),
);
}
TryConnectOutcome::Failed => {}
}
}
ConnectionCommand::StreamWatchDisconnect { reason } => {
let disconnect_reason = reason.unwrap_or_else(|| "stream_watch_disconnect".into());
tracing::info!(reason = %disconnect_reason, "IPC stream_watch_disconnect received");
self.clear_stream_watch_connection();
self.emit_transport_state(
TransportRole::StreamWatch,
"disconnected",
Some(&disconnect_reason),
);
}
ConnectionCommand::StreamPublishConnect {
endpoint,
token,
server_id,
session_id,
user_id,
dave_channel_id,
} => {
let Some(user_id) = crate::app_state::parse_user_id_field(
&user_id,
"stream_publish_connect.user_id",
) else {
return;
};
let Some(server_id) = crate::app_state::parse_user_id_field(
&server_id,
"stream_publish_connect.server_id",
) else {
return;
};
let Some(dave_channel_id) = crate::app_state::parse_user_id_field(
&dave_channel_id,
"stream_publish_connect.dave_channel_id",
) else {
return;
};
tracing::info!(
endpoint = %endpoint,
server_id,
dave_channel_id,
user_id,
"IPC stream_publish_connect received"
);
self.clear_stream_publish_connection();
self.stream_publish_pending_conn.endpoint = Some(endpoint);
self.stream_publish_pending_conn.token = Some(token);
self.stream_publish_pending_conn.server_id = Some(server_id);
self.stream_publish_pending_conn.session_id = Some(session_id);
self.stream_publish_pending_conn.user_id = Some(user_id);
self.stream_publish_pending_conn.dave_channel_id = Some(dave_channel_id);
self.emit_transport_state(TransportRole::StreamPublish, "connecting", None);
match self.try_connect_stream_publish().await {
TryConnectOutcome::Connected | TryConnectOutcome::AlreadyConnected => {}
TryConnectOutcome::MissingData => {
self.emit_transport_state(
TransportRole::StreamPublish,
"failed",
Some("missing_stream_publish_credentials"),
);
}
TryConnectOutcome::Failed => {}
}
}
ConnectionCommand::StreamPublishDisconnect { reason } => {
let disconnect_reason =
reason.unwrap_or_else(|| "stream_publish_disconnect".into());
tracing::info!(
reason = %disconnect_reason,
"IPC stream_publish_disconnect received"
);
self.stop_stream_publish_runtime("stream_publish_disconnect");
self.clear_stream_publish_connection();
self.emit_transport_state(
TransportRole::StreamPublish,
"disconnected",
Some(&disconnect_reason),
);
}
}
}
}
