From 077c64be285bfdd2a97a1c89f7f75956c5978622 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Mon, 1 Dec 2025 11:17:02 +0100 Subject: [PATCH] Forward typing event in WebSocket --- matrixgw_backend/src/broadcast_messages.rs | 3 ++ .../src/controllers/ws_controller.rs | 20 ++++++++++++ .../src/matrix_connection/sync_thread.rs | 31 ++++++++++++++----- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/matrixgw_backend/src/broadcast_messages.rs b/matrixgw_backend/src/broadcast_messages.rs index 2e129eb..a398387 100644 --- a/matrixgw_backend/src/broadcast_messages.rs +++ b/matrixgw_backend/src/broadcast_messages.rs @@ -5,6 +5,7 @@ use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent; use matrix_sdk::ruma::events::receipt::SyncReceiptEvent; use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent; +use matrix_sdk::ruma::events::typing::SyncTypingEvent; use matrix_sdk::sync::SyncResponse; pub type BroadcastSender = tokio::sync::broadcast::Sender; @@ -35,6 +36,8 @@ pub enum BroadcastMessage { RoomRedactionEvent(BxRoomEvent), /// Message fully read event ReceiptEvent(BxRoomEvent), + /// User is typing message event + TypingEvent(BxRoomEvent), /// Raw Matrix sync response MatrixSyncResponse { user: UserEmail, sync: SyncResponse }, } diff --git a/matrixgw_backend/src/controllers/ws_controller.rs b/matrixgw_backend/src/controllers/ws_controller.rs index 62553e7..57a80de 100644 --- a/matrixgw_backend/src/controllers/ws_controller.rs +++ b/matrixgw_backend/src/controllers/ws_controller.rs @@ -42,6 +42,12 @@ pub struct WsReceiptEvent { pub receipts: Vec, } +#[derive(Debug, serde::Serialize)] +pub struct WsTypingEvent { + pub room_id: OwnedRoomId, + pub user_ids: Vec, +} + /// Messages sent to the client #[derive(Debug, serde::Serialize)] #[serde(tag = "type")] @@ -57,6 +63,9 @@ pub enum WsMessage { /// Fully read message event ReceiptEvent(WsReceiptEvent), + + /// User is typing event + TypingEvent(WsTypingEvent), } impl WsMessage { @@ -71,6 +80,7 @@ impl WsMessage { data: Box::new(evt.data.content.clone()), })) } + BroadcastMessage::ReactionEvent(evt) if &evt.user == user => { Some(Self::RoomReactionEvent(WsRoomEvent { room_id: evt.room.room_id().to_owned(), @@ -80,6 +90,7 @@ impl WsMessage { data: Box::new(evt.data.content.clone()), })) } + BroadcastMessage::RoomRedactionEvent(evt) if &evt.user == user => { Some(Self::RoomRedactionEvent(WsRoomEvent { room_id: evt.room.room_id().to_owned(), @@ -89,6 +100,7 @@ impl WsMessage { data: Box::new(evt.data.content.clone()), })) } + BroadcastMessage::ReceiptEvent(evt) if &evt.user == user => { let mut receipts = vec![]; for (event_id, r) in &evt.data.content.0 { @@ -108,6 +120,14 @@ impl WsMessage { receipts, })) } + + BroadcastMessage::TypingEvent(evt) if &evt.user == user => { + Some(Self::TypingEvent(WsTypingEvent { + room_id: evt.room.room_id().to_owned(), + user_ids: evt.data.content.user_ids.clone(), + })) + } + _ => None, } } diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs index 439c753..73df0c4 100644 --- a/matrixgw_backend/src/matrix_connection/sync_thread.rs +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -11,6 +11,7 @@ use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent; use matrix_sdk::ruma::events::receipt::SyncReceiptEvent; use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent; use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent; +use matrix_sdk::ruma::events::typing::SyncTypingEvent; use ractor::ActorRef; #[derive(Clone, Debug, Eq, PartialEq)] @@ -54,11 +55,11 @@ async fn sync_thread_task( let mut handlers = vec![]; let tx_msg_handle = tx.clone(); - let user_msg_handle = client.email.clone(); + let user_msg_mail = client.email.clone(); handlers.push(client.add_event_handler( async move |event: OriginalSyncRoomMessageEvent, room: Room| { if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent(BxRoomEvent { - user: user_msg_handle.clone(), + user: user_msg_mail.clone(), data: Box::new(event), room, })) { @@ -68,11 +69,11 @@ async fn sync_thread_task( )); let tx_reac_handle = tx.clone(); - let user_reac_handle = client.email.clone(); + let user_reac_mail = client.email.clone(); handlers.push(client.add_event_handler( async move |event: OriginalSyncReactionEvent, room: Room| { if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent(BxRoomEvent { - user: user_reac_handle.clone(), + user: user_reac_mail.clone(), data: Box::new(event), room, })) { @@ -82,12 +83,12 @@ async fn sync_thread_task( )); let tx_redac_handle = tx.clone(); - let user_redac_handle = client.email.clone(); + let user_redac_mail = client.email.clone(); handlers.push(client.add_event_handler( async move |event: OriginalSyncRoomRedactionEvent, room: Room| { if let Err(e) = tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent(BxRoomEvent { - user: user_redac_handle.clone(), + user: user_redac_mail.clone(), data: Box::new(event), room, })) @@ -98,11 +99,11 @@ async fn sync_thread_task( )); let tx_receipt_handle = tx.clone(); - let user_receipt_handle = client.email.clone(); + let user_receipt_mail = client.email.clone(); handlers.push( client.add_event_handler(async move |event: SyncReceiptEvent, room: Room| { if let Err(e) = tx_receipt_handle.send(BroadcastMessage::ReceiptEvent(BxRoomEvent { - user: user_receipt_handle.clone(), + user: user_receipt_mail.clone(), data: Box::new(event), room, })) { @@ -111,6 +112,20 @@ async fn sync_thread_task( }), ); + let tx_typing_handle = tx.clone(); + let user_typing_mail = client.email.clone(); + handlers.push( + client.add_event_handler(async move |event: SyncTypingEvent, room: Room| { + if let Err(e) = tx_typing_handle.send(BroadcastMessage::TypingEvent(BxRoomEvent { + user: user_typing_mail.clone(), + data: Box::new(event), + room, + })) { + log::warn!("Failed to forward typing event! {e}"); + } + }), + ); + loop { tokio::select! { // Message from tokio broadcast