//! # Matrix sync thread //! //! This file contains the logic performed by the threads that synchronize with Matrix account. use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use futures_util::StreamExt; use matrix_sdk::Room; use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; use ractor::ActorRef; #[derive(Clone, Debug, Eq, PartialEq)] pub struct MatrixSyncTaskID(uuid::Uuid); /// Start synchronization thread for a given user pub async fn start_sync_thread( client: MatrixClient, tx: BroadcastSender, manager: ActorRef, ) -> anyhow::Result { // Perform initial synchronization here, so in case of error the sync task does not get registered log::info!("Perform initial synchronization..."); if let Err(e) = client.perform_initial_sync().await { log::error!("Failed to perform initial Matrix synchronization! {e:?}"); return Err(e); } let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4()); let task_id_clone = task_id.clone(); tokio::task::spawn(async move { sync_thread_task(task_id_clone, client, tx, manager).await; }); Ok(task_id) } /// Sync thread function for a single function async fn sync_thread_task( id: MatrixSyncTaskID, client: MatrixClient, tx: BroadcastSender, manager: ActorRef, ) { let mut rx = tx.subscribe(); log::info!("Sync thread {id:?} started for user {:?}", client.email); let mut sync_stream = client.sync_stream().await; let tx_msg_handle = tx.clone(); let user = client.email.clone(); let room_message_handle = client.add_event_handler( async move |event: OriginalSyncRoomMessageEvent, room: Room| { if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent { user: user.clone(), event: Box::new(event), room, }) { log::warn!("Failed to forward room event! {e}"); } }, ); loop { tokio::select! { // Message from tokio broadcast msg = rx.recv() => { match msg { Ok(BroadcastMessage::StopSyncThread(task_id)) if task_id == id => { log::info!("A request was received to stop sync task! {id:?} for user {:?}", client.email); break; } Err(e) => { log::error!("Failed to receive a message from broadcast! {e}"); break; } Ok(_) => {} } } res = sync_stream.next() => { let Some(res)= res else { log::error!("No more Matrix event to process, stopping now..."); break; }; // Forward message match res { Ok(res) => { if let Err(e)= tx.send(BroadcastMessage::MatrixSyncResponse { user: client.email.clone(), sync: res }) { log::warn!("Failed to forward room event! {e}"); } } Err(e) => { log::error!("Sync error for user {:?}! {e}", client.email); } } } } } client.remove_event_handler(room_message_handle); // Notify manager about termination, so this thread can be removed from the list log::info!("Sync thread {id:?} terminated!"); if let Err(e) = ractor::cast!( manager, MatrixManagerMsg::SyncThreadTerminated(client.email.clone(), id.clone()) ) { log::error!("Failed to notify Matrix manager about thread termination! {e}"); } if let Err(e) = tx.send(BroadcastMessage::SyncThreadStopped(id)) { log::warn!("Failed to notify that synchronization thread has been interrupted! {e}") } }