diff --git a/matrixgw_backend/src/broadcast_messages.rs b/matrixgw_backend/src/broadcast_messages.rs index b5f9407..5c8b9bd 100644 --- a/matrixgw_backend/src/broadcast_messages.rs +++ b/matrixgw_backend/src/broadcast_messages.rs @@ -1,7 +1,9 @@ use crate::matrix_connection::sync_thread::MatrixSyncTaskID; use crate::users::{APIToken, UserEmail}; use matrix_sdk::Room; +use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent; use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; +use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent; use matrix_sdk::sync::SyncResponse; pub type BroadcastSender = tokio::sync::broadcast::Sender; @@ -23,6 +25,18 @@ pub enum BroadcastMessage { event: Box, room: Room, }, + /// New reaction message + ReactionEvent { + user: UserEmail, + event: Box, + room: Room, + }, + /// New room redaction + RoomRedactionEvent { + user: UserEmail, + event: Box, + room: Room, + }, /// Raw Matrix sync response MatrixSyncResponse { user: UserEmail, sync: SyncResponse }, } diff --git a/matrixgw_backend/src/controllers/ws_controller.rs b/matrixgw_backend/src/controllers/ws_controller.rs index c9ae359..5ed98e2 100644 --- a/matrixgw_backend/src/controllers/ws_controller.rs +++ b/matrixgw_backend/src/controllers/ws_controller.rs @@ -10,7 +10,9 @@ use actix_web::{FromRequest, HttpRequest, HttpResponse, web}; use actix_ws::Message; use futures_util::StreamExt; use matrix_sdk::ruma::OwnedRoomId; +use matrix_sdk::ruma::events::reaction::ReactionEventContent; use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; +use matrix_sdk::ruma::events::room::redaction::RoomRedactionEventContent; use ractor::ActorRef; use std::time::Instant; use tokio::sync::broadcast; @@ -23,7 +25,19 @@ use tokio::time::interval; pub enum WsMessage { /// Room message event RoomMessageEvent { - event: RoomMessageEventContent, + event: Box, + room_id: OwnedRoomId, + }, + + /// Room reaction event + RoomReactionEvent { + event: Box, + room_id: OwnedRoomId, + }, + + /// Room reaction event + RoomRedactionEvent { + event: Box, room_id: OwnedRoomId, }, } @@ -119,12 +133,33 @@ pub async fn ws_handler( BroadcastMessage::RoomMessageEvent{user, event, room} if user == auth.user.email => { // Send the message to the websocket if let Ok(msg) = serde_json::to_string(&WsMessage::RoomMessageEvent { - event:event.content, + event: Box::new(event.content), room_id: room.room_id().to_owned(), }) && let Err(e) = session.text(msg).await { log::error!("Failed to send SyncEvent: {e}"); } } + + BroadcastMessage::ReactionEvent{user, event, room} if user == auth.user.email => { + // Send the message to the websocket + if let Ok(msg) = serde_json::to_string(&WsMessage::RoomReactionEvent { + event: Box::new(event.content), + room_id: room.room_id().to_owned(), + }) && let Err(e) = session.text(msg).await { + log::error!("Failed to send SyncEvent: {e}"); + } + } + + BroadcastMessage::RoomRedactionEvent{user, event, room} if user == auth.user.email => { + // Send the message to the websocket + if let Ok(msg) = serde_json::to_string(&WsMessage::RoomRedactionEvent { + event: Box::new(event.content), + room_id: room.room_id().to_owned(), + }) && let Err(e) = session.text(msg).await { + log::error!("Failed to send SyncEvent: {e}"); + } + } + _ => {} }; diff --git a/matrixgw_backend/src/matrix_connection/matrix_client.rs b/matrixgw_backend/src/matrix_connection/matrix_client.rs index eb7702e..488773b 100644 --- a/matrixgw_backend/src/matrix_connection/matrix_client.rs +++ b/matrixgw_backend/src/matrix_connection/matrix_client.rs @@ -167,6 +167,9 @@ impl MatrixClient { .encryption() .wait_for_e2ee_initialization_tasks() .await; + + // Save stored session once + client.save_stored_session().await?; } // Automatically save session when token gets refreshed diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs index 223fbc3..2bfb397 100644 --- a/matrixgw_backend/src/matrix_connection/sync_thread.rs +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -7,7 +7,9 @@ use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use futures_util::StreamExt; use matrix_sdk::Room; +use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent; use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; +use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent; use ractor::ActorRef; #[derive(Clone, Debug, Eq, PartialEq)] @@ -48,19 +50,49 @@ async fn sync_thread_task( let mut sync_stream = client.sync_stream().await; + let mut handlers = vec![]; + let tx_msg_handle = tx.clone(); - let user = client.email.clone(); - let room_message_handle = client.add_event_handler( + let user_msg_handle = client.email.clone(); + handlers.push(client.add_event_handler( async move |event: OriginalSyncRoomMessageEvent, room: Room| { if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent { - user: user.clone(), + user: user_msg_handle.clone(), event: Box::new(event), room, }) { - log::warn!("Failed to forward room event! {e}"); + log::warn!("Failed to forward room message event! {e}"); } }, - ); + )); + + let tx_reac_handle = tx.clone(); + let user_reac_handle = client.email.clone(); + handlers.push(client.add_event_handler( + async move |event: OriginalSyncReactionEvent, room: Room| { + if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent { + user: user_reac_handle.clone(), + event: Box::new(event), + room, + }) { + log::warn!("Failed to forward reaction event! {e}"); + } + }, + )); + + let tx_redac_handle = tx.clone(); + let user_redac_handle = client.email.clone(); + handlers.push(client.add_event_handler( + async move |event: OriginalSyncRoomRedactionEvent, room: Room| { + if let Err(e) = tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent { + user: user_redac_handle.clone(), + event: Box::new(event), + room, + }) { + log::warn!("Failed to forward reaction event! {e}"); + } + }, + )); loop { tokio::select! { @@ -103,7 +135,9 @@ async fn sync_thread_task( } } - client.remove_event_handler(room_message_handle); + for h in handlers { + client.remove_event_handler(h); + } // Notify manager about termination, so this thread can be removed from the list log::info!("Sync thread {id:?} terminated!");