diff --git a/apps/labrinth/src/routes/internal/statuses.rs b/apps/labrinth/src/routes/internal/statuses.rs index 7467ad6fc..b54af1869 100644 --- a/apps/labrinth/src/routes/internal/statuses.rs +++ b/apps/labrinth/src/routes/internal/statuses.rs @@ -180,8 +180,7 @@ pub async fn ws_init( } Ok(AggregatedMessage::Ping(msg)) => { - if let Some(mut socket) = - db.auth_sockets.get_mut(&user.id.into()) + if let Some(mut socket) = db.auth_sockets.get_mut(&user.id) { let (_, socket) = socket.value_mut(); let _ = socket.pong(&msg).await; diff --git a/packages/app-lib/src/state/friends.rs b/packages/app-lib/src/state/friends.rs index c4d0f73f4..197590ea8 100644 --- a/packages/app-lib/src/state/friends.rs +++ b/packages/app-lib/src/state/friends.rs @@ -29,9 +29,7 @@ pub struct FriendsSocket { #[derive(Deserialize, Serialize)] pub struct UserFriend { pub id: String, - // TODO: Remove this optional and serde alias on release - pub friend_id: Option, - #[serde(alias = "pending")] + pub friend_id: String, pub accepted: bool, pub created: DateTime, } @@ -72,6 +70,7 @@ impl FriendsSocket { } } + #[tracing::instrument(skip_all)] pub async fn connect( &self, exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite> + Copy, @@ -144,8 +143,20 @@ impl FriendsSocket { ) .ok() } - Message::Ping(_) - | Message::Pong(_) + Message::Ping(bytes) => { + if let Some(write) = write_handle + .write() + .await + .as_mut() + { + let _ = write + .send(Message::Pong(bytes)) + .await; + } + + continue; + } + Message::Pong(_) | Message::Frame(_) => continue, Message::Close(_) => break, }; @@ -175,8 +186,7 @@ impl FriendsSocket { } } Err(e) => { - println!("WebSocket error: {:?}", e); - break; + tracing::error!("Error handling message from websocket server: {:?}", e); } } } @@ -198,11 +208,13 @@ impl FriendsSocket { Ok(()) } - pub async fn reconnect_task() -> crate::Result<()> { + #[tracing::instrument(skip_all)] + pub async fn socket_loop() -> crate::Result<()> { let state = crate::State::get().await?; tokio::task::spawn(async move { let mut last_connection = Utc::now(); + let mut last_ping = Utc::now(); loop { let connected = { @@ -215,6 +227,7 @@ impl FriendsSocket { > chrono::Duration::seconds(30) { last_connection = Utc::now(); + last_ping = Utc::now(); let _ = state .friends_socket .connect( @@ -223,6 +236,15 @@ impl FriendsSocket { &state.process_manager, ) .await; + } else if connected + && Utc::now().signed_duration_since(last_ping) + > chrono::Duration::seconds(10) + { + last_ping = Utc::now(); + let mut write = state.friends_socket.write.write().await; + if let Some(write) = write.as_mut() { + let _ = write.send(Message::Ping(Vec::new())).await; + } } tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -232,6 +254,7 @@ impl FriendsSocket { Ok(()) } + #[tracing::instrument(skip(self))] pub async fn disconnect(&self) -> crate::Result<()> { let mut write_lock = self.write.write().await; if let Some(ref mut write_half) = *write_lock { @@ -241,6 +264,7 @@ impl FriendsSocket { Ok(()) } + #[tracing::instrument(skip(self))] pub async fn update_status( &self, profile_name: Option, @@ -257,6 +281,7 @@ impl FriendsSocket { Ok(()) } + #[tracing::instrument(skip_all)] pub async fn friends( exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite> + Copy, semaphore: &FetchSemaphore, @@ -272,6 +297,7 @@ impl FriendsSocket { .await } + #[tracing::instrument(skip(self))] pub fn friend_statuses(&self) -> Vec { self.user_statuses .iter() @@ -279,6 +305,7 @@ impl FriendsSocket { .collect() } + #[tracing::instrument(skip(exec, semaphore))] pub async fn add_friend( user_id: &str, exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite> + Copy, @@ -299,6 +326,7 @@ impl FriendsSocket { Ok(()) } + #[tracing::instrument(skip(exec, semaphore))] pub async fn remove_friend( user_id: &str, exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite> + Copy, diff --git a/packages/app-lib/src/state/mod.rs b/packages/app-lib/src/state/mod.rs index ec0b95e82..25a33befe 100644 --- a/packages/app-lib/src/state/mod.rs +++ b/packages/app-lib/src/state/mod.rs @@ -96,7 +96,7 @@ impl State { &state.process_manager, ) .await; - let _ = FriendsSocket::reconnect_task().await; + let _ = FriendsSocket::socket_loop().await; }); Ok(())