Start to synchronize with Matrix
This commit is contained in:
parent
4ee26df97b
commit
b91b1ba096
@ -1,7 +1,12 @@
|
|||||||
use crate::broadcast_messages::BroadcastMessage;
|
use crate::broadcast_messages::BroadcastMessage;
|
||||||
use crate::user::UserID;
|
use crate::user::{UserConfig, UserID};
|
||||||
|
use futures_util::TryStreamExt;
|
||||||
|
use ruma::api::client::sync::sync_events;
|
||||||
|
use ruma::assign;
|
||||||
|
use ruma::presence::PresenceState;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
/// ID of sync client
|
/// ID of sync client
|
||||||
@ -9,7 +14,7 @@ use tokio::sync::broadcast;
|
|||||||
pub struct SyncClientID(uuid::Uuid);
|
pub struct SyncClientID(uuid::Uuid);
|
||||||
|
|
||||||
/// Sync client launcher loop
|
/// Sync client launcher loop
|
||||||
pub async fn sync_client_manager(tx: broadcast::Sender<BroadcastMessage>) {
|
pub async fn sync_client_manager(tx: broadcast::Sender<BroadcastMessage>) -> ! {
|
||||||
let mut rx = tx.subscribe();
|
let mut rx = tx.subscribe();
|
||||||
let tx = Arc::new(tx.clone());
|
let tx = Arc::new(tx.clone());
|
||||||
|
|
||||||
@ -58,6 +63,39 @@ async fn sync_task(
|
|||||||
) {
|
) {
|
||||||
let mut rx = tx.subscribe();
|
let mut rx = tx.subscribe();
|
||||||
|
|
||||||
|
let Ok(user_config) = UserConfig::load(&user_id, false).await else {
|
||||||
|
log::error!("Failed to load user config in sync thread!");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let client = match user_config.matrix_client().await {
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to load matrix client for user {user_id:?}: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Ok(client) => client,
|
||||||
|
};
|
||||||
|
|
||||||
|
let initial_sync_response = match client
|
||||||
|
.send_request(assign!(sync_events::v3::Request::new(), {
|
||||||
|
filter: None,
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to perform initial sync request for user {user_id:?}! {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut sync_stream = Box::pin(client.sync(
|
||||||
|
None,
|
||||||
|
initial_sync_response.next_batch,
|
||||||
|
PresenceState::Offline,
|
||||||
|
Some(Duration::from_secs(30)),
|
||||||
|
));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
// Message from tokio broadcast
|
// Message from tokio broadcast
|
||||||
@ -72,14 +110,28 @@ async fn sync_task(
|
|||||||
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("Failed to receive a message from broadcast! {e}");
|
log::error!("Failed to receive a message from broadcast! {e}");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Message from Elements
|
// Message from Matrix
|
||||||
// TODO
|
msg_stream = sync_stream.try_next() => {
|
||||||
|
match msg_stream {
|
||||||
|
Ok(Some(msg)) => {
|
||||||
|
log::debug!("Received new message from Matrix: {msg:?}");
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
log::debug!("Received no message from Matrix");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to receive a message from Matrix! {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user