diff --git a/matrixgw_backend/src/broadcast_messages.rs b/matrixgw_backend/src/broadcast_messages.rs index d83d2d3..18328e1 100644 --- a/matrixgw_backend/src/broadcast_messages.rs +++ b/matrixgw_backend/src/broadcast_messages.rs @@ -12,4 +12,6 @@ pub enum BroadcastMessage { APITokenDeleted(APIToken), /// Request a Matrix sync thread to be interrupted StopSyncThread(MatrixSyncTaskID), + /// Matrix sync thread has been interrupted + SyncThreadStopped(MatrixSyncTaskID), } diff --git a/matrixgw_backend/src/extractors/matrix_client_extractor.rs b/matrixgw_backend/src/extractors/matrix_client_extractor.rs index 545db6c..5a25e06 100644 --- a/matrixgw_backend/src/extractors/matrix_client_extractor.rs +++ b/matrixgw_backend/src/extractors/matrix_client_extractor.rs @@ -15,8 +15,8 @@ impl MatrixClientExtractor { pub async fn to_extended_user_info(&self) -> anyhow::Result { Ok(ExtendedUserInfo { user: self.auth.user.clone(), - matrix_user_id: self.client.client.user_id().map(|id| id.to_string()), - matrix_device_id: self.client.client.device_id().map(|id| id.to_string()), + matrix_user_id: self.client.user_id().map(|id| id.to_string()), + matrix_device_id: self.client.device_id().map(|id| id.to_string()), matrix_recovery_state: self.client.recovery_state(), }) } diff --git a/matrixgw_backend/src/matrix_connection/matrix_client.rs b/matrixgw_backend/src/matrix_connection/matrix_client.rs index f7f02f7..58677a8 100644 --- a/matrixgw_backend/src/matrix_connection/matrix_client.rs +++ b/matrixgw_backend/src/matrix_connection/matrix_client.rs @@ -3,15 +3,21 @@ use crate::matrix_connection::matrix_manager::MatrixManagerMsg; use crate::users::UserEmail; use crate::utils::rand_utils::rand_string; use anyhow::Context; +use futures_util::Stream; use matrix_sdk::authentication::oauth::error::OAuthDiscoveryError; use matrix_sdk::authentication::oauth::{ ClientId, OAuthError, OAuthSession, UrlOrQuery, UserSession, }; +use matrix_sdk::config::SyncSettings; use matrix_sdk::encryption::recovery::RecoveryState; +use matrix_sdk::ruma::presence::PresenceState; use matrix_sdk::ruma::serde::Raw; +use matrix_sdk::ruma::{DeviceId, UserId}; +use matrix_sdk::sync::SyncResponse; use matrix_sdk::{Client, ClientBuildError}; use ractor::ActorRef; use serde::{Deserialize, Serialize}; +use std::pin::Pin; use url::Url; /// The full session to persist. @@ -83,7 +89,7 @@ pub struct FinishMatrixAuth { pub struct MatrixClient { manager: ActorRef, pub email: UserEmail, - pub client: Client, + client: Client, } impl MatrixClient { @@ -315,6 +321,16 @@ impl MatrixClient { Ok(()) } + /// Get client Matrix device id + pub fn device_id(&self) -> Option<&DeviceId> { + self.client.device_id() + } + + /// Get client Matrix user id + pub fn user_id(&self) -> Option<&UserId> { + self.client.user_id() + } + /// Get current encryption keys recovery state pub fn recovery_state(&self) -> EncryptionRecoveryState { match self.client.encryption().recovery().state() { @@ -335,4 +351,22 @@ impl MatrixClient { .await .map_err(MatrixClientError::SetRecoveryKey)?) } + + /// Get matrix synchronization settings to use + fn sync_settings() -> SyncSettings { + SyncSettings::default().set_presence(PresenceState::Offline) + } + + /// Perform initial synchronization + pub async fn perform_initial_sync(&self) -> anyhow::Result<()> { + self.client.sync_once(Self::sync_settings()).await?; + Ok(()) + } + + /// Perform routine synchronization + pub async fn sync_stream( + &self, + ) -> Pin>>> { + Box::pin(self.client.sync_stream(Self::sync_settings()).await) + } } diff --git a/matrixgw_backend/src/matrix_connection/sync_thread.rs b/matrixgw_backend/src/matrix_connection/sync_thread.rs index 99af254..ac9b4e2 100644 --- a/matrixgw_backend/src/matrix_connection/sync_thread.rs +++ b/matrixgw_backend/src/matrix_connection/sync_thread.rs @@ -4,6 +4,7 @@ use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::matrix_connection::matrix_client::MatrixClient; +use futures_util::StreamExt; #[derive(Clone, Debug, Eq, PartialEq)] pub struct MatrixSyncTaskID(uuid::Uuid); @@ -29,6 +30,14 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc log::info!("Sync thread {id:?} started for user {:?}", client.email); + log::info!("Perform initial synchronization..."); + if let Err(e) = client.perform_initial_sync().await { + log::error!("Failed to perform initial Matrix synchronization! {e:?}"); + return; + } + + let mut sync_stream = client.sync_stream().await; + loop { tokio::select! { // Message from tokio broadcast @@ -40,13 +49,25 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc } Err(e) => { log::error!("Failed to receive a message from broadcast! {e}"); - return; + break; } Ok(_) => {} } } + + evt = sync_stream.next() => { + let Some(evt)= evt else { + log::error!("No more Matrix event to process, stopping now..."); + break; + }; + + println!("Sync thread {id:?} event: {:?}", evt); + } } } log::info!("Sync thread {id:?} terminated!"); + if let Err(e) = tx.send(BroadcastMessage::SyncThreadStopped(id)) { + log::warn!("Failed to notify that synchronization thread has been interrupted! {e}") + } }