From 8bbbe7022fe4665d7f69fe1a4756d875527e7037 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Thu, 6 Nov 2025 21:18:27 +0100 Subject: [PATCH] Automatically disconnect user when token is invalid --- matrixgw_backend/src/broadcast_messages.rs | 10 +++ matrixgw_backend/src/lib.rs | 1 + matrixgw_backend/src/main.rs | 6 +- .../src/matrix_connection/matrix_client.rs | 70 ++++++++++++++++--- .../src/matrix_connection/matrix_manager.rs | 27 +++++-- 5 files changed, 99 insertions(+), 15 deletions(-) create mode 100644 matrixgw_backend/src/broadcast_messages.rs diff --git a/matrixgw_backend/src/broadcast_messages.rs b/matrixgw_backend/src/broadcast_messages.rs new file mode 100644 index 0000000..423114f --- /dev/null +++ b/matrixgw_backend/src/broadcast_messages.rs @@ -0,0 +1,10 @@ +use crate::users::UserEmail; + +pub type BroadcastSender = tokio::sync::broadcast::Sender; + +/// Broadcast messages +#[derive(Debug, Clone)] +pub enum BroadcastMessage { + /// User is or has been disconnected + UserDisconnected(UserEmail), +} diff --git a/matrixgw_backend/src/lib.rs b/matrixgw_backend/src/lib.rs index 61b9e52..3eb69b6 100644 --- a/matrixgw_backend/src/lib.rs +++ b/matrixgw_backend/src/lib.rs @@ -1,4 +1,5 @@ pub mod app_config; +pub mod broadcast_messages; pub mod constants; pub mod controllers; pub mod extractors; diff --git a/matrixgw_backend/src/main.rs b/matrixgw_backend/src/main.rs index 6c7e4fd..7614ee8 100644 --- a/matrixgw_backend/src/main.rs +++ b/matrixgw_backend/src/main.rs @@ -7,6 +7,7 @@ use actix_web::cookie::Key; use actix_web::middleware::Logger; use actix_web::{App, HttpServer, web}; use matrixgw_backend::app_config::AppConfig; +use matrixgw_backend::broadcast_messages::BroadcastMessage; use matrixgw_backend::constants; use matrixgw_backend::controllers::{auth_controller, matrix_link_controller, server_controller}; use matrixgw_backend::matrix_connection::matrix_manager::MatrixManagerActor; @@ -24,6 +25,8 @@ async fn main() -> std::io::Result<()> { .await .expect("Failed to connect to Redis!"); + let (ws_tx, _) = tokio::sync::broadcast::channel::(16); + // Auto create default account, if requested if let Some(mail) = &AppConfig::get().unsecure_auto_login_email() { User::create_or_update_user(mail, "Anonymous") @@ -35,7 +38,7 @@ async fn main() -> std::io::Result<()> { let (manager_actor, manager_actor_handle) = Actor::spawn( Some("matrix-clients-manager".to_string()), MatrixManagerActor, - (), + ws_tx.clone(), ) .await .expect("Failed to start Matrix manager actor!"); @@ -69,6 +72,7 @@ async fn main() -> std::io::Result<()> { .app_data(web::Data::new(RemoteIPConfig { proxy: AppConfig::get().proxy_ip.clone(), })) + .app_data(web::Data::new(ws_tx.clone())) // Server controller .route("/robots.txt", web::get().to(server_controller::robots_txt)) .route( diff --git a/matrixgw_backend/src/matrix_connection/matrix_client.rs b/matrixgw_backend/src/matrix_connection/matrix_client.rs index e682292..cfc23b1 100644 --- a/matrixgw_backend/src/matrix_connection/matrix_client.rs +++ b/matrixgw_backend/src/matrix_connection/matrix_client.rs @@ -1,13 +1,17 @@ use crate::app_config::AppConfig; +use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use crate::users::UserEmail; use crate::utils::rand_utils::rand_string; use anyhow::Context; -use matrix_sdk::authentication::oauth::error::OAuthDiscoveryError; +use matrix_sdk::authentication::oauth::error::{ + BasicErrorResponseType, OAuthDiscoveryError, RequestTokenError, +}; use matrix_sdk::authentication::oauth::{ ClientId, OAuthError, OAuthSession, UrlOrQuery, UserSession, }; use matrix_sdk::ruma::serde::Raw; -use matrix_sdk::{Client, ClientBuildError}; +use matrix_sdk::{Client, ClientBuildError, RefreshTokenError}; +use ractor::ActorRef; use serde::{Deserialize, Serialize}; use url::Url; @@ -48,6 +52,10 @@ enum MatrixClientError { DecodeStoredSession(serde_json::Error), #[error("Failed to restore stored session! {0}")] RestoreSession(matrix_sdk::Error), + #[error("Failed to disconnect user! {0}")] + DisconnectUser(anyhow::Error), + #[error("Failed to refresh access token! {0}")] + InitialRefreshToken(RefreshTokenError), #[error("Failed to parse auth redirect URL! {0}")] ParseAuthRedirectURL(url::ParseError), #[error("Failed to build auth request! {0}")] @@ -66,13 +74,17 @@ pub struct FinishMatrixAuth { #[derive(Clone)] pub struct MatrixClient { + manager: ActorRef, pub email: UserEmail, pub client: Client, } impl MatrixClient { /// Start to build Matrix client to initiate user authentication - pub async fn build_client(email: &UserEmail) -> anyhow::Result { + pub async fn build_client( + manager: ActorRef, + email: &UserEmail, + ) -> anyhow::Result { // Check if we are restoring a previous state let session_file_path = AppConfig::get().user_matrix_session_file_path(email); let is_restoring = session_file_path.is_file(); @@ -102,8 +114,14 @@ impl MatrixClient { .await .map_err(MatrixClientError::BuildMatrixClient)?; + let client = Self { + manager, + email: email.clone(), + client, + }; + // Check metadata - let oauth = client.oauth(); + let oauth = client.client.oauth(); let server_metadata = oauth .server_metadata() .await @@ -118,20 +136,33 @@ impl MatrixClient { ) .map_err(MatrixClientError::DecodeStoredSession)?; - // Restore data + // Restore session client + .client .restore_session(OAuthSession { client_id: session.client_id, user: session.user_session, }) .await .map_err(MatrixClientError::RestoreSession)?; - } - let client = Self { - email: email.clone(), - client, - }; + // Force token refresh to make sure session is still alive, otherwise disconnect user + if let Err(refresh_error) = client.client.oauth().refresh_access_token().await { + if let RefreshTokenError::OAuth(e) = &refresh_error + && let OAuthError::RefreshToken(RequestTokenError::ServerResponse(e)) = &**e + && e.error() == &BasicErrorResponseType::InvalidGrant + { + log::warn!( + "Refresh token rejected by server, token must have been invalidated! {refresh_error}" + ); + client + .disconnect() + .await + .map_err(MatrixClientError::DisconnectUser)?; + } + return Err(MatrixClientError::InitialRefreshToken(refresh_error).into()); + } + } // Automatically save session when token gets refreshed client.setup_background_session_save().await; @@ -212,6 +243,13 @@ impl MatrixClient { match update { matrix_sdk::SessionChange::UnknownToken { soft_logout } => { log::warn!("Received an unknown token error; soft logout? {soft_logout:?}"); + if let Err(e) = this + .manager + .cast(MatrixManagerMsg::DisconnectClient(this.email)) + { + log::warn!("Failed to propagate invalid token error: {e}"); + } + break; } matrix_sdk::SessionChange::TokensRefreshed => { // The tokens have been refreshed, persist them to disk. @@ -254,4 +292,16 @@ impl MatrixClient { log::debug!("Updating the stored session: done!"); Ok(()) } + + /// Disconnect user from client + pub async fn disconnect(self) -> anyhow::Result<()> { + if let Err(e) = self.client.logout().await { + log::warn!("Failed to send logout request: {e}"); + } + + // Destroy user associated data + Self::destroy_data(&self.email)?; + + Ok(()) + } } diff --git a/matrixgw_backend/src/matrix_connection/matrix_manager.rs b/matrixgw_backend/src/matrix_connection/matrix_manager.rs index d6f180a..d75cab6 100644 --- a/matrixgw_backend/src/matrix_connection/matrix_manager.rs +++ b/matrixgw_backend/src/matrix_connection/matrix_manager.rs @@ -1,14 +1,17 @@ +use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; use crate::users::UserEmail; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; use std::collections::HashMap; pub struct MatrixManagerState { + pub broadcast_sender: BroadcastSender, pub clients: HashMap, } pub enum MatrixManagerMsg { GetClient(UserEmail, RpcReplyPort>), + DisconnectClient(UserEmail), } pub struct MatrixManagerActor; @@ -16,21 +19,22 @@ pub struct MatrixManagerActor; impl Actor for MatrixManagerActor { type Msg = MatrixManagerMsg; type State = MatrixManagerState; - type Arguments = (); + type Arguments = BroadcastSender; async fn pre_start( &self, _myself: ActorRef, - _args: Self::Arguments, + args: Self::Arguments, ) -> Result { Ok(MatrixManagerState { + broadcast_sender: args, clients: HashMap::new(), }) } async fn handle( &self, - _myself: ActorRef, + myself: ActorRef, message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { @@ -41,7 +45,7 @@ impl Actor for MatrixManagerActor { None => { // Generate client if required log::info!("Building new client for {:?}", &email); - match MatrixClient::build_client(&email).await { + match MatrixClient::build_client(myself, &email).await { Ok(c) => { state.clients.insert(email.clone(), c.clone()); Ok(c) @@ -56,6 +60,21 @@ impl Actor for MatrixManagerActor { log::warn!("Failed to send client information: {e}") } } + MatrixManagerMsg::DisconnectClient(email) => { + if let Some(c) = state.clients.remove(&email) { + if let Err(e) = c.disconnect().await { + log::error!("Failed to disconnect client: {e}"); + } + if let Err(e) = state + .broadcast_sender + .send(BroadcastMessage::UserDisconnected(email)) + { + log::warn!( + "Failed to notify that user has been disconnected from Matrix! {e}" + ); + } + } + } } Ok(()) }