diff --git a/matrixgw_backend/src/matrix_connection/matrix_client.rs b/matrixgw_backend/src/matrix_connection/matrix_client.rs index 58677a8..dc59884 100644 --- a/matrixgw_backend/src/matrix_connection/matrix_client.rs +++ b/matrixgw_backend/src/matrix_connection/matrix_client.rs @@ -10,12 +10,14 @@ use matrix_sdk::authentication::oauth::{ }; use matrix_sdk::config::SyncSettings; use matrix_sdk::encryption::recovery::RecoveryState; +use matrix_sdk::event_handler::{EventHandler, EventHandlerHandle, SyncEvent}; use matrix_sdk::ruma::presence::PresenceState; use matrix_sdk::ruma::serde::Raw; use matrix_sdk::ruma::{DeviceId, UserId}; use matrix_sdk::sync::SyncResponse; -use matrix_sdk::{Client, ClientBuildError}; +use matrix_sdk::{Client, ClientBuildError, SendOutsideWasm}; use ractor::ActorRef; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::pin::Pin; use url::Url; @@ -255,23 +257,32 @@ impl MatrixClient { pub async fn setup_background_session_save(&self) { let this = self.clone(); tokio::spawn(async move { - while let Ok(update) = this.client.subscribe_to_session_changes().recv().await { - match update { - matrix_sdk::SessionChange::UnknownToken { soft_logout } => { - log::warn!("Received an unknown token error; soft logout? {soft_logout:?}"); - if let Err(e) = this - .manager - .cast(MatrixManagerMsg::DisconnectClient(this.email)) - { - log::warn!("Failed to propagate invalid token error: {e}"); + loop { + match this.client.subscribe_to_session_changes().recv().await { + Ok(update) => match update { + matrix_sdk::SessionChange::UnknownToken { soft_logout } => { + log::warn!( + "Received an unknown token error; soft logout? {soft_logout:?}" + ); + if let Err(e) = this + .manager + .cast(MatrixManagerMsg::DisconnectClient(this.email)) + { + log::warn!("Failed to propagate invalid token error: {e}"); + } + break; } - break; - } - matrix_sdk::SessionChange::TokensRefreshed => { - // The tokens have been refreshed, persist them to disk. - if let Err(err) = this.save_stored_session().await { - log::error!("Unable to store a session in the background: {err}"); + matrix_sdk::SessionChange::TokensRefreshed => { + // The tokens have been refreshed, persist them to disk. + if let Err(err) = this.save_stored_session().await { + log::error!("Unable to store a session in the background: {err}"); + } } + }, + Err(e) => { + log::error!("[!] Session change error: {e}"); + log::error!("Session change background service INTERRUPTED!"); + return; } } } @@ -369,4 +380,19 @@ impl MatrixClient { ) -> Pin>>> { Box::pin(self.client.sync_stream(Self::sync_settings()).await) } + + /// Add new Matrix event handler + #[must_use] + pub fn add_event_handler(&self, handler: H) -> EventHandlerHandle + where + Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static, + H: EventHandler, + { + self.client.add_event_handler(handler) + } + + /// Remove Matrix event handler + pub fn remove_event_handler(&self, handle: EventHandlerHandle) { + self.client.remove_event_handler(handle) + } } diff --git a/matrixgw_backend/src/matrix_connection/matrix_manager.rs b/matrixgw_backend/src/matrix_connection/matrix_manager.rs index 4c6d940..c7a3ccb 100644 --- a/matrixgw_backend/src/matrix_connection/matrix_manager.rs +++ b/matrixgw_backend/src/matrix_connection/matrix_manager.rs @@ -15,6 +15,7 @@ pub enum MatrixManagerMsg { GetClient(UserEmail, RpcReplyPort>), DisconnectClient(UserEmail), StartSyncThread(UserEmail), + SyncThreadTerminated(UserEmail, MatrixSyncTaskID), } pub struct MatrixManagerActor; @@ -112,9 +113,18 @@ impl Actor for MatrixManagerActor { // Start thread log::debug!("Starting sync thread for {email:?}"); let thread_id = - start_sync_thread(client.clone(), state.broadcast_sender.clone()).await?; + start_sync_thread(client.clone(), state.broadcast_sender.clone(), myself) + .await?; state.running_sync_threads.insert(email, thread_id); } + MatrixManagerMsg::SyncThreadTerminated(email, task_id) => { + if state.running_sync_threads.get(&email) == Some(&task_id) { + log::info!( + "Sync thread {task_id:?} has been terminated, removing it from the list..." + ); + state.running_sync_threads.remove(&email); + } + } } Ok(()) } diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs index ac9b4e2..e8f07a8 100644 --- a/matrixgw_backend/src/matrix_connection/sync_thread.rs +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -4,7 +4,9 @@ use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; +use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use futures_util::StreamExt; +use ractor::ActorRef; #[derive(Clone, Debug, Eq, PartialEq)] pub struct MatrixSyncTaskID(uuid::Uuid); @@ -13,19 +15,25 @@ pub struct MatrixSyncTaskID(uuid::Uuid); pub async fn start_sync_thread( client: MatrixClient, tx: BroadcastSender, + manager: ActorRef, ) -> anyhow::Result { let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4()); let task_id_clone = task_id.clone(); tokio::task::spawn(async move { - sync_thread_task(task_id_clone, client, tx).await; + sync_thread_task(task_id_clone, client, tx, manager).await; }); Ok(task_id) } /// Sync thread function for a single function -async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: BroadcastSender) { +async fn sync_thread_task( + id: MatrixSyncTaskID, + client: MatrixClient, + tx: BroadcastSender, + manager: ActorRef, +) { let mut rx = tx.subscribe(); log::info!("Sync thread {id:?} started for user {:?}", client.email); @@ -38,6 +46,8 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc let mut sync_stream = client.sync_stream().await; + //let room_message_handle = client.add_event_handler(); + loop { tokio::select! { // Message from tokio broadcast @@ -66,7 +76,16 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc } } + //client.remove_event_handler(room_message_handle); + + // Notify manager about termination, so this thread can be removed from the list log::info!("Sync thread {id:?} terminated!"); + if let Err(e) = ractor::cast!( + manager, + MatrixManagerMsg::SyncThreadTerminated(client.email.clone(), id.clone()) + ) { + log::error!("Failed to notify Matrix manager about thread termination! {e}"); + } if let Err(e) = tx.send(BroadcastMessage::SyncThreadStopped(id)) { log::warn!("Failed to notify that synchronization thread has been interrupted! {e}") }