diff --git a/src/broadcast_messages.rs b/src/broadcast_messages.rs index c9ae251..7ee8e63 100644 --- a/src/broadcast_messages.rs +++ b/src/broadcast_messages.rs @@ -1,5 +1,32 @@ use crate::sync_client::SyncClientID; use crate::user::{APIClientID, UserID}; +use ruma::api::client::sync::sync_events::v3::{GlobalAccountData, Presence, Rooms, ToDevice}; +use ruma::api::client::sync::sync_events::DeviceLists; + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct SyncEvent { + /// Updates to rooms. + #[serde(default, skip_serializing_if = "Rooms::is_empty")] + pub rooms: Rooms, + + /// Updates to the presence status of other users. + #[serde(default, skip_serializing_if = "Presence::is_empty")] + pub presence: Presence, + + /// The global private data created by this user. + #[serde(default, skip_serializing_if = "GlobalAccountData::is_empty")] + pub account_data: GlobalAccountData, + + /// Messages sent directly between devices. + #[serde(default, skip_serializing_if = "ToDevice::is_empty")] + pub to_device: ToDevice, + + /// Information on E2E device updates. + /// + /// Only present on an incremental sync. + #[serde(default, skip_serializing_if = "DeviceLists::is_empty")] + pub device_lists: DeviceLists, +} /// Broadcast messages #[derive(Debug, Clone)] @@ -14,4 +41,6 @@ pub enum BroadcastMessage { StartSyncTaskForUser(UserID), /// Stop a client with a given client ID StopSyncClient(SyncClientID), + /// Propagate a new sync event + SyncEvent(UserID, SyncEvent), } diff --git a/src/server/api/ws.rs b/src/server/api/ws.rs index a6ae13e..9975aa2 100644 --- a/src/server/api/ws.rs +++ b/src/server/api/ws.rs @@ -1,4 +1,4 @@ -use crate::broadcast_messages::BroadcastMessage; +use crate::broadcast_messages::{BroadcastMessage, SyncEvent}; use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL}; use crate::extractors::client_auth::APIClientAuth; use crate::server::HttpResult; @@ -12,6 +12,13 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::Receiver; use tokio::time::interval; +/// Messages send to the client +#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[serde(tag = "type")] +pub enum WsMessage { + Sync(SyncEvent), +} + /// Main WS route pub async fn ws( req: HttpRequest, @@ -80,6 +87,19 @@ pub async fn ws_handler( break None; } } + + BroadcastMessage::SyncEvent(userid, event) => { + if userid != auth.user.user_id { + continue; + } + + // Send the message to the websocket + if let Ok(msg) = serde_json::to_string(&WsMessage::Sync(event)) { + if let Err(e) = session.text(msg).await { + log::error!("Failed to send SyncEvent: {}", e); + } + } + } _ => {}}; } diff --git a/src/sync_client.rs b/src/sync_client.rs index 85c8cbe..af40884 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -1,4 +1,4 @@ -use crate::broadcast_messages::BroadcastMessage; +use crate::broadcast_messages::{BroadcastMessage, SyncEvent}; use crate::user::{UserConfig, UserID}; use futures_util::TryStreamExt; use ruma::api::client::sync::sync_events; @@ -121,7 +121,15 @@ async fn sync_task( msg_stream = sync_stream.try_next() => { match msg_stream { Ok(Some(msg)) => { - log::debug!("Received new message from Matrix: {msg:?}"); + log::debug!("Received new message from Matrix: {msg:#?}"); + if let Err(e) = tx.send(BroadcastMessage::SyncEvent(user_id.clone(), SyncEvent { + rooms: msg.rooms,presence: msg.presence, + account_data: msg.account_data, + to_device: msg.to_device, + device_lists: msg.device_lists, + })) { + log::error!("Failed to propagate event! {e}"); + } } Ok(None) => { log::debug!("Received no message from Matrix");