diff --git a/matrixgw_backend/src/broadcast_messages.rs b/matrixgw_backend/src/broadcast_messages.rs index 18328e1..b5f9407 100644 --- a/matrixgw_backend/src/broadcast_messages.rs +++ b/matrixgw_backend/src/broadcast_messages.rs @@ -1,5 +1,8 @@ use crate::matrix_connection::sync_thread::MatrixSyncTaskID; use crate::users::{APIToken, UserEmail}; +use matrix_sdk::Room; +use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; +use matrix_sdk::sync::SyncResponse; pub type BroadcastSender = tokio::sync::broadcast::Sender; @@ -14,4 +17,12 @@ pub enum BroadcastMessage { StopSyncThread(MatrixSyncTaskID), /// Matrix sync thread has been interrupted SyncThreadStopped(MatrixSyncTaskID), + /// New room message + RoomMessageEvent { + user: UserEmail, + event: Box, + room: Room, + }, + /// Raw Matrix sync response + MatrixSyncResponse { user: UserEmail, sync: SyncResponse }, } diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs index e8f07a8..223fbc3 100644 --- a/matrixgw_backend/src/matrix_connection/sync_thread.rs +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -6,6 +6,8 @@ use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use futures_util::StreamExt; +use matrix_sdk::Room; +use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; use ractor::ActorRef; #[derive(Clone, Debug, Eq, PartialEq)] @@ -17,6 +19,13 @@ pub async fn start_sync_thread( tx: BroadcastSender, manager: ActorRef, ) -> anyhow::Result { + // Perform initial synchronization here, so in case of error the sync task does not get registered + log::info!("Perform initial synchronization..."); + if let Err(e) = client.perform_initial_sync().await { + log::error!("Failed to perform initial Matrix synchronization! {e:?}"); + return Err(e); + } + let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4()); let task_id_clone = task_id.clone(); @@ -35,18 +44,23 @@ async fn sync_thread_task( manager: ActorRef, ) { let mut rx = tx.subscribe(); - log::info!("Sync thread {id:?} started for user {:?}", client.email); - log::info!("Perform initial synchronization..."); - if let Err(e) = client.perform_initial_sync().await { - log::error!("Failed to perform initial Matrix synchronization! {e:?}"); - return; - } - let mut sync_stream = client.sync_stream().await; - //let room_message_handle = client.add_event_handler(); + let tx_msg_handle = tx.clone(); + let user = client.email.clone(); + let room_message_handle = client.add_event_handler( + async move |event: OriginalSyncRoomMessageEvent, room: Room| { + if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent { + user: user.clone(), + event: Box::new(event), + room, + }) { + log::warn!("Failed to forward room event! {e}"); + } + }, + ); loop { tokio::select! { @@ -65,18 +79,31 @@ async fn sync_thread_task( } } - evt = sync_stream.next() => { - let Some(evt)= evt else { + res = sync_stream.next() => { + let Some(res)= res else { log::error!("No more Matrix event to process, stopping now..."); break; }; - println!("Sync thread {id:?} event: {:?}", evt); + // Forward message + match res { + Ok(res) => { + if let Err(e)= tx.send(BroadcastMessage::MatrixSyncResponse { + user: client.email.clone(), + sync: res + }) { + log::warn!("Failed to forward room event! {e}"); + } + } + Err(e) => { + log::error!("Sync error for user {:?}! {e}", client.email); + } + } } } } - //client.remove_event_handler(room_message_handle); + client.remove_event_handler(room_message_handle); // Notify manager about termination, so this thread can be removed from the list log::info!("Sync thread {id:?} terminated!");