diff --git a/src/sync_client.rs b/src/sync_client.rs index dfe6c14..85c8cbe 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -1,7 +1,12 @@ use crate::broadcast_messages::BroadcastMessage; -use crate::user::UserID; +use crate::user::{UserConfig, UserID}; +use futures_util::TryStreamExt; +use ruma::api::client::sync::sync_events; +use ruma::assign; +use ruma::presence::PresenceState; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use tokio::sync::broadcast; /// ID of sync client @@ -9,7 +14,7 @@ use tokio::sync::broadcast; pub struct SyncClientID(uuid::Uuid); /// Sync client launcher loop -pub async fn sync_client_manager(tx: broadcast::Sender) { +pub async fn sync_client_manager(tx: broadcast::Sender) -> ! { let mut rx = tx.subscribe(); let tx = Arc::new(tx.clone()); @@ -58,6 +63,39 @@ async fn sync_task( ) { let mut rx = tx.subscribe(); + let Ok(user_config) = UserConfig::load(&user_id, false).await else { + log::error!("Failed to load user config in sync thread!"); + return; + }; + + let client = match user_config.matrix_client().await { + Err(e) => { + log::error!("Failed to load matrix client for user {user_id:?}: {e}"); + return; + } + Ok(client) => client, + }; + + let initial_sync_response = match client + .send_request(assign!(sync_events::v3::Request::new(), { + filter: None, + })) + .await + { + Ok(res) => res, + Err(e) => { + log::error!("Failed to perform initial sync request for user {user_id:?}! {e}"); + return; + } + }; + + let mut sync_stream = Box::pin(client.sync( + None, + initial_sync_response.next_batch, + PresenceState::Offline, + Some(Duration::from_secs(30)), + )); + loop { tokio::select! { // Message from tokio broadcast @@ -72,14 +110,28 @@ async fn sync_task( Err(e) => { log::error!("Failed to receive a message from broadcast! {e}"); + return; } Ok(_) => {} } } - // Message from Elements - // TODO + // Message from Matrix + msg_stream = sync_stream.try_next() => { + match msg_stream { + Ok(Some(msg)) => { + log::debug!("Received new message from Matrix: {msg:?}"); + } + Ok(None) => { + log::debug!("Received no message from Matrix"); + } + Err(e) => { + log::error!("Failed to receive a message from Matrix! {e}"); + return; + } + } + } } } }