Refactor messages propagation

This commit is contained in:
2025-11-21 10:30:48 +01:00
parent 751e3b8654
commit 8d2cea5f82
5 changed files with 65 additions and 68 deletions

View File

@@ -3064,7 +3064,6 @@ dependencies = [
"thiserror 2.0.17", "thiserror 2.0.17",
"tokio", "tokio",
"url", "url",
"urlencoding",
"uuid", "uuid",
] ]

View File

@@ -18,7 +18,6 @@ actix-cors = "0.7.1"
light-openid = "1.0.4" light-openid = "1.0.4"
bytes = "1.10.1" bytes = "1.10.1"
sha2 = "0.10.9" sha2 = "0.10.9"
urlencoding = "2.1.3"
base16ct = { version = "0.3.0", features = ["alloc"] } base16ct = { version = "0.3.0", features = ["alloc"] }
futures-util = "0.3.31" futures-util = "0.3.31"
jwt-simple = { version = "0.12.13", default-features = false, features = ["pure-rust"] } jwt-simple = { version = "0.12.13", default-features = false, features = ["pure-rust"] }
@@ -28,7 +27,7 @@ ipnet = { version = "2.11.0", features = ["serde"] }
rand = "0.9.2" rand = "0.9.2"
hex = "0.4.3" hex = "0.4.3"
mailchecker = "6.0.19" mailchecker = "6.0.19"
matrix-sdk = "0.14.0" matrix-sdk = { version = "0.14.0" }
url = "2.5.7" url = "2.5.7"
ractor = "0.15.9" ractor = "0.15.9"
serde_json = "1.0.145" serde_json = "1.0.145"

View File

@@ -8,6 +8,13 @@ use matrix_sdk::sync::SyncResponse;
pub type BroadcastSender = tokio::sync::broadcast::Sender<BroadcastMessage>; pub type BroadcastSender = tokio::sync::broadcast::Sender<BroadcastMessage>;
#[derive(Debug, Clone)]
pub struct BxRoomEvent<E> {
pub user: UserEmail,
pub event: Box<E>,
pub room: Room,
}
/// Broadcast messages /// Broadcast messages
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BroadcastMessage { pub enum BroadcastMessage {
@@ -20,23 +27,11 @@ pub enum BroadcastMessage {
/// Matrix sync thread has been interrupted /// Matrix sync thread has been interrupted
SyncThreadStopped(MatrixSyncTaskID), SyncThreadStopped(MatrixSyncTaskID),
/// New room message /// New room message
RoomMessageEvent { RoomMessageEvent(BxRoomEvent<OriginalSyncRoomMessageEvent>),
user: UserEmail,
event: Box<OriginalSyncRoomMessageEvent>,
room: Room,
},
/// New reaction message /// New reaction message
ReactionEvent { ReactionEvent(BxRoomEvent<OriginalSyncReactionEvent>),
user: UserEmail,
event: Box<OriginalSyncReactionEvent>,
room: Room,
},
/// New room redaction /// New room redaction
RoomRedactionEvent { RoomRedactionEvent(BxRoomEvent<OriginalSyncRoomRedactionEvent>),
user: UserEmail,
event: Box<OriginalSyncRoomRedactionEvent>,
room: Room,
},
/// Raw Matrix sync response /// Raw Matrix sync response
MatrixSyncResponse { user: UserEmail, sync: SyncResponse }, MatrixSyncResponse { user: UserEmail, sync: SyncResponse },
} }

View File

@@ -5,6 +5,7 @@ use crate::extractors::auth_extractor::{AuthExtractor, AuthenticatedMethod};
use crate::extractors::matrix_client_extractor::MatrixClientExtractor; use crate::extractors::matrix_client_extractor::MatrixClientExtractor;
use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_client::MatrixClient;
use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use crate::users::UserEmail;
use actix_web::dev::Payload; use actix_web::dev::Payload;
use actix_web::{FromRequest, HttpRequest, HttpResponse, web}; use actix_web::{FromRequest, HttpRequest, HttpResponse, web};
use actix_ws::Message; use actix_ws::Message;
@@ -19,27 +20,50 @@ use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use tokio::time::interval; use tokio::time::interval;
#[derive(Debug, serde::Serialize)]
pub struct WsRoomEvent<E> {
pub event: Box<E>,
pub room_id: OwnedRoomId,
}
/// Messages sent to the client /// Messages sent to the client
#[derive(Debug, serde::Serialize)] #[derive(Debug, serde::Serialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum WsMessage { pub enum WsMessage {
/// Room message event /// Room message event
RoomMessageEvent { RoomMessageEvent(WsRoomEvent<RoomMessageEventContent>),
event: Box<RoomMessageEventContent>,
room_id: OwnedRoomId,
},
/// Room reaction event /// Room reaction event
RoomReactionEvent { RoomReactionEvent(WsRoomEvent<ReactionEventContent>),
event: Box<ReactionEventContent>,
room_id: OwnedRoomId,
},
/// Room reaction event /// Room reaction event
RoomRedactionEvent { RoomRedactionEvent(WsRoomEvent<RoomRedactionEventContent>),
event: Box<RoomRedactionEventContent>, }
room_id: OwnedRoomId,
}, impl WsMessage {
pub fn from_bx_message(msg: &BroadcastMessage, user: &UserEmail) -> Option<Self> {
match msg {
BroadcastMessage::RoomMessageEvent(evt) if &evt.user == user => {
Some(Self::RoomMessageEvent(WsRoomEvent {
event: Box::new(evt.event.content.clone()),
room_id: evt.room.room_id().to_owned(),
}))
}
BroadcastMessage::ReactionEvent(evt) if &evt.user == user => {
Some(Self::RoomReactionEvent(WsRoomEvent {
event: Box::new(evt.event.content.clone()),
room_id: evt.room.room_id().to_owned(),
}))
}
BroadcastMessage::RoomRedactionEvent(evt) if &evt.user == user => {
Some(Self::RoomRedactionEvent(WsRoomEvent {
event: Box::new(evt.event.content.clone()),
room_id: evt.room.room_id().to_owned(),
}))
}
_ => None,
}
}
} }
/// Main WS route /// Main WS route
@@ -108,8 +132,8 @@ pub async fn ws_handler(
Err(broadcast::error::RecvError::Lagged(_)) => continue, Err(broadcast::error::RecvError::Lagged(_)) => continue,
}; };
match msg { match (&msg, WsMessage::from_bx_message(&msg, &auth.user.email)) {
BroadcastMessage::APITokenDeleted(t) => { (BroadcastMessage::APITokenDeleted(t), _) => {
match &auth.method{ match &auth.method{
AuthenticatedMethod::Token(tok) if tok.id == t.id => { AuthenticatedMethod::Token(tok) if tok.id == t.id => {
log::info!( log::info!(
@@ -123,39 +147,17 @@ pub async fn ws_handler(
} }
}, },
BroadcastMessage::UserDisconnectedFromMatrix(mail) if mail == auth.user.email => { (BroadcastMessage::UserDisconnectedFromMatrix(mail), _) if mail == &auth.user.email => {
log::info!( log::info!(
"closing WS session of user {mail:?} as user was disconnected from Matrix" "closing WS session of user {mail:?} as user was disconnected from Matrix"
); );
break None; break None;
} }
BroadcastMessage::RoomMessageEvent{user, event, room} if user == auth.user.email => { (_, Some(message)) => {
// Send the message to the websocket // Send the message to the websocket
if let Ok(msg) = serde_json::to_string(&WsMessage::RoomMessageEvent { if let Ok(msg) = serde_json::to_string(&message)
event: Box::new(event.content), && let Err(e) = session.text(msg).await {
room_id: room.room_id().to_owned(),
}) && let Err(e) = session.text(msg).await {
log::error!("Failed to send SyncEvent: {e}");
}
}
BroadcastMessage::ReactionEvent{user, event, room} if user == auth.user.email => {
// Send the message to the websocket
if let Ok(msg) = serde_json::to_string(&WsMessage::RoomReactionEvent {
event: Box::new(event.content),
room_id: room.room_id().to_owned(),
}) && let Err(e) = session.text(msg).await {
log::error!("Failed to send SyncEvent: {e}");
}
}
BroadcastMessage::RoomRedactionEvent{user, event, room} if user == auth.user.email => {
// Send the message to the websocket
if let Ok(msg) = serde_json::to_string(&WsMessage::RoomRedactionEvent {
event: Box::new(event.content),
room_id: room.room_id().to_owned(),
}) && let Err(e) = session.text(msg).await {
log::error!("Failed to send SyncEvent: {e}"); log::error!("Failed to send SyncEvent: {e}");
} }
} }

View File

@@ -2,7 +2,7 @@
//! //!
//! This file contains the logic performed by the threads that synchronize with Matrix account. //! This file contains the logic performed by the threads that synchronize with Matrix account.
use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::broadcast_messages::{BroadcastMessage, BroadcastSender, BxRoomEvent};
use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_client::MatrixClient;
use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -56,11 +56,11 @@ async fn sync_thread_task(
let user_msg_handle = client.email.clone(); let user_msg_handle = client.email.clone();
handlers.push(client.add_event_handler( handlers.push(client.add_event_handler(
async move |event: OriginalSyncRoomMessageEvent, room: Room| { async move |event: OriginalSyncRoomMessageEvent, room: Room| {
if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent { if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent(BxRoomEvent {
user: user_msg_handle.clone(), user: user_msg_handle.clone(),
event: Box::new(event), event: Box::new(event),
room, room,
}) { })) {
log::warn!("Failed to forward room message event! {e}"); log::warn!("Failed to forward room message event! {e}");
} }
}, },
@@ -70,11 +70,11 @@ async fn sync_thread_task(
let user_reac_handle = client.email.clone(); let user_reac_handle = client.email.clone();
handlers.push(client.add_event_handler( handlers.push(client.add_event_handler(
async move |event: OriginalSyncReactionEvent, room: Room| { async move |event: OriginalSyncReactionEvent, room: Room| {
if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent { if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent(BxRoomEvent {
user: user_reac_handle.clone(), user: user_reac_handle.clone(),
event: Box::new(event), event: Box::new(event),
room, room,
}) { })) {
log::warn!("Failed to forward reaction event! {e}"); log::warn!("Failed to forward reaction event! {e}");
} }
}, },
@@ -84,11 +84,13 @@ async fn sync_thread_task(
let user_redac_handle = client.email.clone(); let user_redac_handle = client.email.clone();
handlers.push(client.add_event_handler( handlers.push(client.add_event_handler(
async move |event: OriginalSyncRoomRedactionEvent, room: Room| { async move |event: OriginalSyncRoomRedactionEvent, room: Room| {
if let Err(e) = tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent { if let Err(e) =
tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent(BxRoomEvent {
user: user_redac_handle.clone(), user: user_redac_handle.clone(),
event: Box::new(event), event: Box::new(event),
room, room,
}) { }))
{
log::warn!("Failed to forward reaction event! {e}"); log::warn!("Failed to forward reaction event! {e}");
} }
}, },