Redact more events

This commit is contained in:
2025-11-21 09:35:21 +01:00
parent 24f06a78a9
commit 751e3b8654
4 changed files with 94 additions and 8 deletions

View File

@@ -1,7 +1,9 @@
use crate::matrix_connection::sync_thread::MatrixSyncTaskID; use crate::matrix_connection::sync_thread::MatrixSyncTaskID;
use crate::users::{APIToken, UserEmail}; use crate::users::{APIToken, UserEmail};
use matrix_sdk::Room; use matrix_sdk::Room;
use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent;
use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent;
use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent;
use matrix_sdk::sync::SyncResponse; use matrix_sdk::sync::SyncResponse;
pub type BroadcastSender = tokio::sync::broadcast::Sender<BroadcastMessage>; pub type BroadcastSender = tokio::sync::broadcast::Sender<BroadcastMessage>;
@@ -23,6 +25,18 @@ pub enum BroadcastMessage {
event: Box<OriginalSyncRoomMessageEvent>, event: Box<OriginalSyncRoomMessageEvent>,
room: Room, room: Room,
}, },
/// New reaction message
ReactionEvent {
user: UserEmail,
event: Box<OriginalSyncReactionEvent>,
room: Room,
},
/// New room redaction
RoomRedactionEvent {
user: UserEmail,
event: Box<OriginalSyncRoomRedactionEvent>,
room: Room,
},
/// Raw Matrix sync response /// Raw Matrix sync response
MatrixSyncResponse { user: UserEmail, sync: SyncResponse }, MatrixSyncResponse { user: UserEmail, sync: SyncResponse },
} }

View File

@@ -10,7 +10,9 @@ use actix_web::{FromRequest, HttpRequest, HttpResponse, web};
use actix_ws::Message; use actix_ws::Message;
use futures_util::StreamExt; use futures_util::StreamExt;
use matrix_sdk::ruma::OwnedRoomId; use matrix_sdk::ruma::OwnedRoomId;
use matrix_sdk::ruma::events::reaction::ReactionEventContent;
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
use matrix_sdk::ruma::events::room::redaction::RoomRedactionEventContent;
use ractor::ActorRef; use ractor::ActorRef;
use std::time::Instant; use std::time::Instant;
use tokio::sync::broadcast; use tokio::sync::broadcast;
@@ -23,7 +25,19 @@ use tokio::time::interval;
pub enum WsMessage { pub enum WsMessage {
/// Room message event /// Room message event
RoomMessageEvent { RoomMessageEvent {
event: RoomMessageEventContent, event: Box<RoomMessageEventContent>,
room_id: OwnedRoomId,
},
/// Room reaction event
RoomReactionEvent {
event: Box<ReactionEventContent>,
room_id: OwnedRoomId,
},
/// Room reaction event
RoomRedactionEvent {
event: Box<RoomRedactionEventContent>,
room_id: OwnedRoomId, room_id: OwnedRoomId,
}, },
} }
@@ -119,12 +133,33 @@ pub async fn ws_handler(
BroadcastMessage::RoomMessageEvent{user, event, room} if user == auth.user.email => { BroadcastMessage::RoomMessageEvent{user, event, room} if user == auth.user.email => {
// Send the message to the websocket // Send the message to the websocket
if let Ok(msg) = serde_json::to_string(&WsMessage::RoomMessageEvent { if let Ok(msg) = serde_json::to_string(&WsMessage::RoomMessageEvent {
event:event.content, event: Box::new(event.content),
room_id: room.room_id().to_owned(), room_id: room.room_id().to_owned(),
}) && let Err(e) = session.text(msg).await { }) && let Err(e) = session.text(msg).await {
log::error!("Failed to send SyncEvent: {e}"); log::error!("Failed to send SyncEvent: {e}");
} }
} }
BroadcastMessage::ReactionEvent{user, event, room} if user == auth.user.email => {
// Send the message to the websocket
if let Ok(msg) = serde_json::to_string(&WsMessage::RoomReactionEvent {
event: Box::new(event.content),
room_id: room.room_id().to_owned(),
}) && let Err(e) = session.text(msg).await {
log::error!("Failed to send SyncEvent: {e}");
}
}
BroadcastMessage::RoomRedactionEvent{user, event, room} if user == auth.user.email => {
// Send the message to the websocket
if let Ok(msg) = serde_json::to_string(&WsMessage::RoomRedactionEvent {
event: Box::new(event.content),
room_id: room.room_id().to_owned(),
}) && let Err(e) = session.text(msg).await {
log::error!("Failed to send SyncEvent: {e}");
}
}
_ => {} _ => {}
}; };

View File

@@ -167,6 +167,9 @@ impl MatrixClient {
.encryption() .encryption()
.wait_for_e2ee_initialization_tasks() .wait_for_e2ee_initialization_tasks()
.await; .await;
// Save stored session once
client.save_stored_session().await?;
} }
// Automatically save session when token gets refreshed // Automatically save session when token gets refreshed

View File

@@ -7,7 +7,9 @@ use crate::matrix_connection::matrix_client::MatrixClient;
use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use futures_util::StreamExt; use futures_util::StreamExt;
use matrix_sdk::Room; use matrix_sdk::Room;
use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent;
use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent;
use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent;
use ractor::ActorRef; use ractor::ActorRef;
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
@@ -48,19 +50,49 @@ async fn sync_thread_task(
let mut sync_stream = client.sync_stream().await; let mut sync_stream = client.sync_stream().await;
let mut handlers = vec![];
let tx_msg_handle = tx.clone(); let tx_msg_handle = tx.clone();
let user = client.email.clone(); let user_msg_handle = client.email.clone();
let room_message_handle = client.add_event_handler( handlers.push(client.add_event_handler(
async move |event: OriginalSyncRoomMessageEvent, room: Room| { async move |event: OriginalSyncRoomMessageEvent, room: Room| {
if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent { if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent {
user: user.clone(), user: user_msg_handle.clone(),
event: Box::new(event), event: Box::new(event),
room, room,
}) { }) {
log::warn!("Failed to forward room event! {e}"); log::warn!("Failed to forward room message event! {e}");
} }
}, },
); ));
let tx_reac_handle = tx.clone();
let user_reac_handle = client.email.clone();
handlers.push(client.add_event_handler(
async move |event: OriginalSyncReactionEvent, room: Room| {
if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent {
user: user_reac_handle.clone(),
event: Box::new(event),
room,
}) {
log::warn!("Failed to forward reaction event! {e}");
}
},
));
let tx_redac_handle = tx.clone();
let user_redac_handle = client.email.clone();
handlers.push(client.add_event_handler(
async move |event: OriginalSyncRoomRedactionEvent, room: Room| {
if let Err(e) = tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent {
user: user_redac_handle.clone(),
event: Box::new(event),
room,
}) {
log::warn!("Failed to forward reaction event! {e}");
}
},
));
loop { loop {
tokio::select! { tokio::select! {
@@ -103,7 +135,9 @@ async fn sync_thread_task(
} }
} }
client.remove_event_handler(room_message_handle); for h in handlers {
client.remove_event_handler(h);
}
// Notify manager about termination, so this thread can be removed from the list // Notify manager about termination, so this thread can be removed from the list
log::info!("Sync thread {id:?} terminated!"); log::info!("Sync thread {id:?} terminated!");