Basic implementation of websocket
This commit is contained in:
198
matrixgw_backend/src/controllers/ws_controller.rs
Normal file
198
matrixgw_backend/src/controllers/ws_controller.rs
Normal file
@@ -0,0 +1,198 @@
|
||||
use crate::broadcast_messages::BroadcastMessage;
|
||||
use crate::constants;
|
||||
use crate::controllers::HttpResult;
|
||||
use crate::extractors::auth_extractor::{AuthExtractor, AuthenticatedMethod};
|
||||
use crate::extractors::matrix_client_extractor::MatrixClientExtractor;
|
||||
use crate::matrix_connection::matrix_client::MatrixClient;
|
||||
use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
|
||||
use actix_web::dev::Payload;
|
||||
use actix_web::{FromRequest, HttpRequest, web};
|
||||
use actix_ws::Message;
|
||||
use futures_util::StreamExt;
|
||||
use matrix_sdk::ruma::OwnedRoomId;
|
||||
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
|
||||
use ractor::ActorRef;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::time::interval;
|
||||
|
||||
/// Messages sent to the client
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum WsMessage {
|
||||
/// Room message event
|
||||
RoomMessageEvent {
|
||||
event: RoomMessageEventContent,
|
||||
room_id: OwnedRoomId,
|
||||
},
|
||||
}
|
||||
|
||||
/// Main WS route
|
||||
pub async fn ws(
|
||||
req: HttpRequest,
|
||||
stream: web::Payload,
|
||||
tx: web::Data<broadcast::Sender<BroadcastMessage>>,
|
||||
manager: web::Data<ActorRef<MatrixManagerMsg>>,
|
||||
) -> HttpResult {
|
||||
// Forcefully ignore request payload by manually extracting authentication information
|
||||
let client = MatrixClientExtractor::from_request(&req, &mut Payload::None).await?;
|
||||
|
||||
// Ensure sync thread is started
|
||||
ractor::cast!(
|
||||
manager,
|
||||
MatrixManagerMsg::StartSyncThread(client.auth.user.email.clone())
|
||||
)
|
||||
.expect("Failed to start sync thread prior to running WebSocket!");
|
||||
|
||||
let rx = tx.subscribe();
|
||||
|
||||
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
|
||||
|
||||
// spawn websocket handler (and don't await it) so that the response is returned immediately
|
||||
actix_web::rt::spawn(ws_handler(
|
||||
session,
|
||||
msg_stream,
|
||||
client.auth,
|
||||
client.client,
|
||||
rx,
|
||||
));
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn ws_handler(
|
||||
mut session: actix_ws::Session,
|
||||
mut msg_stream: actix_ws::MessageStream,
|
||||
auth: AuthExtractor,
|
||||
client: MatrixClient,
|
||||
mut rx: Receiver<BroadcastMessage>,
|
||||
) {
|
||||
log::info!(
|
||||
"WS connected for user {:?} / auth method={}",
|
||||
client.email,
|
||||
auth.method.light_str()
|
||||
);
|
||||
|
||||
let mut last_heartbeat = Instant::now();
|
||||
let mut interval = interval(constants::WS_HEARTBEAT_INTERVAL);
|
||||
|
||||
let reason = loop {
|
||||
// waits for either `msg_stream` to receive a message from the client, the broadcast channel
|
||||
// to send a message, or the heartbeat interval timer to tick, yielding the value of
|
||||
// whichever one is ready first
|
||||
tokio::select! {
|
||||
ws_msg = rx.recv() => {
|
||||
let msg = match ws_msg {
|
||||
Ok(msg) => msg,
|
||||
Err(broadcast::error::RecvError::Closed) => break None,
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
};
|
||||
|
||||
match msg {
|
||||
BroadcastMessage::APITokenDeleted(t) => {
|
||||
match &auth.method{
|
||||
AuthenticatedMethod::Token(tok) if tok.id == t.id => {
|
||||
log::info!(
|
||||
"closing WS session of user {:?} as associated token was deleted {:?}",
|
||||
client.email,
|
||||
t.base.name
|
||||
);
|
||||
break None;
|
||||
}
|
||||
_=>{}
|
||||
}
|
||||
|
||||
},
|
||||
BroadcastMessage::UserDisconnectedFromMatrix(mail) if mail == auth.user.email => {
|
||||
log::info!(
|
||||
"closing WS session of user {mail:?} as user was disconnected from Matrix"
|
||||
);
|
||||
break None;
|
||||
}
|
||||
|
||||
BroadcastMessage::RoomMessageEvent{user, event, room} if user == auth.user.email => {
|
||||
// Send the message to the websocket
|
||||
if let Ok(msg) = serde_json::to_string(&WsMessage::RoomMessageEvent {
|
||||
event:event.content,
|
||||
room_id: room.room_id().to_owned(),
|
||||
}) && let Err(e) = session.text(msg).await {
|
||||
log::error!("Failed to send SyncEvent: {e}");
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// heartbeat interval ticked
|
||||
_tick = interval.tick() => {
|
||||
// if no heartbeat ping/pong received recently, close the connection
|
||||
if Instant::now().duration_since(last_heartbeat) > constants::WS_CLIENT_TIMEOUT {
|
||||
log::info!(
|
||||
"client has not sent heartbeat in over {:?}; disconnecting",constants::WS_CLIENT_TIMEOUT
|
||||
);
|
||||
|
||||
break None;
|
||||
}
|
||||
|
||||
// send heartbeat ping
|
||||
let _ = session.ping(b"").await;
|
||||
},
|
||||
|
||||
// Websocket messages
|
||||
msg = msg_stream.next() => {
|
||||
let msg = match msg {
|
||||
// received message from WebSocket client
|
||||
Some(Ok(msg)) => msg,
|
||||
|
||||
// client WebSocket stream error
|
||||
Some(Err(err)) => {
|
||||
log::error!("{err}");
|
||||
break None;
|
||||
}
|
||||
|
||||
// client WebSocket stream ended
|
||||
None => break None
|
||||
};
|
||||
|
||||
log::debug!("msg: {msg:?}");
|
||||
|
||||
match msg {
|
||||
Message::Text(s) => {
|
||||
log::info!("Text message from WS: {s}");
|
||||
}
|
||||
|
||||
Message::Binary(_) => {
|
||||
// drop client's binary messages
|
||||
}
|
||||
|
||||
Message::Close(reason) => {
|
||||
break reason;
|
||||
}
|
||||
|
||||
Message::Ping(bytes) => {
|
||||
last_heartbeat = Instant::now();
|
||||
let _ = session.pong(&bytes).await;
|
||||
}
|
||||
|
||||
Message::Pong(_) => {
|
||||
last_heartbeat = Instant::now();
|
||||
}
|
||||
|
||||
Message::Continuation(_) => {
|
||||
log::warn!("no support for continuation frames");
|
||||
}
|
||||
|
||||
// no-op; ignore
|
||||
Message::Nop => {}
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// attempt to close connection gracefully
|
||||
let _ = session.close(reason).await;
|
||||
|
||||
log::info!("WS disconnected for user {:?}", client.email);
|
||||
}
|
||||
Reference in New Issue
Block a user