Start sync client manager implementation
This commit is contained in:
parent
17e086b43c
commit
3822c209d3
@ -9,9 +9,9 @@ pub enum BroadcastMessage {
|
|||||||
/// Close all the sessions of a given user
|
/// Close all the sessions of a given user
|
||||||
CloseAllUserSessions(UserID),
|
CloseAllUserSessions(UserID),
|
||||||
/// Stop sync client for a given user
|
/// Stop sync client for a given user
|
||||||
StopSyncClientForUser(UserID),
|
StopSyncTaskForUser(UserID),
|
||||||
/// Start sync client for a given user (if not already running)
|
/// Start sync client for a given user (if not already running)
|
||||||
StartSyncClientForUser(UserID),
|
StartSyncTaskForUser(UserID),
|
||||||
/// Stop a client with a given client ID
|
/// Stop a client with a given client ID
|
||||||
StopSyncClient(SyncClientID),
|
StopSyncClient(SyncClientID),
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
pub mod app_config;
|
pub mod app_config;
|
||||||
|
pub mod broadcast_messages;
|
||||||
pub mod constants;
|
pub mod constants;
|
||||||
pub mod extractors;
|
pub mod extractors;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
pub mod sync_client;
|
||||||
pub mod user;
|
pub mod user;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
pub mod sync_client;
|
|
||||||
pub mod broadcast_messages;
|
|
@ -6,6 +6,7 @@ use actix_web::{web, App, HttpServer};
|
|||||||
use matrix_gateway::app_config::AppConfig;
|
use matrix_gateway::app_config::AppConfig;
|
||||||
use matrix_gateway::broadcast_messages::BroadcastMessage;
|
use matrix_gateway::broadcast_messages::BroadcastMessage;
|
||||||
use matrix_gateway::server::{api, web_ui};
|
use matrix_gateway::server::{api, web_ui};
|
||||||
|
use matrix_gateway::sync_client;
|
||||||
use matrix_gateway::user::UserConfig;
|
use matrix_gateway::user::UserConfig;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@ -24,7 +25,8 @@ async fn main() -> std::io::Result<()> {
|
|||||||
|
|
||||||
let (ws_tx, _) = tokio::sync::broadcast::channel::<BroadcastMessage>(16);
|
let (ws_tx, _) = tokio::sync::broadcast::channel::<BroadcastMessage>(16);
|
||||||
|
|
||||||
// TODO : spawn a tokio task to launch sync client
|
// Launch sync manager
|
||||||
|
tokio::spawn(sync_client::sync_client_manager(ws_tx.clone()));
|
||||||
|
|
||||||
log::info!(
|
log::info!(
|
||||||
"Starting to listen on {} for {}",
|
"Starting to listen on {} for {}",
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use crate::broadcast_messages::BroadcastMessage;
|
||||||
use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL};
|
use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL};
|
||||||
use crate::extractors::client_auth::APIClientAuth;
|
use crate::extractors::client_auth::APIClientAuth;
|
||||||
use crate::server::HttpResult;
|
use crate::server::HttpResult;
|
||||||
@ -10,9 +11,6 @@ use tokio::select;
|
|||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use tokio::sync::broadcast::Receiver;
|
use tokio::sync::broadcast::Receiver;
|
||||||
use tokio::time::interval;
|
use tokio::time::interval;
|
||||||
use crate::broadcast_messages::BroadcastMessage;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// Main WS route
|
/// Main WS route
|
||||||
pub async fn ws(
|
pub async fn ws(
|
||||||
@ -25,6 +23,13 @@ pub async fn ws(
|
|||||||
|
|
||||||
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
|
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
|
||||||
|
|
||||||
|
// Ask for sync client to be started
|
||||||
|
if let Err(e) = tx.send(BroadcastMessage::StartSyncTaskForUser(
|
||||||
|
auth.user.user_id.clone(),
|
||||||
|
)) {
|
||||||
|
log::error!("Failed to send StartSyncTaskForUser: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
let rx = tx.subscribe();
|
let rx = tx.subscribe();
|
||||||
|
|
||||||
// spawn websocket handler (and don't await it) so that the response is returned immediately
|
// spawn websocket handler (and don't await it) so that the response is returned immediately
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use crate::app_config::AppConfig;
|
use crate::app_config::AppConfig;
|
||||||
|
use crate::broadcast_messages::BroadcastMessage;
|
||||||
use crate::constants::{STATE_KEY, USER_SESSION_KEY};
|
use crate::constants::{STATE_KEY, USER_SESSION_KEY};
|
||||||
use crate::server::{HttpFailure, HttpResult};
|
use crate::server::{HttpFailure, HttpResult};
|
||||||
use crate::user::{APIClient, APIClientID, User, UserConfig, UserID};
|
use crate::user::{APIClient, APIClientID, User, UserConfig, UserID};
|
||||||
@ -10,7 +11,6 @@ use ipnet::IpNet;
|
|||||||
use light_openid::primitives::OpenIDConfig;
|
use light_openid::primitives::OpenIDConfig;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use crate::broadcast_messages::BroadcastMessage;
|
|
||||||
|
|
||||||
/// Static assets
|
/// Static assets
|
||||||
#[derive(rust_embed::Embed)]
|
#[derive(rust_embed::Embed)]
|
||||||
@ -103,7 +103,10 @@ pub async fn home(
|
|||||||
config.save().await?;
|
config.save().await?;
|
||||||
success_message = Some("Matrix token was successfully updated!".to_string());
|
success_message = Some("Matrix token was successfully updated!".to_string());
|
||||||
|
|
||||||
// TODO : stop user sync thread
|
// Close sync task
|
||||||
|
if let Err(e) = tx.send(BroadcastMessage::StopSyncTaskForUser(user.id.clone())) {
|
||||||
|
log::error!("Failed to send StopSyncClientForUser: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
// Invalidate all Ws connections
|
// Invalidate all Ws connections
|
||||||
if let Err(e) = tx.send(BroadcastMessage::CloseAllUserSessions(user.id.clone())) {
|
if let Err(e) = tx.send(BroadcastMessage::CloseAllUserSessions(user.id.clone())) {
|
||||||
|
@ -1,3 +1,28 @@
|
|||||||
|
use crate::broadcast_messages::BroadcastMessage;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
/// ID of sync client
|
/// ID of sync client
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct SyncClientID(uuid::Uuid);
|
pub struct SyncClientID(uuid::Uuid);
|
||||||
|
|
||||||
|
/// Sync client launcher loop
|
||||||
|
pub async fn sync_client_manager(tx: broadcast::Sender<BroadcastMessage>) {
|
||||||
|
let mut rx = tx.subscribe();
|
||||||
|
|
||||||
|
while let Ok(msg) = rx.recv().await {
|
||||||
|
match msg {
|
||||||
|
BroadcastMessage::StopSyncTaskForUser(user_id) => {
|
||||||
|
log::info!("Stop sync task for user {:?}", user_id);
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
BroadcastMessage::StartSyncTaskForUser(user_id) => {
|
||||||
|
log::info!("Start sync task for user {:?}", user_id);
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
panic!("Sync client manager stopped unexpectedly!");
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user