From 79d4482ea4a202432dce56bf7c7a2c481d826963 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Wed, 19 Nov 2025 10:27:46 +0100 Subject: [PATCH] Sync threads can be interrupted --- .../src/matrix_connection/sync_thread.rs | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs index 9169cb6..99af254 100644 --- a/matrixgw_backend/src/matrix_connection/sync_thread.rs +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -2,7 +2,7 @@ //! //! This file contains the logic performed by the threads that synchronize with Matrix account. -use crate::broadcast_messages::BroadcastSender; +use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; #[derive(Clone, Debug, Eq, PartialEq)] @@ -24,9 +24,29 @@ pub async fn start_sync_thread( } /// Sync thread function for a single function -async fn sync_thread_task(task_id: MatrixSyncTaskID, client: MatrixClient, _tx: BroadcastSender) { +async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: BroadcastSender) { + let mut rx = tx.subscribe(); + + log::info!("Sync thread {id:?} started for user {:?}", client.email); + loop { - println!("TODO : sync actions {task_id:?} {:?}", client.email); - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + tokio::select! { + // Message from tokio broadcast + msg = rx.recv() => { + match msg { + Ok(BroadcastMessage::StopSyncThread(task_id)) if task_id == id => { + log::info!("A request was received to stop sync task! {id:?} for user {:?}", client.email); + break; + } + Err(e) => { + log::error!("Failed to receive a message from broadcast! {e}"); + return; + } + Ok(_) => {} + } + } + } } + + log::info!("Sync thread {id:?} terminated!"); }