From c9b703bea3198149920da2da322a3843e4bdbf6c Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Tue, 18 Nov 2025 22:17:39 +0100 Subject: [PATCH] Ready to implement sync thread logic --- matrixgw_backend/src/broadcast_messages.rs | 3 ++ .../matrix_sync_thread_controller.rs | 22 +++++++++++++ matrixgw_backend/src/controllers/mod.rs | 1 + matrixgw_backend/src/main.rs | 8 ++++- .../src/matrix_connection/matrix_manager.rs | 33 +++++++++++++++++++ matrixgw_backend/src/matrix_connection/mod.rs | 1 + .../src/matrix_connection/sync_thread.rs | 32 ++++++++++++++++++ matrixgw_frontend/src/api/MatrixSyncApi.ts | 13 ++++++++ matrixgw_frontend/src/routes/HomeRoute.tsx | 15 ++++++++- 9 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 matrixgw_backend/src/controllers/matrix_sync_thread_controller.rs create mode 100644 matrixgw_backend/src/matrix_connection/sync_thread.rs create mode 100644 matrixgw_frontend/src/api/MatrixSyncApi.ts diff --git a/matrixgw_backend/src/broadcast_messages.rs b/matrixgw_backend/src/broadcast_messages.rs index e31174e..d83d2d3 100644 --- a/matrixgw_backend/src/broadcast_messages.rs +++ b/matrixgw_backend/src/broadcast_messages.rs @@ -1,3 +1,4 @@ +use crate::matrix_connection::sync_thread::MatrixSyncTaskID; use crate::users::{APIToken, UserEmail}; pub type BroadcastSender = tokio::sync::broadcast::Sender; @@ -9,4 +10,6 @@ pub enum BroadcastMessage { UserDisconnectedFromMatrix(UserEmail), /// API token has been deleted APITokenDeleted(APIToken), + /// Request a Matrix sync thread to be interrupted + StopSyncThread(MatrixSyncTaskID), } diff --git a/matrixgw_backend/src/controllers/matrix_sync_thread_controller.rs b/matrixgw_backend/src/controllers/matrix_sync_thread_controller.rs new file mode 100644 index 0000000..bbfa799 --- /dev/null +++ b/matrixgw_backend/src/controllers/matrix_sync_thread_controller.rs @@ -0,0 +1,22 @@ +use crate::controllers::HttpResult; +use crate::extractors::matrix_client_extractor::MatrixClientExtractor; +use crate::matrix_connection::matrix_manager::MatrixManagerMsg; +use actix_web::{HttpResponse, web}; +use ractor::ActorRef; + +/// Start sync thread +pub async fn start_sync( + client: MatrixClientExtractor, + manager: web::Data>, +) -> HttpResult { + match ractor::cast!( + manager, + MatrixManagerMsg::StartSyncThread(client.auth.user.email.clone()) + ) { + Ok(_) => Ok(HttpResponse::Accepted().finish()), + Err(e) => { + log::error!("Failed to start sync: {e}"); + Ok(HttpResponse::InternalServerError().finish()) + } + } +} diff --git a/matrixgw_backend/src/controllers/mod.rs b/matrixgw_backend/src/controllers/mod.rs index 0fbc422..01d8545 100644 --- a/matrixgw_backend/src/controllers/mod.rs +++ b/matrixgw_backend/src/controllers/mod.rs @@ -4,6 +4,7 @@ use std::error::Error; pub mod auth_controller; pub mod matrix_link_controller; +pub mod matrix_sync_thread_controller; pub mod server_controller; pub mod tokens_controller; diff --git a/matrixgw_backend/src/main.rs b/matrixgw_backend/src/main.rs index 4d5ec16..9a98eb9 100644 --- a/matrixgw_backend/src/main.rs +++ b/matrixgw_backend/src/main.rs @@ -10,7 +10,8 @@ use matrixgw_backend::app_config::AppConfig; use matrixgw_backend::broadcast_messages::BroadcastMessage; use matrixgw_backend::constants; use matrixgw_backend::controllers::{ - auth_controller, matrix_link_controller, server_controller, tokens_controller, + auth_controller, matrix_link_controller, matrix_sync_thread_controller, server_controller, + tokens_controller, }; use matrixgw_backend::matrix_connection::matrix_manager::MatrixManagerActor; use matrixgw_backend::users::User; @@ -119,6 +120,11 @@ async fn main() -> std::io::Result<()> { "/api/token/{id}", web::delete().to(tokens_controller::delete), ) + // Matrix synchronization controller + .route( + "/api/matrix_sync/start", + web::post().to(matrix_sync_thread_controller::start_sync), + ) }) .workers(4) .bind(&AppConfig::get().listen_address)? diff --git a/matrixgw_backend/src/matrix_connection/matrix_manager.rs b/matrixgw_backend/src/matrix_connection/matrix_manager.rs index 759d694..fc80451 100644 --- a/matrixgw_backend/src/matrix_connection/matrix_manager.rs +++ b/matrixgw_backend/src/matrix_connection/matrix_manager.rs @@ -1,5 +1,6 @@ use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; +use crate::matrix_connection::sync_thread::{MatrixSyncTaskID, start_sync_thread}; use crate::users::UserEmail; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; use std::collections::HashMap; @@ -7,11 +8,13 @@ use std::collections::HashMap; pub struct MatrixManagerState { pub broadcast_sender: BroadcastSender, pub clients: HashMap, + pub running_sync_threads: HashMap, } pub enum MatrixManagerMsg { GetClient(UserEmail, RpcReplyPort>), DisconnectClient(UserEmail), + StartSyncThread(UserEmail), } pub struct MatrixManagerActor; @@ -29,6 +32,7 @@ impl Actor for MatrixManagerActor { Ok(MatrixManagerState { broadcast_sender: args, clients: HashMap::new(), + running_sync_threads: Default::default(), }) } @@ -62,6 +66,15 @@ impl Actor for MatrixManagerActor { } MatrixManagerMsg::DisconnectClient(email) => { if let Some(c) = state.clients.remove(&email) { + // Stop sync thread (if running) + if let Some(id) = state.running_sync_threads.remove(&email) { + state + .broadcast_sender + .send(BroadcastMessage::StopSyncThread(id)) + .ok(); + } + + // Disconnect client if let Err(e) = c.disconnect().await { log::error!("Failed to disconnect client: {e}"); } @@ -75,6 +88,26 @@ impl Actor for MatrixManagerActor { } } } + MatrixManagerMsg::StartSyncThread(email) => { + // Do nothing if task is already running + if state.running_sync_threads.contains_key(&email) { + log::debug!("Not starting sync thread for {email:?} as it is already running"); + return Ok(()); + } + + let Some(client) = state.clients.get(&email) else { + log::warn!( + "Cannot start sync thread for {email:?} because client is not initialized!" + ); + return Ok(()); + }; + + // Start thread + log::debug!("Starting sync thread for {email:?}"); + let thread_id = + start_sync_thread(client.clone(), state.broadcast_sender.clone()).await?; + state.running_sync_threads.insert(email, thread_id); + } } Ok(()) } diff --git a/matrixgw_backend/src/matrix_connection/mod.rs b/matrixgw_backend/src/matrix_connection/mod.rs index c929f1b..29194b1 100644 --- a/matrixgw_backend/src/matrix_connection/mod.rs +++ b/matrixgw_backend/src/matrix_connection/mod.rs @@ -1,2 +1,3 @@ pub mod matrix_client; pub mod matrix_manager; +pub mod sync_thread; diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs new file mode 100644 index 0000000..9169cb6 --- /dev/null +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -0,0 +1,32 @@ +//! # Matrix sync thread +//! +//! This file contains the logic performed by the threads that synchronize with Matrix account. + +use crate::broadcast_messages::BroadcastSender; +use crate::matrix_connection::matrix_client::MatrixClient; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct MatrixSyncTaskID(uuid::Uuid); + +/// Start synchronization thread for a given user +pub async fn start_sync_thread( + client: MatrixClient, + tx: BroadcastSender, +) -> anyhow::Result { + let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4()); + let task_id_clone = task_id.clone(); + + tokio::task::spawn(async move { + sync_thread_task(task_id_clone, client, tx).await; + }); + + Ok(task_id) +} + +/// Sync thread function for a single function +async fn sync_thread_task(task_id: MatrixSyncTaskID, client: MatrixClient, _tx: BroadcastSender) { + loop { + println!("TODO : sync actions {task_id:?} {:?}", client.email); + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } +} diff --git a/matrixgw_frontend/src/api/MatrixSyncApi.ts b/matrixgw_frontend/src/api/MatrixSyncApi.ts new file mode 100644 index 0000000..600a4e6 --- /dev/null +++ b/matrixgw_frontend/src/api/MatrixSyncApi.ts @@ -0,0 +1,13 @@ +import { APIClient } from "./ApiClient"; + +export class MatrixSyncApi { + /** + * Force sync thread startup + */ + static async Start(): Promise { + await APIClient.exec({ + method: "POST", + uri: "/matrix_sync/start", + }); + } +} diff --git a/matrixgw_frontend/src/routes/HomeRoute.tsx b/matrixgw_frontend/src/routes/HomeRoute.tsx index ee0ddf1..f21be44 100644 --- a/matrixgw_frontend/src/routes/HomeRoute.tsx +++ b/matrixgw_frontend/src/routes/HomeRoute.tsx @@ -1,3 +1,6 @@ +import { APIClient } from "../api/ApiClient"; +import { MatrixSyncApi } from "../api/MatrixSyncApi"; +import { AsyncWidget } from "../widgets/AsyncWidget"; import { useUserInfo } from "../widgets/dashboard/BaseAuthenticatedPage"; import { NotLinkedAccountMessage } from "../widgets/NotLinkedAccountMessage"; @@ -6,5 +9,15 @@ export function HomeRoute(): React.ReactElement { if (!user.info.matrix_user_id) return ; - return

Todo home route

; + return ( +

+ Todo home route{" "} + <>sync started} + /> +

+ ); }