4 Commits

5 changed files with 169 additions and 25 deletions

View File

@@ -12,4 +12,6 @@ pub enum BroadcastMessage {
APITokenDeleted(APIToken), APITokenDeleted(APIToken),
/// Request a Matrix sync thread to be interrupted /// Request a Matrix sync thread to be interrupted
StopSyncThread(MatrixSyncTaskID), StopSyncThread(MatrixSyncTaskID),
/// Matrix sync thread has been interrupted
SyncThreadStopped(MatrixSyncTaskID),
} }

View File

@@ -15,8 +15,8 @@ impl MatrixClientExtractor {
pub async fn to_extended_user_info(&self) -> anyhow::Result<ExtendedUserInfo> { pub async fn to_extended_user_info(&self) -> anyhow::Result<ExtendedUserInfo> {
Ok(ExtendedUserInfo { Ok(ExtendedUserInfo {
user: self.auth.user.clone(), user: self.auth.user.clone(),
matrix_user_id: self.client.client.user_id().map(|id| id.to_string()), matrix_user_id: self.client.user_id().map(|id| id.to_string()),
matrix_device_id: self.client.client.device_id().map(|id| id.to_string()), matrix_device_id: self.client.device_id().map(|id| id.to_string()),
matrix_recovery_state: self.client.recovery_state(), matrix_recovery_state: self.client.recovery_state(),
}) })
} }

View File

@@ -3,15 +3,23 @@ use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use crate::users::UserEmail; use crate::users::UserEmail;
use crate::utils::rand_utils::rand_string; use crate::utils::rand_utils::rand_string;
use anyhow::Context; use anyhow::Context;
use futures_util::Stream;
use matrix_sdk::authentication::oauth::error::OAuthDiscoveryError; use matrix_sdk::authentication::oauth::error::OAuthDiscoveryError;
use matrix_sdk::authentication::oauth::{ use matrix_sdk::authentication::oauth::{
ClientId, OAuthError, OAuthSession, UrlOrQuery, UserSession, ClientId, OAuthError, OAuthSession, UrlOrQuery, UserSession,
}; };
use matrix_sdk::config::SyncSettings;
use matrix_sdk::encryption::recovery::RecoveryState; use matrix_sdk::encryption::recovery::RecoveryState;
use matrix_sdk::event_handler::{EventHandler, EventHandlerHandle, SyncEvent};
use matrix_sdk::ruma::presence::PresenceState;
use matrix_sdk::ruma::serde::Raw; use matrix_sdk::ruma::serde::Raw;
use matrix_sdk::{Client, ClientBuildError}; use matrix_sdk::ruma::{DeviceId, UserId};
use matrix_sdk::sync::SyncResponse;
use matrix_sdk::{Client, ClientBuildError, SendOutsideWasm};
use ractor::ActorRef; use ractor::ActorRef;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::pin::Pin;
use url::Url; use url::Url;
/// The full session to persist. /// The full session to persist.
@@ -83,7 +91,7 @@ pub struct FinishMatrixAuth {
pub struct MatrixClient { pub struct MatrixClient {
manager: ActorRef<MatrixManagerMsg>, manager: ActorRef<MatrixManagerMsg>,
pub email: UserEmail, pub email: UserEmail,
pub client: Client, client: Client,
} }
impl MatrixClient { impl MatrixClient {
@@ -249,23 +257,32 @@ impl MatrixClient {
pub async fn setup_background_session_save(&self) { pub async fn setup_background_session_save(&self) {
let this = self.clone(); let this = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(update) = this.client.subscribe_to_session_changes().recv().await { loop {
match update { match this.client.subscribe_to_session_changes().recv().await {
matrix_sdk::SessionChange::UnknownToken { soft_logout } => { Ok(update) => match update {
log::warn!("Received an unknown token error; soft logout? {soft_logout:?}"); matrix_sdk::SessionChange::UnknownToken { soft_logout } => {
if let Err(e) = this log::warn!(
.manager "Received an unknown token error; soft logout? {soft_logout:?}"
.cast(MatrixManagerMsg::DisconnectClient(this.email)) );
{ if let Err(e) = this
log::warn!("Failed to propagate invalid token error: {e}"); .manager
.cast(MatrixManagerMsg::DisconnectClient(this.email))
{
log::warn!("Failed to propagate invalid token error: {e}");
}
break;
} }
break; matrix_sdk::SessionChange::TokensRefreshed => {
} // The tokens have been refreshed, persist them to disk.
matrix_sdk::SessionChange::TokensRefreshed => { if let Err(err) = this.save_stored_session().await {
// The tokens have been refreshed, persist them to disk. log::error!("Unable to store a session in the background: {err}");
if let Err(err) = this.save_stored_session().await { }
log::error!("Unable to store a session in the background: {err}");
} }
},
Err(e) => {
log::error!("[!] Session change error: {e}");
log::error!("Session change background service INTERRUPTED!");
return;
} }
} }
} }
@@ -298,6 +315,11 @@ impl MatrixClient {
Ok(()) Ok(())
} }
/// Check whether a user is currently connected to this client or not
pub fn is_client_connected(&self) -> bool {
self.client.is_active()
}
/// Disconnect user from client /// Disconnect user from client
pub async fn disconnect(self) -> anyhow::Result<()> { pub async fn disconnect(self) -> anyhow::Result<()> {
if let Err(e) = self.client.logout().await { if let Err(e) = self.client.logout().await {
@@ -310,6 +332,16 @@ impl MatrixClient {
Ok(()) Ok(())
} }
/// Get client Matrix device id
pub fn device_id(&self) -> Option<&DeviceId> {
self.client.device_id()
}
/// Get client Matrix user id
pub fn user_id(&self) -> Option<&UserId> {
self.client.user_id()
}
/// Get current encryption keys recovery state /// Get current encryption keys recovery state
pub fn recovery_state(&self) -> EncryptionRecoveryState { pub fn recovery_state(&self) -> EncryptionRecoveryState {
match self.client.encryption().recovery().state() { match self.client.encryption().recovery().state() {
@@ -330,4 +362,37 @@ impl MatrixClient {
.await .await
.map_err(MatrixClientError::SetRecoveryKey)?) .map_err(MatrixClientError::SetRecoveryKey)?)
} }
/// Get matrix synchronization settings to use
fn sync_settings() -> SyncSettings {
SyncSettings::default().set_presence(PresenceState::Offline)
}
/// Perform initial synchronization
pub async fn perform_initial_sync(&self) -> anyhow::Result<()> {
self.client.sync_once(Self::sync_settings()).await?;
Ok(())
}
/// Perform routine synchronization
pub async fn sync_stream(
&self,
) -> Pin<Box<impl Stream<Item = matrix_sdk::Result<SyncResponse>>>> {
Box::pin(self.client.sync_stream(Self::sync_settings()).await)
}
/// Add new Matrix event handler
#[must_use]
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
where
Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
H: EventHandler<Ev, Ctx>,
{
self.client.add_event_handler(handler)
}
/// Remove Matrix event handler
pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
self.client.remove_event_handler(handle)
}
} }

View File

@@ -15,6 +15,7 @@ pub enum MatrixManagerMsg {
GetClient(UserEmail, RpcReplyPort<anyhow::Result<MatrixClient>>), GetClient(UserEmail, RpcReplyPort<anyhow::Result<MatrixClient>>),
DisconnectClient(UserEmail), DisconnectClient(UserEmail),
StartSyncThread(UserEmail), StartSyncThread(UserEmail),
SyncThreadTerminated(UserEmail, MatrixSyncTaskID),
} }
pub struct MatrixManagerActor; pub struct MatrixManagerActor;
@@ -102,12 +103,28 @@ impl Actor for MatrixManagerActor {
return Ok(()); return Ok(());
}; };
if !client.is_client_connected() {
log::warn!(
"Cannot start sync thread for {email:?} because Matrix account is not set!"
);
return Ok(());
}
// Start thread // Start thread
log::debug!("Starting sync thread for {email:?}"); log::debug!("Starting sync thread for {email:?}");
let thread_id = let thread_id =
start_sync_thread(client.clone(), state.broadcast_sender.clone()).await?; start_sync_thread(client.clone(), state.broadcast_sender.clone(), myself)
.await?;
state.running_sync_threads.insert(email, thread_id); state.running_sync_threads.insert(email, thread_id);
} }
MatrixManagerMsg::SyncThreadTerminated(email, task_id) => {
if state.running_sync_threads.get(&email) == Some(&task_id) {
log::info!(
"Sync thread {task_id:?} has been terminated, removing it from the list..."
);
state.running_sync_threads.remove(&email);
}
}
} }
Ok(()) Ok(())
} }

View File

@@ -2,8 +2,11 @@
//! //!
//! This file contains the logic performed by the threads that synchronize with Matrix account. //! This file contains the logic performed by the threads that synchronize with Matrix account.
use crate::broadcast_messages::BroadcastSender; use crate::broadcast_messages::{BroadcastMessage, BroadcastSender};
use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_client::MatrixClient;
use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use futures_util::StreamExt;
use ractor::ActorRef;
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub struct MatrixSyncTaskID(uuid::Uuid); pub struct MatrixSyncTaskID(uuid::Uuid);
@@ -12,21 +15,78 @@ pub struct MatrixSyncTaskID(uuid::Uuid);
pub async fn start_sync_thread( pub async fn start_sync_thread(
client: MatrixClient, client: MatrixClient,
tx: BroadcastSender, tx: BroadcastSender,
manager: ActorRef<MatrixManagerMsg>,
) -> anyhow::Result<MatrixSyncTaskID> { ) -> anyhow::Result<MatrixSyncTaskID> {
let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4()); let task_id = MatrixSyncTaskID(uuid::Uuid::new_v4());
let task_id_clone = task_id.clone(); let task_id_clone = task_id.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
sync_thread_task(task_id_clone, client, tx).await; sync_thread_task(task_id_clone, client, tx, manager).await;
}); });
Ok(task_id) Ok(task_id)
} }
/// Sync thread function for a single function /// Sync thread function for a single function
async fn sync_thread_task(task_id: MatrixSyncTaskID, client: MatrixClient, _tx: BroadcastSender) { async fn sync_thread_task(
id: MatrixSyncTaskID,
client: MatrixClient,
tx: BroadcastSender,
manager: ActorRef<MatrixManagerMsg>,
) {
let mut rx = tx.subscribe();
log::info!("Sync thread {id:?} started for user {:?}", client.email);
log::info!("Perform initial synchronization...");
if let Err(e) = client.perform_initial_sync().await {
log::error!("Failed to perform initial Matrix synchronization! {e:?}");
return;
}
let mut sync_stream = client.sync_stream().await;
//let room_message_handle = client.add_event_handler();
loop { loop {
println!("TODO : sync actions {task_id:?} {:?}", client.email); tokio::select! {
tokio::time::sleep(std::time::Duration::from_millis(1000)).await; // Message from tokio broadcast
msg = rx.recv() => {
match msg {
Ok(BroadcastMessage::StopSyncThread(task_id)) if task_id == id => {
log::info!("A request was received to stop sync task! {id:?} for user {:?}", client.email);
break;
}
Err(e) => {
log::error!("Failed to receive a message from broadcast! {e}");
break;
}
Ok(_) => {}
}
}
evt = sync_stream.next() => {
let Some(evt)= evt else {
log::error!("No more Matrix event to process, stopping now...");
break;
};
println!("Sync thread {id:?} event: {:?}", evt);
}
}
}
//client.remove_event_handler(room_message_handle);
// Notify manager about termination, so this thread can be removed from the list
log::info!("Sync thread {id:?} terminated!");
if let Err(e) = ractor::cast!(
manager,
MatrixManagerMsg::SyncThreadTerminated(client.email.clone(), id.clone())
) {
log::error!("Failed to notify Matrix manager about thread termination! {e}");
}
if let Err(e) = tx.send(BroadcastMessage::SyncThreadStopped(id)) {
log::warn!("Failed to notify that synchronization thread has been interrupted! {e}")
} }
} }