diff --git a/src/broadcast_messages.rs b/src/broadcast_messages.rs new file mode 100644 index 0000000..5dc6766 --- /dev/null +++ b/src/broadcast_messages.rs @@ -0,0 +1,17 @@ +use crate::sync_client::SyncClientID; +use crate::user::{APIClientID, UserID}; + +/// Broadcast messages +#[derive(Debug, Clone)] +pub enum BroadcastMessage { + /// Request to close the session of a specific client + CloseClientSession(APIClientID), + /// Close all the sessions of a given user + CloseAllUserSessions(UserID), + /// Stop sync client for a given user + StopSyncClientForUser(UserID), + /// Start sync client for a given user (if not already running) + StartSyncClientForUser(UserID), + /// Stop a client with a given client ID + StopSyncClient(SyncClientID), +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 54143a9..f9b42fd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,3 +4,5 @@ pub mod extractors; pub mod server; pub mod user; pub mod utils; +pub mod sync_client; +pub mod broadcast_messages; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 668e220..27a8ffa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use actix_session::{storage::RedisSessionStore, SessionMiddleware}; use actix_web::cookie::Key; use actix_web::{web, App, HttpServer}; use matrix_gateway::app_config::AppConfig; -use matrix_gateway::server::api::ws::WsMessage; +use matrix_gateway::broadcast_messages::BroadcastMessage; use matrix_gateway::server::{api, web_ui}; use matrix_gateway::user::UserConfig; @@ -22,7 +22,9 @@ async fn main() -> std::io::Result<()> { .await .expect("Failed to connect to Redis!"); - let (ws_tx, _) = tokio::sync::broadcast::channel::(16); + let (ws_tx, _) = tokio::sync::broadcast::channel::(16); + + // TODO : spawn a tokio task to launch sync client log::info!( "Starting to listen on {} for {}", diff --git a/src/server/api/ws.rs b/src/server/api/ws.rs index b8fde4d..474a90f 100644 --- a/src/server/api/ws.rs +++ b/src/server/api/ws.rs @@ -1,7 +1,6 @@ use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL}; use crate::extractors::client_auth::APIClientAuth; use crate::server::HttpResult; -use crate::user::{APIClientID, UserID}; use actix_web::dev::Payload; use actix_web::{web, FromRequest, HttpRequest}; use actix_ws::Message; @@ -11,21 +10,15 @@ use tokio::select; use tokio::sync::broadcast; use tokio::sync::broadcast::Receiver; use tokio::time::interval; +use crate::broadcast_messages::BroadcastMessage; + -/// WebSocket message -#[derive(Debug, Clone)] -pub enum WsMessage { - /// Request to close the session of a specific client - CloseClientSession(APIClientID), - /// Close all the sessions of a given user - CloseAllUserSessions(UserID), -} /// Main WS route pub async fn ws( req: HttpRequest, stream: web::Payload, - tx: web::Data>, + tx: web::Data>, ) -> HttpResult { // Forcefully ignore request payload by manually extracting authentication information let auth = APIClientAuth::from_request(&req, &mut Payload::None).await?; @@ -44,7 +37,7 @@ pub async fn ws_handler( mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream, auth: APIClientAuth, - mut rx: Receiver, + mut rx: Receiver, ) { log::info!("WS connected"); @@ -64,7 +57,7 @@ pub async fn ws_handler( }; match msg { - WsMessage::CloseClientSession(id) => { + BroadcastMessage::CloseClientSession(id) => { if let Some(client) = &auth.client { if client.id == id { log::info!( @@ -74,7 +67,7 @@ pub async fn ws_handler( } } }, - WsMessage::CloseAllUserSessions(userid) => { + BroadcastMessage::CloseAllUserSessions(userid) => { if userid == auth.user.user_id { log::info!( "closing WS session of user {userid:?} as requested" @@ -82,7 +75,7 @@ pub async fn ws_handler( break None; } } - }; + _ => {}}; } diff --git a/src/server/web_ui.rs b/src/server/web_ui.rs index 83288d8..4c88172 100644 --- a/src/server/web_ui.rs +++ b/src/server/web_ui.rs @@ -1,6 +1,5 @@ use crate::app_config::AppConfig; use crate::constants::{STATE_KEY, USER_SESSION_KEY}; -use crate::server::api::ws::WsMessage; use crate::server::{HttpFailure, HttpResult}; use crate::user::{APIClient, APIClientID, User, UserConfig, UserID}; use crate::utils; @@ -11,6 +10,7 @@ use ipnet::IpNet; use light_openid::primitives::OpenIDConfig; use std::str::FromStr; use tokio::sync::broadcast; +use crate::broadcast_messages::BroadcastMessage; /// Static assets #[derive(rust_embed::Embed)] @@ -65,7 +65,7 @@ pub struct FormRequest { pub async fn home( session: Session, form_req: Option>, - tx: web::Data>, + tx: web::Data>, ) -> HttpResult { // Get user information, requesting authentication if information is missing let Some(user): Option = session.get(USER_SESSION_KEY)? else { @@ -103,8 +103,10 @@ pub async fn home( config.save().await?; success_message = Some("Matrix token was successfully updated!".to_string()); + // TODO : stop user sync thread + // Invalidate all Ws connections - if let Err(e) = tx.send(WsMessage::CloseAllUserSessions(user.id.clone())) { + if let Err(e) = tx.send(BroadcastMessage::CloseAllUserSessions(user.id.clone())) { log::error!("Failed to send CloseAllUserSessions: {}", e); } } @@ -139,7 +141,7 @@ pub async fn home( config.save().await?; success_message = Some("The client was successfully deleted!".to_string()); - if let Err(e) = tx.send(WsMessage::CloseClientSession(delete_client_id)) { + if let Err(e) = tx.send(BroadcastMessage::CloseClientSession(delete_client_id)) { log::error!("Failed to send CloseClientSession: {}", e); } } diff --git a/src/sync_client.rs b/src/sync_client.rs new file mode 100644 index 0000000..a467e94 --- /dev/null +++ b/src/sync_client.rs @@ -0,0 +1,3 @@ +/// ID of sync client +#[derive(Debug, Clone)] +pub struct SyncClientID(uuid::Uuid); \ No newline at end of file