use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::sync_thread::{MatrixSyncTaskID, start_sync_thread}; use crate::users::UserEmail; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; use std::collections::HashMap; pub struct MatrixManagerState { pub broadcast_sender: BroadcastSender, pub clients: HashMap, pub running_sync_threads: HashMap, } pub enum MatrixManagerMsg { GetClient(UserEmail, RpcReplyPort>), DisconnectClient(UserEmail), StartSyncThread(UserEmail), StopSyncThread(UserEmail), SyncThreadGetStatus(UserEmail, RpcReplyPort), SyncThreadTerminated(UserEmail, MatrixSyncTaskID), } pub struct MatrixManagerActor; impl Actor for MatrixManagerActor { type Msg = MatrixManagerMsg; type State = MatrixManagerState; type Arguments = BroadcastSender; async fn pre_start( &self, _myself: ActorRef, args: Self::Arguments, ) -> Result { Ok(MatrixManagerState { broadcast_sender: args, clients: HashMap::new(), running_sync_threads: Default::default(), }) } async fn post_stop( &self, _myself: ActorRef, _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { log::error!("[!] [!] Matrix Manager Actor stopped!"); Ok(()) } async fn handle( &self, myself: ActorRef, message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match message { // Get client information MatrixManagerMsg::GetClient(email, port) => { let res = port.send(match state.clients.get(&email) { None => { // Generate client if required log::info!("Building new client for {:?}", &email); match MatrixClient::build_client(myself, &email).await { Ok(c) => { state.clients.insert(email.clone(), c.clone()); Ok(c) } Err(e) => Err(e), } } Some(c) => Ok(c.clone()), }); if let Err(e) = res { log::warn!("Failed to send client information: {e}") } } MatrixManagerMsg::DisconnectClient(email) => { if let Some(c) = state.clients.remove(&email) { // Stop sync thread (if running) if let Some(id) = state.running_sync_threads.remove(&email) { state .broadcast_sender .send(BroadcastMessage::StopSyncThread(id)) .ok(); } // Disconnect client if let Err(e) = c.disconnect().await { log::error!("Failed to disconnect client: {e}"); } if let Err(e) = state .broadcast_sender .send(BroadcastMessage::UserDisconnectedFromMatrix(email)) { log::warn!( "Failed to notify that user has been disconnected from Matrix! {e}" ); } } } MatrixManagerMsg::StartSyncThread(email) => { // Do nothing if task is already running if state.running_sync_threads.contains_key(&email) { log::debug!("Not starting sync thread for {email:?} as it is already running"); return Ok(()); } let Some(client) = state.clients.get(&email) else { log::warn!( "Cannot start sync thread for {email:?} because client is not initialized!" ); return Ok(()); }; if !client.is_client_connected() { log::warn!( "Cannot start sync thread for {email:?} because Matrix account is not set!" ); return Ok(()); } // Start thread log::debug!("Starting sync thread for {email:?}"); let thread_id = match start_sync_thread(client.clone(), state.broadcast_sender.clone(), myself) .await { Ok(thread_id) => thread_id, Err(e) => { log::error!("Failed to start sync thread! {e}"); return Ok(()); } }; state.running_sync_threads.insert(email, thread_id); } MatrixManagerMsg::StopSyncThread(email) => { if let Some(thread_id) = state.running_sync_threads.get(&email) && let Err(e) = state .broadcast_sender .send(BroadcastMessage::StopSyncThread(thread_id.clone())) { log::error!("Failed to request sync thread stop: {e}"); } } MatrixManagerMsg::SyncThreadGetStatus(email, reply) => { let started = state.running_sync_threads.contains_key(&email); if let Err(e) = reply.send(started) { log::error!("Failed to send sync thread status! {e}"); } } MatrixManagerMsg::SyncThreadTerminated(email, task_id) => { if state.running_sync_threads.get(&email) == Some(&task_id) { log::info!( "Sync thread {task_id:?} has been terminated, removing it from the list..." ); state.running_sync_threads.remove(&email); } } } Ok(()) } }