Forward typing event in WebSocket

This commit is contained in:
2025-12-01 11:17:02 +01:00
parent dac20f60e0
commit 077c64be28
3 changed files with 46 additions and 8 deletions

View File

@@ -5,6 +5,7 @@ use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent;
use matrix_sdk::ruma::events::receipt::SyncReceiptEvent;
use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent;
use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent;
use matrix_sdk::ruma::events::typing::SyncTypingEvent;
use matrix_sdk::sync::SyncResponse;
pub type BroadcastSender = tokio::sync::broadcast::Sender<BroadcastMessage>;
@@ -35,6 +36,8 @@ pub enum BroadcastMessage {
RoomRedactionEvent(BxRoomEvent<OriginalSyncRoomRedactionEvent>),
/// Message fully read event
ReceiptEvent(BxRoomEvent<SyncReceiptEvent>),
/// User is typing message event
TypingEvent(BxRoomEvent<SyncTypingEvent>),
/// Raw Matrix sync response
MatrixSyncResponse { user: UserEmail, sync: SyncResponse },
}

View File

@@ -42,6 +42,12 @@ pub struct WsReceiptEvent {
pub receipts: Vec<WsReceiptEntry>,
}
#[derive(Debug, serde::Serialize)]
pub struct WsTypingEvent {
pub room_id: OwnedRoomId,
pub user_ids: Vec<OwnedUserId>,
}
/// Messages sent to the client
#[derive(Debug, serde::Serialize)]
#[serde(tag = "type")]
@@ -57,6 +63,9 @@ pub enum WsMessage {
/// Fully read message event
ReceiptEvent(WsReceiptEvent),
/// User is typing event
TypingEvent(WsTypingEvent),
}
impl WsMessage {
@@ -71,6 +80,7 @@ impl WsMessage {
data: Box::new(evt.data.content.clone()),
}))
}
BroadcastMessage::ReactionEvent(evt) if &evt.user == user => {
Some(Self::RoomReactionEvent(WsRoomEvent {
room_id: evt.room.room_id().to_owned(),
@@ -80,6 +90,7 @@ impl WsMessage {
data: Box::new(evt.data.content.clone()),
}))
}
BroadcastMessage::RoomRedactionEvent(evt) if &evt.user == user => {
Some(Self::RoomRedactionEvent(WsRoomEvent {
room_id: evt.room.room_id().to_owned(),
@@ -89,6 +100,7 @@ impl WsMessage {
data: Box::new(evt.data.content.clone()),
}))
}
BroadcastMessage::ReceiptEvent(evt) if &evt.user == user => {
let mut receipts = vec![];
for (event_id, r) in &evt.data.content.0 {
@@ -108,6 +120,14 @@ impl WsMessage {
receipts,
}))
}
BroadcastMessage::TypingEvent(evt) if &evt.user == user => {
Some(Self::TypingEvent(WsTypingEvent {
room_id: evt.room.room_id().to_owned(),
user_ids: evt.data.content.user_ids.clone(),
}))
}
_ => None,
}
}

View File

@@ -11,6 +11,7 @@ use matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent;
use matrix_sdk::ruma::events::receipt::SyncReceiptEvent;
use matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent;
use matrix_sdk::ruma::events::room::redaction::OriginalSyncRoomRedactionEvent;
use matrix_sdk::ruma::events::typing::SyncTypingEvent;
use ractor::ActorRef;
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -54,11 +55,11 @@ async fn sync_thread_task(
let mut handlers = vec![];
let tx_msg_handle = tx.clone();
let user_msg_handle = client.email.clone();
let user_msg_mail = client.email.clone();
handlers.push(client.add_event_handler(
async move |event: OriginalSyncRoomMessageEvent, room: Room| {
if let Err(e) = tx_msg_handle.send(BroadcastMessage::RoomMessageEvent(BxRoomEvent {
user: user_msg_handle.clone(),
user: user_msg_mail.clone(),
data: Box::new(event),
room,
})) {
@@ -68,11 +69,11 @@ async fn sync_thread_task(
));
let tx_reac_handle = tx.clone();
let user_reac_handle = client.email.clone();
let user_reac_mail = client.email.clone();
handlers.push(client.add_event_handler(
async move |event: OriginalSyncReactionEvent, room: Room| {
if let Err(e) = tx_reac_handle.send(BroadcastMessage::ReactionEvent(BxRoomEvent {
user: user_reac_handle.clone(),
user: user_reac_mail.clone(),
data: Box::new(event),
room,
})) {
@@ -82,12 +83,12 @@ async fn sync_thread_task(
));
let tx_redac_handle = tx.clone();
let user_redac_handle = client.email.clone();
let user_redac_mail = client.email.clone();
handlers.push(client.add_event_handler(
async move |event: OriginalSyncRoomRedactionEvent, room: Room| {
if let Err(e) =
tx_redac_handle.send(BroadcastMessage::RoomRedactionEvent(BxRoomEvent {
user: user_redac_handle.clone(),
user: user_redac_mail.clone(),
data: Box::new(event),
room,
}))
@@ -98,11 +99,11 @@ async fn sync_thread_task(
));
let tx_receipt_handle = tx.clone();
let user_receipt_handle = client.email.clone();
let user_receipt_mail = client.email.clone();
handlers.push(
client.add_event_handler(async move |event: SyncReceiptEvent, room: Room| {
if let Err(e) = tx_receipt_handle.send(BroadcastMessage::ReceiptEvent(BxRoomEvent {
user: user_receipt_handle.clone(),
user: user_receipt_mail.clone(),
data: Box::new(event),
room,
})) {
@@ -111,6 +112,20 @@ async fn sync_thread_task(
}),
);
let tx_typing_handle = tx.clone();
let user_typing_mail = client.email.clone();
handlers.push(
client.add_event_handler(async move |event: SyncTypingEvent, room: Room| {
if let Err(e) = tx_typing_handle.send(BroadcastMessage::TypingEvent(BxRoomEvent {
user: user_typing_mail.clone(),
data: Box::new(event),
room,
})) {
log::warn!("Failed to forward typing event! {e}");
}
}),
);
loop {
tokio::select! {
// Message from tokio broadcast