Notify Matrix manager directly if sync thread is terminated

This commit is contained in:
2025-11-19 13:39:28 +01:00
parent 07f6544a4a
commit 75b6b224bc
3 changed files with 74 additions and 19 deletions

View File

@@ -10,12 +10,14 @@ use matrix_sdk::authentication::oauth::{
}; };
use matrix_sdk::config::SyncSettings; use matrix_sdk::config::SyncSettings;
use matrix_sdk::encryption::recovery::RecoveryState; use matrix_sdk::encryption::recovery::RecoveryState;
use matrix_sdk::event_handler::{EventHandler, EventHandlerHandle, SyncEvent};
use matrix_sdk::ruma::presence::PresenceState; use matrix_sdk::ruma::presence::PresenceState;
use matrix_sdk::ruma::serde::Raw; use matrix_sdk::ruma::serde::Raw;
use matrix_sdk::ruma::{DeviceId, UserId}; use matrix_sdk::ruma::{DeviceId, UserId};
use matrix_sdk::sync::SyncResponse; use matrix_sdk::sync::SyncResponse;
use matrix_sdk::{Client, ClientBuildError}; use matrix_sdk::{Client, ClientBuildError, SendOutsideWasm};
use ractor::ActorRef; use ractor::ActorRef;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::pin::Pin; use std::pin::Pin;
use url::Url; use url::Url;
@@ -255,23 +257,32 @@ impl MatrixClient {
pub async fn setup_background_session_save(&self) { pub async fn setup_background_session_save(&self) {
let this = self.clone(); let this = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(update) = this.client.subscribe_to_session_changes().recv().await { loop {
match update { match this.client.subscribe_to_session_changes().recv().await {
matrix_sdk::SessionChange::UnknownToken { soft_logout } => { Ok(update) => match update {
log::warn!("Received an unknown token error; soft logout? {soft_logout:?}"); matrix_sdk::SessionChange::UnknownToken { soft_logout } => {
if let Err(e) = this log::warn!(
.manager "Received an unknown token error; soft logout? {soft_logout:?}"
.cast(MatrixManagerMsg::DisconnectClient(this.email)) );
{ if let Err(e) = this
log::warn!("Failed to propagate invalid token error: {e}"); .manager
.cast(MatrixManagerMsg::DisconnectClient(this.email))
{
log::warn!("Failed to propagate invalid token error: {e}");
}
break;
} }
break; matrix_sdk::SessionChange::TokensRefreshed => {
} // The tokens have been refreshed, persist them to disk.
matrix_sdk::SessionChange::TokensRefreshed => { if let Err(err) = this.save_stored_session().await {
// The tokens have been refreshed, persist them to disk. log::error!("Unable to store a session in the background: {err}");
if let Err(err) = this.save_stored_session().await { }
log::error!("Unable to store a session in the background: {err}");
} }
},
Err(e) => {
log::error!("[!] Session change error: {e}");
log::error!("Session change background service INTERRUPTED!");
return;
} }
} }
} }
@@ -369,4 +380,19 @@ impl MatrixClient {
) -> Pin<Box<impl Stream<Item = matrix_sdk::Result<SyncResponse>>>> { ) -> Pin<Box<impl Stream<Item = matrix_sdk::Result<SyncResponse>>>> {
Box::pin(self.client.sync_stream(Self::sync_settings()).await) Box::pin(self.client.sync_stream(Self::sync_settings()).await)
} }
/// Add new Matrix event handler
#[must_use]
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
H: EventHandler<Ev, Ctx>,
{
self.client.add_event_handler(handler)
}
/// Remove Matrix event handler
pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
self.client.remove_event_handler(handle)
}
} }

View File

@@ -15,6 +15,7 @@ pub enum MatrixManagerMsg {
GetClient(UserEmail, RpcReplyPort<anyhow::Result<MatrixClient>>), GetClient(UserEmail, RpcReplyPort<anyhow::Result<MatrixClient>>),
DisconnectClient(UserEmail), DisconnectClient(UserEmail),
StartSyncThread(UserEmail), StartSyncThread(UserEmail),
SyncThreadTerminated(UserEmail, MatrixSyncTaskID),
} }
pub struct MatrixManagerActor; pub struct MatrixManagerActor;
@@ -112,9 +113,18 @@ impl Actor for MatrixManagerActor {
// Start thread // Start thread
log::debug!("Starting sync thread for {email:?}"); log::debug!("Starting sync thread for {email:?}");
let thread_id = let thread_id =
start_sync_thread(client.clone(), state.broadcast_sender.clone()).await?; start_sync_thread(client.clone(), state.broadcast_sender.clone(), myself)
.await?;
state.running_sync_threads.insert(email, thread_id); state.running_sync_threads.insert(email, thread_id);
} }
MatrixManagerMsg::SyncThreadTerminated(email, task_id) => {
if state.running_sync_threads.get(&email) == Some(&task_id) {
log::info!(
"Sync thread {task_id:?} has been terminated, removing it from the list..."
);
state.running_sync_threads.remove(&email);
}
}
} }
Ok(()) Ok(())
} }

View File

@@ -4,7 +4,9 @@
use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::broadcast_messages::{BroadcastMessage, BroadcastSender};
use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_client::MatrixClient;
use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use futures_util::StreamExt; use futures_util::StreamExt;
use ractor::ActorRef;
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub struct MatrixSyncTaskID(uuid::Uuid); pub struct MatrixSyncTaskID(uuid::Uuid);
@@ -13,19 +15,25 @@ pub struct MatrixSyncTaskID(uuid::Uuid);
pub async fn start_sync_thread( pub async fn start_sync_thread(
client: MatrixClient, client: MatrixClient,
tx: BroadcastSender, tx: BroadcastSender,
manager: ActorRef<MatrixManagerMsg>,
) -> anyhow::Result<MatrixSyncTaskID> { ) -> anyhow::Result<MatrixSyncTaskID> {
let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4()); let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4());
let task_id_clone = task_id.clone(); let task_id_clone = task_id.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
sync_thread_task(task_id_clone, client, tx).await; sync_thread_task(task_id_clone, client, tx, manager).await;
}); });
Ok(task_id) Ok(task_id)
} }
/// Sync thread function for a single function /// Sync thread function for a single function
async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: BroadcastSender) { async fn sync_thread_task(
id: MatrixSyncTaskID,
client: MatrixClient,
tx: BroadcastSender,
manager: ActorRef<MatrixManagerMsg>,
) {
let mut rx = tx.subscribe(); let mut rx = tx.subscribe();
log::info!("Sync thread {id:?} started for user {:?}", client.email); log::info!("Sync thread {id:?} started for user {:?}", client.email);
@@ -38,6 +46,8 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc
let mut sync_stream = client.sync_stream().await; let mut sync_stream = client.sync_stream().await;
//let room_message_handle = client.add_event_handler();
loop { loop {
tokio::select! { tokio::select! {
// Message from tokio broadcast // Message from tokio broadcast
@@ -66,7 +76,16 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc
} }
} }
//client.remove_event_handler(room_message_handle);
// Notify manager about termination, so this thread can be removed from the list
log::info!("Sync thread {id:?} terminated!"); log::info!("Sync thread {id:?} terminated!");
if let Err(e) = ractor::cast!(
manager,
MatrixManagerMsg::SyncThreadTerminated(client.email.clone(), id.clone())
) {
log::error!("Failed to notify Matrix manager about thread termination! {e}");
}
if let Err(e) = tx.send(BroadcastMessage::SyncThreadStopped(id)) { if let Err(e) = tx.send(BroadcastMessage::SyncThreadStopped(id)) {
log::warn!("Failed to notify that synchronization thread has been interrupted! {e}") log::warn!("Failed to notify that synchronization thread has been interrupted! {e}")
} }