diff --git a/src/sync_client.rs b/src/sync_client.rs index 89ff738..dfe6c14 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -1,23 +1,46 @@ use crate::broadcast_messages::BroadcastMessage; +use crate::user::UserID; +use std::collections::HashMap; +use std::sync::Arc; use tokio::sync::broadcast; /// ID of sync client -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct SyncClientID(uuid::Uuid); /// Sync client launcher loop pub async fn sync_client_manager(tx: broadcast::Sender) { let mut rx = tx.subscribe(); + let tx = Arc::new(tx.clone()); + + let mut running_tasks = HashMap::new(); while let Ok(msg) = rx.recv().await { match msg { - BroadcastMessage::StopSyncTaskForUser(user_id) => { - log::info!("Stop sync task for user {:?}", user_id); - // TODO - } BroadcastMessage::StartSyncTaskForUser(user_id) => { + if running_tasks.contains_key(&user_id) { + log::info!("Won't start sync task for user {user_id:?} because a task is already running for this user!"); + continue; + } + log::info!("Start sync task for user {:?}", user_id); - // TODO + let task_id = SyncClientID(uuid::Uuid::new_v4()); + running_tasks.insert(user_id.clone(), task_id.clone()); + + let tx = tx.clone(); + tokio::task::spawn(async move { + sync_task(task_id, user_id, tx).await; + }); + } + + BroadcastMessage::StopSyncTaskForUser(user_id) => { + // Check if a task is running for this user + if let Some(task_id) = running_tasks.remove(&user_id) { + log::info!("Stop sync task for user {:?}", user_id); + tx.send(BroadcastMessage::StopSyncClient(task_id)).unwrap(); + } else { + log::info!("Not stopping sync task for user {user_id:?}: not running"); + } } _ => {} @@ -26,3 +49,37 @@ pub async fn sync_client_manager(tx: broadcast::Sender) { panic!("Sync client manager stopped unexpectedly!"); } + +/// Sync task for a single user +async fn sync_task( + id: SyncClientID, + user_id: UserID, + tx: Arc>, +) { + let mut rx = tx.subscribe(); + + loop { + tokio::select! { + // Message from tokio broadcast + msg = rx.recv() => { + match msg { + Ok(BroadcastMessage::StopSyncClient(client_id)) => { + if client_id == id { + log::info!("A request was received to stop this client! {id:?} for user {user_id:?}"); + break; + } + } + + Err(e) => { + log::error!("Failed to receive a message from broadcast! {e}"); + } + + Ok(_) => {} + } + } + + // Message from Elements + // TODO + } + } +} diff --git a/src/user.rs b/src/user.rs index 25005db..9a061f4 100644 --- a/src/user.rs +++ b/src/user.rs @@ -19,7 +19,7 @@ pub enum UserError { MissingMatrixToken, } -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Hash)] pub struct UserID(pub String); impl UserID {