use crate::broadcast_messages::BroadcastMessage; use crate::constants; use crate::controllers::HttpResult; use crate::extractors::auth_extractor::{AuthExtractor, AuthenticatedMethod}; use crate::extractors::matrix_client_extractor::MatrixClientExtractor; use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use crate::users::UserEmail; use actix_web::dev::Payload; use actix_web::{FromRequest, HttpRequest, HttpResponse, web}; use actix_ws::Message; use futures_util::StreamExt; use matrix_sdk::ruma::events::reaction::ReactionEventContent; use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use matrix_sdk::ruma::events::room::redaction::RoomRedactionEventContent; use matrix_sdk::ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId}; use ractor::ActorRef; use std::time::Instant; use tokio::sync::broadcast; use tokio::sync::broadcast::Receiver; use tokio::time::interval; #[derive(Debug, serde::Serialize)] pub struct WsRoomEvent { pub room_id: OwnedRoomId, pub event_id: OwnedEventId, pub sender: OwnedUserId, pub origin_server_ts: MilliSecondsSinceUnixEpoch, pub data: Box, } /// Messages sent to the client #[derive(Debug, serde::Serialize)] #[serde(tag = "type")] pub enum WsMessage { /// Room message event RoomMessageEvent(WsRoomEvent), /// Room reaction event RoomReactionEvent(WsRoomEvent), /// Room reaction event RoomRedactionEvent(WsRoomEvent), } impl WsMessage { pub fn from_bx_message(msg: &BroadcastMessage, user: &UserEmail) -> Option { match msg { BroadcastMessage::RoomMessageEvent(evt) if &evt.user == user => { Some(Self::RoomMessageEvent(WsRoomEvent { room_id: evt.room.room_id().to_owned(), event_id: evt.data.event_id.clone(), sender: evt.data.sender.clone(), origin_server_ts: evt.data.origin_server_ts, data: Box::new(evt.data.content.clone()), })) } BroadcastMessage::ReactionEvent(evt) if &evt.user == user => { Some(Self::RoomReactionEvent(WsRoomEvent { room_id: evt.room.room_id().to_owned(), event_id: evt.data.event_id.clone(), sender: evt.data.sender.clone(), origin_server_ts: evt.data.origin_server_ts, data: Box::new(evt.data.content.clone()), })) } BroadcastMessage::RoomRedactionEvent(evt) if &evt.user == user => { Some(Self::RoomRedactionEvent(WsRoomEvent { room_id: evt.room.room_id().to_owned(), event_id: evt.data.event_id.clone(), sender: evt.data.sender.clone(), origin_server_ts: evt.data.origin_server_ts, data: Box::new(evt.data.content.clone()), })) } _ => None, } } } /// Main WS route pub async fn ws( req: HttpRequest, stream: web::Payload, tx: web::Data>, manager: web::Data>, ) -> HttpResult { // Forcefully ignore request payload by manually extracting authentication information let client = MatrixClientExtractor::from_request(&req, &mut Payload::None).await?; // Check if Matrix link has been established first if !client.client.is_client_connected() { return Ok(HttpResponse::ExpectationFailed().json("Matrix link not established yet!")); } // Ensure sync thread is started ractor::cast!( manager, MatrixManagerMsg::StartSyncThread(client.auth.user.email.clone()) ) .expect("Failed to start sync thread prior to running WebSocket!"); let rx = tx.subscribe(); let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; // spawn websocket handler (and don't await it) so that the response is returned immediately actix_web::rt::spawn(ws_handler( session, msg_stream, client.auth, client.client, rx, )); Ok(res) } pub async fn ws_handler( mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream, auth: AuthExtractor, client: MatrixClient, mut rx: Receiver, ) { log::info!( "WS connected for user {:?} / auth method={}", client.email, auth.method.light_str() ); let mut last_heartbeat = Instant::now(); let mut interval = interval(constants::WS_HEARTBEAT_INTERVAL); let reason = loop { // waits for either `msg_stream` to receive a message from the client, the broadcast channel // to send a message, or the heartbeat interval timer to tick, yielding the value of // whichever one is ready first tokio::select! { ws_msg = rx.recv() => { let msg = match ws_msg { Ok(msg) => msg, Err(broadcast::error::RecvError::Closed) => break None, Err(broadcast::error::RecvError::Lagged(_)) => continue, }; match (&msg, WsMessage::from_bx_message(&msg, &auth.user.email)) { (BroadcastMessage::APITokenDeleted(t), _) => { match &auth.method{ AuthenticatedMethod::Token(tok) if tok.id == t.id => { log::info!( "closing WS session of user {:?} as associated token was deleted {:?}", client.email, t.base.name ); break None; } _=>{} } }, (BroadcastMessage::UserDisconnectedFromMatrix(mail), _) if mail == &auth.user.email => { log::info!( "closing WS session of user {mail:?} as user was disconnected from Matrix" ); break None; } (_, Some(message)) => { // Send the message to the websocket if let Ok(msg) = serde_json::to_string(&message) && let Err(e) = session.text(msg).await { log::error!("Failed to send SyncEvent: {e}"); } } _ => {} }; } // heartbeat interval ticked _tick = interval.tick() => { // if no heartbeat ping/pong received recently, close the connection if Instant::now().duration_since(last_heartbeat) > constants::WS_CLIENT_TIMEOUT { log::info!( "client has not sent heartbeat in over {:?}; disconnecting",constants::WS_CLIENT_TIMEOUT ); break None; } // send heartbeat ping let _ = session.ping(b"").await; }, // Websocket messages msg = msg_stream.next() => { let msg = match msg { // received message from WebSocket client Some(Ok(msg)) => msg, // client WebSocket stream error Some(Err(err)) => { log::error!("{err}"); break None; } // client WebSocket stream ended None => break None }; log::debug!("msg: {msg:?}"); match msg { Message::Text(s) => { log::info!("Text message from WS: {s}"); } Message::Binary(_) => { // drop client's binary messages } Message::Close(reason) => { break reason; } Message::Ping(bytes) => { last_heartbeat = Instant::now(); let _ = session.pong(&bytes).await; } Message::Pong(_) => { last_heartbeat = Instant::now(); } Message::Continuation(_) => { log::warn!("no support for continuation frames"); } // no-op; ignore Message::Nop => {} }; } } }; // attempt to close connection gracefully let _ = session.close(reason).await; log::info!("WS disconnected for user {:?}", client.email); }