All checks were successful
		
		
	
	continuous-integration/drone/push Build is passing
				
			
		
			
				
	
	
		
			146 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			146 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use crate::broadcast_messages::{BroadcastMessage, SyncEvent};
 | |
| use crate::user::{UserConfig, UserID};
 | |
| use futures_util::TryStreamExt;
 | |
| use ruma::api::client::sync::sync_events;
 | |
| use ruma::assign;
 | |
| use ruma::presence::PresenceState;
 | |
| use std::collections::HashMap;
 | |
| use std::sync::Arc;
 | |
| use std::time::Duration;
 | |
| use tokio::sync::broadcast;
 | |
| 
 | |
| /// ID of sync client
 | |
| #[derive(Debug, Clone, Eq, PartialEq)]
 | |
| pub struct SyncClientID(uuid::Uuid);
 | |
| 
 | |
| /// Sync client launcher loop
 | |
| pub async fn sync_client_manager(tx: broadcast::Sender<BroadcastMessage>) -> ! {
 | |
|     let mut rx = tx.subscribe();
 | |
|     let tx = Arc::new(tx.clone());
 | |
| 
 | |
|     let mut running_tasks = HashMap::new();
 | |
| 
 | |
|     while let Ok(msg) = rx.recv().await {
 | |
|         match msg {
 | |
|             BroadcastMessage::StartSyncTaskForUser(user_id) => {
 | |
|                 if running_tasks.contains_key(&user_id) {
 | |
|                     log::info!("Won't start sync task for user {user_id:?} because a task is already running for this user!");
 | |
|                     continue;
 | |
|                 }
 | |
| 
 | |
|                 log::info!("Start sync task for user {user_id:?}");
 | |
|                 let task_id = SyncClientID(uuid::Uuid::new_v4());
 | |
|                 running_tasks.insert(user_id.clone(), task_id.clone());
 | |
| 
 | |
|                 let tx = tx.clone();
 | |
|                 tokio::task::spawn(async move {
 | |
|                     sync_task(task_id, user_id, tx).await;
 | |
|                 });
 | |
|             }
 | |
| 
 | |
|             BroadcastMessage::StopSyncTaskForUser(user_id) => {
 | |
|                 // Check if a task is running for this user
 | |
|                 if let Some(task_id) = running_tasks.remove(&user_id) {
 | |
|                     log::info!("Stop sync task for user {user_id:?}");
 | |
|                     tx.send(BroadcastMessage::StopSyncClient(task_id)).unwrap();
 | |
|                 } else {
 | |
|                     log::info!("Not stopping sync task for user {user_id:?}: not running");
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             _ => {}
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     panic!("Sync client manager stopped unexpectedly!");
 | |
| }
 | |
| 
 | |
| /// Sync task for a single user
 | |
| async fn sync_task(
 | |
|     id: SyncClientID,
 | |
|     user_id: UserID,
 | |
|     tx: Arc<broadcast::Sender<BroadcastMessage>>,
 | |
| ) {
 | |
|     let mut rx = tx.subscribe();
 | |
| 
 | |
|     let Ok(user_config) = UserConfig::load(&user_id, false).await else {
 | |
|         log::error!("Failed to load user config in sync thread!");
 | |
|         return;
 | |
|     };
 | |
| 
 | |
|     let client = match user_config.matrix_client().await {
 | |
|         Err(e) => {
 | |
|             log::error!("Failed to load matrix client for user {user_id:?}: {e}");
 | |
|             return;
 | |
|         }
 | |
|         Ok(client) => client,
 | |
|     };
 | |
| 
 | |
|     let initial_sync_response = match client
 | |
|         .send_request(assign!(sync_events::v3::Request::new(), {
 | |
|             filter: None,
 | |
|         }))
 | |
|         .await
 | |
|     {
 | |
|         Ok(res) => res,
 | |
|         Err(e) => {
 | |
|             log::error!("Failed to perform initial sync request for user {user_id:?}! {e}");
 | |
|             return;
 | |
|         }
 | |
|     };
 | |
| 
 | |
|     let mut sync_stream = Box::pin(client.sync(
 | |
|         None,
 | |
|         initial_sync_response.next_batch,
 | |
|         PresenceState::Offline,
 | |
|         Some(Duration::from_secs(30)),
 | |
|     ));
 | |
| 
 | |
|     loop {
 | |
|         tokio::select! {
 | |
|             // Message from tokio broadcast
 | |
|             msg = rx.recv() => {
 | |
|                match msg {
 | |
|                     Ok(BroadcastMessage::StopSyncClient(client_id)) => {
 | |
|                         if client_id == id {
 | |
|                             log::info!("A request was received to stop this client! {id:?} for user {user_id:?}");
 | |
|                             break;
 | |
|                         }
 | |
|                     }
 | |
| 
 | |
|                     Err(e) => {
 | |
|                         log::error!("Failed to receive a message from broadcast! {e}");
 | |
|                         return;
 | |
|                     }
 | |
| 
 | |
|                     Ok(_) => {}
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             // Message from Matrix
 | |
|             msg_stream = sync_stream.try_next() => {
 | |
|                 match msg_stream {
 | |
|                     Ok(Some(msg)) => {
 | |
|                         log::debug!("Received new message from Matrix: {msg:#?}");
 | |
|                         if let Err(e) = tx.send(BroadcastMessage::SyncEvent(user_id.clone(), Box::new(SyncEvent {
 | |
|                             rooms: msg.rooms,presence: msg.presence,
 | |
|                             account_data: msg.account_data,
 | |
|                             to_device: msg.to_device,
 | |
|                             device_lists: msg.device_lists,
 | |
|                         }))) {
 | |
|                             log::error!("Failed to propagate event! {e}");
 | |
|                         }
 | |
|                     }
 | |
|                     Ok(None) => {
 | |
|                         log::debug!("Received no message from Matrix");
 | |
|                     }
 | |
|                     Err(e) => {
 | |
|                         log::error!("Failed to receive a message from Matrix! {e}");
 | |
|                         return;
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 |