diff --git a/matrixgw_backend/Cargo.lock b/matrixgw_backend/Cargo.lock index 46d938c..9662dae 100644 --- a/matrixgw_backend/Cargo.lock +++ b/matrixgw_backend/Cargo.lock @@ -3064,7 +3064,6 @@ dependencies = [ "thiserror 2.0.17", "tokio", "url", - "urlencoding", "uuid", ] diff --git a/matrixgw_backend/Cargo.toml b/matrixgw_backend/Cargo.toml index f3b32d1..d146f47 100644 --- a/matrixgw_backend/Cargo.toml +++ b/matrixgw_backend/Cargo.toml @@ -18,7 +18,6 @@ actix-cors = "0.7.1" light-openid = "1.0.4" bytes = "1.10.1" sha2 = "0.10.9" -urlencoding = "2.1.3" base16ct = { version = "0.3.0", features = ["alloc"] } futures-util = "0.3.31" jwt-simple = { version = "0.12.13", default-features = false, features = ["pure-rust"] } @@ -28,7 +27,7 @@ ipnet = { version = "2.11.0", features = ["serde"] } rand = "0.9.2" hex = "0.4.3" mailchecker = "6.0.19" -matrix-sdk = "0.14.0" +matrix-sdk = { version = "0.14.0" } url = "2.5.7" ractor = "0.15.9" serde_json = "1.0.145" diff --git a/matrixgw_backend/src/broadcast_messages.rs b/matrixgw_backend/src/broadcast_messages.rs index 5c8b9bd..3294449 100644 --- a/matrixgw_backend/src/broadcast_messages.rs +++ b/matrixgw_backend/src/broadcast_messages.rs @@ -8,6 +8,13 @@ use matrix_sdk::sync::SyncResponse; pub type BroadcastSender = tokio::sync::broadcast::Sender; +#[derive(Debug, Clone)] +pub struct BxRoomEvent { + pub user: UserEmail, + pub event: Box, + pub room: Room, +} + /// Broadcast messages #[derive(Debug, Clone)] pub enum BroadcastMessage { @@ -20,23 +27,11 @@ pub enum BroadcastMessage { /// Matrix sync thread has been interrupted SyncThreadStopped(MatrixSyncTaskID), /// New room message - RoomMessageEvent { - user: UserEmail, - event: Box, - room: Room, - }, + RoomMessageEvent(BxRoomEvent), /// New reaction message - ReactionEvent { - user: UserEmail, - event: Box, - room: Room, - }, + ReactionEvent(BxRoomEvent), /// New room redaction - RoomRedactionEvent { - user: UserEmail, - event: Box, - room: Room, - }, + RoomRedactionEvent(BxRoomEvent), /// Raw Matrix sync response MatrixSyncResponse { user: UserEmail, sync: SyncResponse }, } diff --git a/matrixgw_backend/src/controllers/ws_controller.rs b/matrixgw_backend/src/controllers/ws_controller.rs index 5ed98e2..c516737 100644 --- a/matrixgw_backend/src/controllers/ws_controller.rs +++ b/matrixgw_backend/src/controllers/ws_controller.rs @@ -5,6 +5,7 @@ use crate::extractors::auth_extractor::{AuthExtractor, AuthenticatedMethod}; use crate::extractors::matrix_client_extractor::MatrixClientExtractor; use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_manager::MatrixManagerMsg; +use crate::users::UserEmail; use actix_web::dev::Payload; use actix_web::{FromRequest, HttpRequest, HttpResponse, web}; use actix_ws::Message; @@ -19,27 +20,50 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::Receiver; use tokio::time::interval; +#[derive(Debug, serde::Serialize)] +pub struct WsRoomEvent { + pub event: Box, + pub room_id: OwnedRoomId, +} + /// Messages sent to the client #[derive(Debug, serde::Serialize)] #[serde(tag = "type")] pub enum WsMessage { /// Room message event - RoomMessageEvent { - event: Box, - room_id: OwnedRoomId, - }, + RoomMessageEvent(WsRoomEvent), /// Room reaction event - RoomReactionEvent { - event: Box, - room_id: OwnedRoomId, - }, + RoomReactionEvent(WsRoomEvent), /// Room reaction event - RoomRedactionEvent { - event: Box, - room_id: OwnedRoomId, - }, + RoomRedactionEvent(WsRoomEvent), +} + +impl WsMessage { + pub fn from_bx_message(msg: &BroadcastMessage, user: &UserEmail) -> Option { + match msg { + BroadcastMessage::RoomMessageEvent(evt) if &evt.user == user => { + Some(Self::RoomMessageEvent(WsRoomEvent { + event: Box::new(evt.event.content.clone()), + room_id: evt.room.room_id().to_owned(), + })) + } + BroadcastMessage::ReactionEvent(evt) if &evt.user == user => { + Some(Self::RoomReactionEvent(WsRoomEvent { + event: Box::new(evt.event.content.clone()), + room_id: evt.room.room_id().to_owned(), + })) + } + BroadcastMessage::RoomRedactionEvent(evt) if &evt.user == user => { + Some(Self::RoomRedactionEvent(WsRoomEvent { + event: Box::new(evt.event.content.clone()), + room_id: evt.room.room_id().to_owned(), + })) + } + _ => None, + } + } } /// Main WS route @@ -108,8 +132,8 @@ pub async fn ws_handler( Err(broadcast::error::RecvError::Lagged(_)) => continue, }; - match msg { - BroadcastMessage::APITokenDeleted(t) => { + match (&msg, WsMessage::from_bx_message(&msg, &auth.user.email)) { + (BroadcastMessage::APITokenDeleted(t), _) => { match &auth.method{ AuthenticatedMethod::Token(tok) if tok.id == t.id => { log::info!( @@ -123,39 +147,17 @@ pub async fn ws_handler( } }, - BroadcastMessage::UserDisconnectedFromMatrix(mail) if mail == auth.user.email => { + (BroadcastMessage::UserDisconnectedFromMatrix(mail), _) if mail == &auth.user.email => { log::info!( "closing WS session of user {mail:?} as user was disconnected from Matrix" ); break None; } - BroadcastMessage::RoomMessageEvent{user, event, room} if user == auth.user.email => { + (_, Some(message)) => { // Send the message to the websocket - if let Ok(msg) = serde_json::to_string(&WsMessage::RoomMessageEvent { - event: Box::new(event.content), - room_id: room.room_id().to_owned(), - }) && let Err(e) = session.text(msg).await { - log::error!("Failed to send SyncEvent: {e}"); - } - } - - BroadcastMessage::ReactionEvent{user, event, room} if user == auth.user.email => { - // Send the message to the websocket - if let Ok(msg) = serde_json::to_string(&WsMessage::RoomReactionEvent { - event: Box::new(event.content), - room_id: room.room_id().to_owned(), - }) && let Err(e) = session.text(msg).await { - log::error!("Failed to send SyncEvent: {e}"); - } - } - - BroadcastMessage::RoomRedactionEvent{user, event, room} if user == auth.user.email => { - // Send the message to the websocket - if let Ok(msg) = serde_json::to_string(&WsMessage::RoomRedactionEvent { - event: Box::new(event.content), - room_id: room.room_id().to_owned(), - }) && let Err(e) = session.text(msg).await { + if let Ok(msg) = serde_json::to_string(&message) + && let Err(e) = session.text(msg).await { log::error!("Failed to send SyncEvent: {e}"); } } diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs index 2bfb397..4a32e61 100644 --- a/matrixgw_backend/src/matrix_connection/sync_thread.rs +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -2,7 +2,7 @@ //! //! This file contains the logic performed by the threads that synchronize with Matrix account. -use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; +use crate::broadcast_messages::{BroadcastMessage, BroadcastSender, BxRoomEvent}; use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use futures_util::StreamExt; @@ -56,11 +56,11 @@ async fn sync_thread_task( let user_msg_handle = client.email.clone(); handlers.push(client.add_event_handler( async move |event: OriginalSyncRoomMessageEvent, room: Room| { - if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent { + if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent(BxRoomEvent { user: user_msg_handle.clone(), event: Box::new(event), room, - }) { + })) { log::warn!("Failed to forward room message event! {e}"); } }, @@ -70,11 +70,11 @@ async fn sync_thread_task( let user_reac_handle = client.email.clone(); handlers.push(client.add_event_handler( async move |event: OriginalSyncReactionEvent, room: Room| { - if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent { + if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent(BxRoomEvent { user: user_reac_handle.clone(), event: Box::new(event), room, - }) { + })) { log::warn!("Failed to forward reaction event! {e}"); } }, @@ -84,11 +84,13 @@ async fn sync_thread_task( let user_redac_handle = client.email.clone(); handlers.push(client.add_event_handler( async move |event: OriginalSyncRoomRedactionEvent, room: Room| { - if let Err(e) = tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent { - user: user_redac_handle.clone(), - event: Box::new(event), - room, - }) { + if let Err(e) = + tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent(BxRoomEvent { + user: user_redac_handle.clone(), + event: Box::new(event), + room, + })) + { log::warn!("Failed to forward reaction event! {e}"); } },