Ready to implement sync thread logic

This commit is contained in:
2025-11-18 22:17:39 +01:00
parent 5c13cffe08
commit c9b703bea3
9 changed files with 126 additions and 2 deletions

View File

@@ -1,3 +1,4 @@
use crate::matrix_connection::sync_thread::MatrixSyncTaskID;
use crate::users::{APIToken, UserEmail};
pub type BroadcastSender = tokio::sync::broadcast::Sender<BroadcastMessage>;
@@ -9,4 +10,6 @@ pub enum BroadcastMessage {
UserDisconnectedFromMatrix(UserEmail),
/// API token has been deleted
APITokenDeleted(APIToken),
/// Request a Matrix sync thread to be interrupted
StopSyncThread(MatrixSyncTaskID),
}

View File

@@ -0,0 +1,22 @@
use crate::controllers::HttpResult;
use crate::extractors::matrix_client_extractor::MatrixClientExtractor;
use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use actix_web::{HttpResponse, web};
use ractor::ActorRef;
/// Start sync thread
pub async fn start_sync(
client: MatrixClientExtractor,
manager: web::Data<ActorRef<MatrixManagerMsg>>,
) -> HttpResult {
match ractor::cast!(
manager,
MatrixManagerMsg::StartSyncThread(client.auth.user.email.clone())
) {
Ok(_) => Ok(HttpResponse::Accepted().finish()),
Err(e) => {
log::error!("Failed to start sync: {e}");
Ok(HttpResponse::InternalServerError().finish())
}
}
}

View File

@@ -4,6 +4,7 @@ use std::error::Error;
pub mod auth_controller;
pub mod matrix_link_controller;
pub mod matrix_sync_thread_controller;
pub mod server_controller;
pub mod tokens_controller;

View File

@@ -10,7 +10,8 @@ use matrixgw_backend::app_config::AppConfig;
use matrixgw_backend::broadcast_messages::BroadcastMessage;
use matrixgw_backend::constants;
use matrixgw_backend::controllers::{
auth_controller, matrix_link_controller, server_controller, tokens_controller,
auth_controller, matrix_link_controller, matrix_sync_thread_controller, server_controller,
tokens_controller,
};
use matrixgw_backend::matrix_connection::matrix_manager::MatrixManagerActor;
use matrixgw_backend::users::User;
@@ -119,6 +120,11 @@ async fn main() -> std::io::Result<()> {
"/api/token/{id}",
web::delete().to(tokens_controller::delete),
)
// Matrix synchronization controller
.route(
"/api/matrix_sync/start",
web::post().to(matrix_sync_thread_controller::start_sync),
)
})
.workers(4)
.bind(&AppConfig::get().listen_address)?

View File

@@ -1,5 +1,6 @@
use crate::broadcast_messages::{BroadcastMessage, BroadcastSender};
use crate::matrix_connection::matrix_client::MatrixClient;
use crate::matrix_connection::sync_thread::{MatrixSyncTaskID, start_sync_thread};
use crate::users::UserEmail;
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use std::collections::HashMap;
@@ -7,11 +8,13 @@ use std::collections::HashMap;
pub struct MatrixManagerState {
pub broadcast_sender: BroadcastSender,
pub clients: HashMap<UserEmail, MatrixClient>,
pub running_sync_threads: HashMap<UserEmail, MatrixSyncTaskID>,
}
pub enum MatrixManagerMsg {
GetClient(UserEmail, RpcReplyPort<anyhow::Result<MatrixClient>>),
DisconnectClient(UserEmail),
StartSyncThread(UserEmail),
}
pub struct MatrixManagerActor;
@@ -29,6 +32,7 @@ impl Actor for MatrixManagerActor {
Ok(MatrixManagerState {
broadcast_sender: args,
clients: HashMap::new(),
running_sync_threads: Default::default(),
})
}
@@ -62,6 +66,15 @@ impl Actor for MatrixManagerActor {
}
MatrixManagerMsg::DisconnectClient(email) => {
if let Some(c) = state.clients.remove(&email) {
// Stop sync thread (if running)
if let Some(id) = state.running_sync_threads.remove(&email) {
state
.broadcast_sender
.send(BroadcastMessage::StopSyncThread(id))
.ok();
}
// Disconnect client
if let Err(e) = c.disconnect().await {
log::error!("Failed to disconnect client: {e}");
}
@@ -75,6 +88,26 @@ impl Actor for MatrixManagerActor {
}
}
}
MatrixManagerMsg::StartSyncThread(email) => {
// Do nothing if task is already running
if state.running_sync_threads.contains_key(&email) {
log::debug!("Not starting sync thread for {email:?} as it is already running");
return Ok(());
}
let Some(client) = state.clients.get(&email) else {
log::warn!(
"Cannot start sync thread for {email:?} because client is not initialized!"
);
return Ok(());
};
// Start thread
log::debug!("Starting sync thread for {email:?}");
let thread_id =
start_sync_thread(client.clone(), state.broadcast_sender.clone()).await?;
state.running_sync_threads.insert(email, thread_id);
}
}
Ok(())
}

View File

@@ -1,2 +1,3 @@
pub mod matrix_client;
pub mod matrix_manager;
pub mod sync_thread;

View File

@@ -0,0 +1,32 @@
//! # Matrix sync thread
//!
//! This file contains the logic performed by the threads that synchronize with Matrix account.
use crate::broadcast_messages::BroadcastSender;
use crate::matrix_connection::matrix_client::MatrixClient;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MatrixSyncTaskID(uuid::Uuid);
/// Start synchronization thread for a given user
pub async fn start_sync_thread(
client: MatrixClient,
tx: BroadcastSender,
) -> anyhow::Result<MatrixSyncTaskID> {
let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4());
let task_id_clone = task_id.clone();
tokio::task::spawn(async move {
sync_thread_task(task_id_clone, client, tx).await;
});
Ok(task_id)
}
/// Sync thread function for a single function
async fn sync_thread_task(task_id: MatrixSyncTaskID, client: MatrixClient, _tx: BroadcastSender) {
loop {
println!("TODO : sync actions {task_id:?} {:?}", client.email);
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
}