Ready to implement sync client manager

This commit is contained in:
Pierre HUBERT 2025-02-11 22:20:23 +01:00
parent bb1e412d36
commit 17e086b43c
6 changed files with 39 additions and 20 deletions

17
src/broadcast_messages.rs Normal file
View File

@ -0,0 +1,17 @@
use crate::sync_client::SyncClientID;
use crate::user::{APIClientID, UserID};
/// Broadcast messages
#[derive(Debug, Clone)]
pub enum BroadcastMessage {
/// Request to close the session of a specific client
CloseClientSession(APIClientID),
/// Close all the sessions of a given user
CloseAllUserSessions(UserID),
/// Stop sync client for a given user
StopSyncClientForUser(UserID),
/// Start sync client for a given user (if not already running)
StartSyncClientForUser(UserID),
/// Stop a client with a given client ID
StopSyncClient(SyncClientID),
}

View File

@ -4,3 +4,5 @@ pub mod extractors;
pub mod server; pub mod server;
pub mod user; pub mod user;
pub mod utils; pub mod utils;
pub mod sync_client;
pub mod broadcast_messages;

View File

@ -4,7 +4,7 @@ use actix_session::{storage::RedisSessionStore, SessionMiddleware};
use actix_web::cookie::Key; use actix_web::cookie::Key;
use actix_web::{web, App, HttpServer}; use actix_web::{web, App, HttpServer};
use matrix_gateway::app_config::AppConfig; use matrix_gateway::app_config::AppConfig;
use matrix_gateway::server::api::ws::WsMessage; use matrix_gateway::broadcast_messages::BroadcastMessage;
use matrix_gateway::server::{api, web_ui}; use matrix_gateway::server::{api, web_ui};
use matrix_gateway::user::UserConfig; use matrix_gateway::user::UserConfig;
@ -22,7 +22,9 @@ async fn main() -> std::io::Result<()> {
.await .await
.expect("Failed to connect to Redis!"); .expect("Failed to connect to Redis!");
let (ws_tx, _) = tokio::sync::broadcast::channel::<WsMessage>(16); let (ws_tx, _) = tokio::sync::broadcast::channel::<BroadcastMessage>(16);
// TODO : spawn a tokio task to launch sync client
log::info!( log::info!(
"Starting to listen on {} for {}", "Starting to listen on {} for {}",

View File

@ -1,7 +1,6 @@
use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL}; use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL};
use crate::extractors::client_auth::APIClientAuth; use crate::extractors::client_auth::APIClientAuth;
use crate::server::HttpResult; use crate::server::HttpResult;
use crate::user::{APIClientID, UserID};
use actix_web::dev::Payload; use actix_web::dev::Payload;
use actix_web::{web, FromRequest, HttpRequest}; use actix_web::{web, FromRequest, HttpRequest};
use actix_ws::Message; use actix_ws::Message;
@ -11,21 +10,15 @@ use tokio::select;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use tokio::time::interval; use tokio::time::interval;
use crate::broadcast_messages::BroadcastMessage;
/// WebSocket message
#[derive(Debug, Clone)]
pub enum WsMessage {
/// Request to close the session of a specific client
CloseClientSession(APIClientID),
/// Close all the sessions of a given user
CloseAllUserSessions(UserID),
}
/// Main WS route /// Main WS route
pub async fn ws( pub async fn ws(
req: HttpRequest, req: HttpRequest,
stream: web::Payload, stream: web::Payload,
tx: web::Data<broadcast::Sender<WsMessage>>, tx: web::Data<broadcast::Sender<BroadcastMessage>>,
) -> HttpResult { ) -> HttpResult {
// Forcefully ignore request payload by manually extracting authentication information // Forcefully ignore request payload by manually extracting authentication information
let auth = APIClientAuth::from_request(&req, &mut Payload::None).await?; let auth = APIClientAuth::from_request(&req, &mut Payload::None).await?;
@ -44,7 +37,7 @@ pub async fn ws_handler(
mut session: actix_ws::Session, mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream, mut msg_stream: actix_ws::MessageStream,
auth: APIClientAuth, auth: APIClientAuth,
mut rx: Receiver<WsMessage>, mut rx: Receiver<BroadcastMessage>,
) { ) {
log::info!("WS connected"); log::info!("WS connected");
@ -64,7 +57,7 @@ pub async fn ws_handler(
}; };
match msg { match msg {
WsMessage::CloseClientSession(id) => { BroadcastMessage::CloseClientSession(id) => {
if let Some(client) = &auth.client { if let Some(client) = &auth.client {
if client.id == id { if client.id == id {
log::info!( log::info!(
@ -74,7 +67,7 @@ pub async fn ws_handler(
} }
} }
}, },
WsMessage::CloseAllUserSessions(userid) => { BroadcastMessage::CloseAllUserSessions(userid) => {
if userid == auth.user.user_id { if userid == auth.user.user_id {
log::info!( log::info!(
"closing WS session of user {userid:?} as requested" "closing WS session of user {userid:?} as requested"
@ -82,7 +75,7 @@ pub async fn ws_handler(
break None; break None;
} }
} }
}; _ => {}};
} }

View File

@ -1,6 +1,5 @@
use crate::app_config::AppConfig; use crate::app_config::AppConfig;
use crate::constants::{STATE_KEY, USER_SESSION_KEY}; use crate::constants::{STATE_KEY, USER_SESSION_KEY};
use crate::server::api::ws::WsMessage;
use crate::server::{HttpFailure, HttpResult}; use crate::server::{HttpFailure, HttpResult};
use crate::user::{APIClient, APIClientID, User, UserConfig, UserID}; use crate::user::{APIClient, APIClientID, User, UserConfig, UserID};
use crate::utils; use crate::utils;
@ -11,6 +10,7 @@ use ipnet::IpNet;
use light_openid::primitives::OpenIDConfig; use light_openid::primitives::OpenIDConfig;
use std::str::FromStr; use std::str::FromStr;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use crate::broadcast_messages::BroadcastMessage;
/// Static assets /// Static assets
#[derive(rust_embed::Embed)] #[derive(rust_embed::Embed)]
@ -65,7 +65,7 @@ pub struct FormRequest {
pub async fn home( pub async fn home(
session: Session, session: Session,
form_req: Option<web::Form<FormRequest>>, form_req: Option<web::Form<FormRequest>>,
tx: web::Data<broadcast::Sender<WsMessage>>, tx: web::Data<broadcast::Sender<BroadcastMessage>>,
) -> HttpResult { ) -> HttpResult {
// Get user information, requesting authentication if information is missing // Get user information, requesting authentication if information is missing
let Some(user): Option<User> = session.get(USER_SESSION_KEY)? else { let Some(user): Option<User> = session.get(USER_SESSION_KEY)? else {
@ -103,8 +103,10 @@ pub async fn home(
config.save().await?; config.save().await?;
success_message = Some("Matrix token was successfully updated!".to_string()); success_message = Some("Matrix token was successfully updated!".to_string());
// TODO : stop user sync thread
// Invalidate all Ws connections // Invalidate all Ws connections
if let Err(e) = tx.send(WsMessage::CloseAllUserSessions(user.id.clone())) { if let Err(e) = tx.send(BroadcastMessage::CloseAllUserSessions(user.id.clone())) {
log::error!("Failed to send CloseAllUserSessions: {}", e); log::error!("Failed to send CloseAllUserSessions: {}", e);
} }
} }
@ -139,7 +141,7 @@ pub async fn home(
config.save().await?; config.save().await?;
success_message = Some("The client was successfully deleted!".to_string()); success_message = Some("The client was successfully deleted!".to_string());
if let Err(e) = tx.send(WsMessage::CloseClientSession(delete_client_id)) { if let Err(e) = tx.send(BroadcastMessage::CloseClientSession(delete_client_id)) {
log::error!("Failed to send CloseClientSession: {}", e); log::error!("Failed to send CloseClientSession: {}", e);
} }
} }

3
src/sync_client.rs Normal file
View File

@ -0,0 +1,3 @@
/// ID of sync client
#[derive(Debug, Clone)]
pub struct SyncClientID(uuid::Uuid);