From 3822c209d35b12d987ebaa9b63c292419499ce19 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Wed, 12 Feb 2025 21:01:34 +0100 Subject: [PATCH] Start sync client manager implementation --- src/broadcast_messages.rs | 6 +++--- src/lib.rs | 4 ++-- src/main.rs | 4 +++- src/server/api/ws.rs | 11 ++++++++--- src/server/web_ui.rs | 7 +++++-- src/sync_client.rs | 27 ++++++++++++++++++++++++++- 6 files changed, 47 insertions(+), 12 deletions(-) diff --git a/src/broadcast_messages.rs b/src/broadcast_messages.rs index 5dc6766..c9ae251 100644 --- a/src/broadcast_messages.rs +++ b/src/broadcast_messages.rs @@ -9,9 +9,9 @@ pub enum BroadcastMessage { /// Close all the sessions of a given user CloseAllUserSessions(UserID), /// Stop sync client for a given user - StopSyncClientForUser(UserID), + StopSyncTaskForUser(UserID), /// Start sync client for a given user (if not already running) - StartSyncClientForUser(UserID), + StartSyncTaskForUser(UserID), /// Stop a client with a given client ID StopSyncClient(SyncClientID), -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index f9b42fd..e0979b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ pub mod app_config; +pub mod broadcast_messages; pub mod constants; pub mod extractors; pub mod server; +pub mod sync_client; pub mod user; pub mod utils; -pub mod sync_client; -pub mod broadcast_messages; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 27a8ffa..36ca740 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use actix_web::{web, App, HttpServer}; use matrix_gateway::app_config::AppConfig; use matrix_gateway::broadcast_messages::BroadcastMessage; use matrix_gateway::server::{api, web_ui}; +use matrix_gateway::sync_client; use matrix_gateway::user::UserConfig; #[tokio::main] @@ -24,7 +25,8 @@ async fn main() -> std::io::Result<()> { let (ws_tx, _) = tokio::sync::broadcast::channel::(16); - // TODO : spawn a tokio task to launch sync client + // Launch sync manager + tokio::spawn(sync_client::sync_client_manager(ws_tx.clone())); log::info!( "Starting to listen on {} for {}", diff --git a/src/server/api/ws.rs b/src/server/api/ws.rs index 474a90f..a6ae13e 100644 --- a/src/server/api/ws.rs +++ b/src/server/api/ws.rs @@ -1,3 +1,4 @@ +use crate::broadcast_messages::BroadcastMessage; use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL}; use crate::extractors::client_auth::APIClientAuth; use crate::server::HttpResult; @@ -10,9 +11,6 @@ use tokio::select; use tokio::sync::broadcast; use tokio::sync::broadcast::Receiver; use tokio::time::interval; -use crate::broadcast_messages::BroadcastMessage; - - /// Main WS route pub async fn ws( @@ -25,6 +23,13 @@ pub async fn ws( let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; + // Ask for sync client to be started + if let Err(e) = tx.send(BroadcastMessage::StartSyncTaskForUser( + auth.user.user_id.clone(), + )) { + log::error!("Failed to send StartSyncTaskForUser: {}", e); + } + let rx = tx.subscribe(); // spawn websocket handler (and don't await it) so that the response is returned immediately diff --git a/src/server/web_ui.rs b/src/server/web_ui.rs index 4c88172..566f3fb 100644 --- a/src/server/web_ui.rs +++ b/src/server/web_ui.rs @@ -1,4 +1,5 @@ use crate::app_config::AppConfig; +use crate::broadcast_messages::BroadcastMessage; use crate::constants::{STATE_KEY, USER_SESSION_KEY}; use crate::server::{HttpFailure, HttpResult}; use crate::user::{APIClient, APIClientID, User, UserConfig, UserID}; @@ -10,7 +11,6 @@ use ipnet::IpNet; use light_openid::primitives::OpenIDConfig; use std::str::FromStr; use tokio::sync::broadcast; -use crate::broadcast_messages::BroadcastMessage; /// Static assets #[derive(rust_embed::Embed)] @@ -103,7 +103,10 @@ pub async fn home( config.save().await?; success_message = Some("Matrix token was successfully updated!".to_string()); - // TODO : stop user sync thread + // Close sync task + if let Err(e) = tx.send(BroadcastMessage::StopSyncTaskForUser(user.id.clone())) { + log::error!("Failed to send StopSyncClientForUser: {}", e); + } // Invalidate all Ws connections if let Err(e) = tx.send(BroadcastMessage::CloseAllUserSessions(user.id.clone())) { diff --git a/src/sync_client.rs b/src/sync_client.rs index a467e94..89ff738 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -1,3 +1,28 @@ +use crate::broadcast_messages::BroadcastMessage; +use tokio::sync::broadcast; + /// ID of sync client #[derive(Debug, Clone)] -pub struct SyncClientID(uuid::Uuid); \ No newline at end of file +pub struct SyncClientID(uuid::Uuid); + +/// Sync client launcher loop +pub async fn sync_client_manager(tx: broadcast::Sender) { + let mut rx = tx.subscribe(); + + while let Ok(msg) = rx.recv().await { + match msg { + BroadcastMessage::StopSyncTaskForUser(user_id) => { + log::info!("Stop sync task for user {:?}", user_id); + // TODO + } + BroadcastMessage::StartSyncTaskForUser(user_id) => { + log::info!("Start sync task for user {:?}", user_id); + // TODO + } + + _ => {} + } + } + + panic!("Sync client manager stopped unexpectedly!"); +}