use crate::broadcast_messages::{BroadcastMessage, SyncEvent}; use crate::user::{UserConfig, UserID}; use futures_util::TryStreamExt; use ruma::api::client::sync::sync_events; use ruma::assign; use ruma::presence::PresenceState; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; /// ID of sync client #[derive(Debug, Clone, Eq, PartialEq)] pub struct SyncClientID(uuid::Uuid); /// Sync client launcher loop pub async fn sync_client_manager(tx: broadcast::Sender) -> ! { let mut rx = tx.subscribe(); let tx = Arc::new(tx.clone()); let mut running_tasks = HashMap::new(); while let Ok(msg) = rx.recv().await { match msg { BroadcastMessage::StartSyncTaskForUser(user_id) => { if running_tasks.contains_key(&user_id) { log::info!("Won't start sync task for user {user_id:?} because a task is already running for this user!"); continue; } log::info!("Start sync task for user {:?}", user_id); let task_id = SyncClientID(uuid::Uuid::new_v4()); running_tasks.insert(user_id.clone(), task_id.clone()); let tx = tx.clone(); tokio::task::spawn(async move { sync_task(task_id, user_id, tx).await; }); } BroadcastMessage::StopSyncTaskForUser(user_id) => { // Check if a task is running for this user if let Some(task_id) = running_tasks.remove(&user_id) { log::info!("Stop sync task for user {:?}", user_id); tx.send(BroadcastMessage::StopSyncClient(task_id)).unwrap(); } else { log::info!("Not stopping sync task for user {user_id:?}: not running"); } } _ => {} } } panic!("Sync client manager stopped unexpectedly!"); } /// Sync task for a single user async fn sync_task( id: SyncClientID, user_id: UserID, tx: Arc>, ) { let mut rx = tx.subscribe(); let Ok(user_config) = UserConfig::load(&user_id, false).await else { log::error!("Failed to load user config in sync thread!"); return; }; let client = match user_config.matrix_client().await { Err(e) => { log::error!("Failed to load matrix client for user {user_id:?}: {e}"); return; } Ok(client) => client, }; let initial_sync_response = match client .send_request(assign!(sync_events::v3::Request::new(), { filter: None, })) .await { Ok(res) => res, Err(e) => { log::error!("Failed to perform initial sync request for user {user_id:?}! {e}"); return; } }; let mut sync_stream = Box::pin(client.sync( None, initial_sync_response.next_batch, PresenceState::Offline, Some(Duration::from_secs(30)), )); loop { tokio::select! { // Message from tokio broadcast msg = rx.recv() => { match msg { Ok(BroadcastMessage::StopSyncClient(client_id)) => { if client_id == id { log::info!("A request was received to stop this client! {id:?} for user {user_id:?}"); break; } } Err(e) => { log::error!("Failed to receive a message from broadcast! {e}"); return; } Ok(_) => {} } } // Message from Matrix msg_stream = sync_stream.try_next() => { match msg_stream { Ok(Some(msg)) => { log::debug!("Received new message from Matrix: {msg:#?}"); if let Err(e) = tx.send(BroadcastMessage::SyncEvent(user_id.clone(), SyncEvent { rooms: msg.rooms,presence: msg.presence, account_data: msg.account_data, to_device: msg.to_device, device_lists: msg.device_lists, })) { log::error!("Failed to propagate event! {e}"); } } Ok(None) => { log::debug!("Received no message from Matrix"); } Err(e) => { log::error!("Failed to receive a message from Matrix! {e}"); return; } } } } } }