WIP sync thread implementation

This commit is contained in:
2025-11-19 11:37:57 +01:00
parent 5bf7c7f8df
commit 07f6544a4a
4 changed files with 61 additions and 4 deletions

View File

@@ -12,4 +12,6 @@ pub enum BroadcastMessage {
APITokenDeleted(APIToken), APITokenDeleted(APIToken),
/// Request a Matrix sync thread to be interrupted /// Request a Matrix sync thread to be interrupted
StopSyncThread(MatrixSyncTaskID), StopSyncThread(MatrixSyncTaskID),
/// Matrix sync thread has been interrupted
SyncThreadStopped(MatrixSyncTaskID),
} }

View File

@@ -15,8 +15,8 @@ impl MatrixClientExtractor {
pub async fn to_extended_user_info(&self) -> anyhow::Result<ExtendedUserInfo> { pub async fn to_extended_user_info(&self) -> anyhow::Result<ExtendedUserInfo> {
Ok(ExtendedUserInfo { Ok(ExtendedUserInfo {
user: self.auth.user.clone(), user: self.auth.user.clone(),
matrix_user_id: self.client.client.user_id().map(|id| id.to_string()), matrix_user_id: self.client.user_id().map(|id| id.to_string()),
matrix_device_id: self.client.client.device_id().map(|id| id.to_string()), matrix_device_id: self.client.device_id().map(|id| id.to_string()),
matrix_recovery_state: self.client.recovery_state(), matrix_recovery_state: self.client.recovery_state(),
}) })
} }

View File

@@ -3,15 +3,21 @@ use crate::matrix_connection::matrix_manager::MatrixManagerMsg;
use crate::users::UserEmail; use crate::users::UserEmail;
use crate::utils::rand_utils::rand_string; use crate::utils::rand_utils::rand_string;
use anyhow::Context; use anyhow::Context;
use futures_util::Stream;
use matrix_sdk::authentication::oauth::error::OAuthDiscoveryError; use matrix_sdk::authentication::oauth::error::OAuthDiscoveryError;
use matrix_sdk::authentication::oauth::{ use matrix_sdk::authentication::oauth::{
ClientId, OAuthError, OAuthSession, UrlOrQuery, UserSession, ClientId, OAuthError, OAuthSession, UrlOrQuery, UserSession,
}; };
use matrix_sdk::config::SyncSettings;
use matrix_sdk::encryption::recovery::RecoveryState; use matrix_sdk::encryption::recovery::RecoveryState;
use matrix_sdk::ruma::presence::PresenceState;
use matrix_sdk::ruma::serde::Raw; use matrix_sdk::ruma::serde::Raw;
use matrix_sdk::ruma::{DeviceId, UserId};
use matrix_sdk::sync::SyncResponse;
use matrix_sdk::{Client, ClientBuildError}; use matrix_sdk::{Client, ClientBuildError};
use ractor::ActorRef; use ractor::ActorRef;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::pin::Pin;
use url::Url; use url::Url;
/// The full session to persist. /// The full session to persist.
@@ -83,7 +89,7 @@ pub struct FinishMatrixAuth {
pub struct MatrixClient { pub struct MatrixClient {
manager: ActorRef<MatrixManagerMsg>, manager: ActorRef<MatrixManagerMsg>,
pub email: UserEmail, pub email: UserEmail,
pub client: Client, client: Client,
} }
impl MatrixClient { impl MatrixClient {
@@ -315,6 +321,16 @@ impl MatrixClient {
Ok(()) Ok(())
} }
/// Get client Matrix device id
pub fn device_id(&self) -> Option<&DeviceId> {
self.client.device_id()
}
/// Get client Matrix user id
pub fn user_id(&self) -> Option<&UserId> {
self.client.user_id()
}
/// Get current encryption keys recovery state /// Get current encryption keys recovery state
pub fn recovery_state(&self) -> EncryptionRecoveryState { pub fn recovery_state(&self) -> EncryptionRecoveryState {
match self.client.encryption().recovery().state() { match self.client.encryption().recovery().state() {
@@ -335,4 +351,22 @@ impl MatrixClient {
.await .await
.map_err(MatrixClientError::SetRecoveryKey)?) .map_err(MatrixClientError::SetRecoveryKey)?)
} }
/// Get matrix synchronization settings to use
fn sync_settings() -> SyncSettings {
SyncSettings::default().set_presence(PresenceState::Offline)
}
/// Perform initial synchronization
pub async fn perform_initial_sync(&self) -> anyhow::Result<()> {
self.client.sync_once(Self::sync_settings()).await?;
Ok(())
}
/// Perform routine synchronization
pub async fn sync_stream(
&self,
) -> Pin<Box<impl Stream<Item = matrix_sdk::Result<SyncResponse>>>> {
Box::pin(self.client.sync_stream(Self::sync_settings()).await)
}
} }

View File

@@ -4,6 +4,7 @@
use crate::broadcast_messages::{BroadcastMessage, BroadcastSender}; use crate::broadcast_messages::{BroadcastMessage, BroadcastSender};
use crate::matrix_connection::matrix_client::MatrixClient; use crate::matrix_connection::matrix_client::MatrixClient;
use futures_util::StreamExt;
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub struct MatrixSyncTaskID(uuid::Uuid); pub struct MatrixSyncTaskID(uuid::Uuid);
@@ -29,6 +30,14 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc
log::info!("Sync thread {id:?} started for user {:?}", client.email); log::info!("Sync thread {id:?} started for user {:?}", client.email);
log::info!("Perform initial synchronization...");
if let Err(e) = client.perform_initial_sync().await {
log::error!("Failed to perform initial Matrix synchronization! {e:?}");
return;
}
let mut sync_stream = client.sync_stream().await;
loop { loop {
tokio::select! { tokio::select! {
// Message from tokio broadcast // Message from tokio broadcast
@@ -40,13 +49,25 @@ async fn sync_thread_task(id: MatrixSyncTaskID, client: MatrixClient, tx: Broadc
} }
Err(e) => { Err(e) => {
log::error!("Failed to receive a message from broadcast! {e}"); log::error!("Failed to receive a message from broadcast! {e}");
return; break;
} }
Ok(_) => {} Ok(_) => {}
} }
} }
evt = sync_stream.next() => {
let Some(evt)= evt else {
log::error!("No more Matrix event to process, stopping now...");
break;
};
println!("Sync thread {id:?} event: {:?}", evt);
}
} }
} }
log::info!("Sync thread {id:?} terminated!"); log::info!("Sync thread {id:?} terminated!");
if let Err(e) = tx.send(BroadcastMessage::SyncThreadStopped(id)) {
log::warn!("Failed to notify that synchronization thread has been interrupted! {e}")
}
} }