Compare commits
3 Commits
cfdf98b47a
...
564e606ac7
| Author | SHA1 | Date | |
|---|---|---|---|
| 564e606ac7 | |||
| 7b691962a0 | |||
| 1e00d24a8b |
@@ -20,3 +20,40 @@ pub async fn start_sync(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stop sync thread
|
||||||
|
pub async fn stop_sync(
|
||||||
|
client: MatrixClientExtractor,
|
||||||
|
manager: web::Data<ActorRef<MatrixManagerMsg>>,
|
||||||
|
) -> HttpResult {
|
||||||
|
match ractor::cast!(
|
||||||
|
manager,
|
||||||
|
MatrixManagerMsg::StopSyncThread(client.auth.user.email.clone())
|
||||||
|
) {
|
||||||
|
Ok(_) => Ok(HttpResponse::Accepted().finish()),
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to stop sync thread: {e}");
|
||||||
|
Ok(HttpResponse::InternalServerError().finish())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Serialize)]
|
||||||
|
struct GetSyncStatusResponse {
|
||||||
|
started: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get sync thread status
|
||||||
|
pub async fn status(
|
||||||
|
client: MatrixClientExtractor,
|
||||||
|
manager: web::Data<ActorRef<MatrixManagerMsg>>,
|
||||||
|
) -> HttpResult {
|
||||||
|
let started = ractor::call!(
|
||||||
|
manager.as_ref(),
|
||||||
|
MatrixManagerMsg::SyncThreadGetStatus,
|
||||||
|
client.auth.user.email
|
||||||
|
)
|
||||||
|
.expect("RPC to Matrix Manager failed");
|
||||||
|
|
||||||
|
Ok(HttpResponse::Ok().json(GetSyncStatusResponse { started }))
|
||||||
|
}
|
||||||
|
|||||||
@@ -39,9 +39,13 @@ impl FromRequest for MatrixClientExtractor {
|
|||||||
matrix_manager_actor,
|
matrix_manager_actor,
|
||||||
MatrixManagerMsg::GetClient,
|
MatrixManagerMsg::GetClient,
|
||||||
auth.user.email.clone()
|
auth.user.email.clone()
|
||||||
)
|
);
|
||||||
.expect("Failed to query manager actor!")
|
|
||||||
.expect("Failed to get client!");
|
let client = match client {
|
||||||
|
Ok(Ok(client)) => client,
|
||||||
|
Ok(Err(err)) => panic!("Failed to get client! {err:?}"),
|
||||||
|
Err(err) => panic!("Failed to query manager actor! {err:#?}"),
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Self { auth, client })
|
Ok(Self { auth, client })
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -125,6 +125,14 @@ async fn main() -> std::io::Result<()> {
|
|||||||
"/api/matrix_sync/start",
|
"/api/matrix_sync/start",
|
||||||
web::post().to(matrix_sync_thread_controller::start_sync),
|
web::post().to(matrix_sync_thread_controller::start_sync),
|
||||||
)
|
)
|
||||||
|
.route(
|
||||||
|
"/api/matrix_sync/stop",
|
||||||
|
web::post().to(matrix_sync_thread_controller::stop_sync),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/matrix_sync/status",
|
||||||
|
web::get().to(matrix_sync_thread_controller::status),
|
||||||
|
)
|
||||||
})
|
})
|
||||||
.workers(4)
|
.workers(4)
|
||||||
.bind(&AppConfig::get().listen_address)?
|
.bind(&AppConfig::get().listen_address)?
|
||||||
|
|||||||
@@ -15,6 +15,8 @@ pub enum MatrixManagerMsg {
|
|||||||
GetClient(UserEmail, RpcReplyPort<anyhow::Result<MatrixClient>>),
|
GetClient(UserEmail, RpcReplyPort<anyhow::Result<MatrixClient>>),
|
||||||
DisconnectClient(UserEmail),
|
DisconnectClient(UserEmail),
|
||||||
StartSyncThread(UserEmail),
|
StartSyncThread(UserEmail),
|
||||||
|
StopSyncThread(UserEmail),
|
||||||
|
SyncThreadGetStatus(UserEmail, RpcReplyPort<bool>),
|
||||||
SyncThreadTerminated(UserEmail, MatrixSyncTaskID),
|
SyncThreadTerminated(UserEmail, MatrixSyncTaskID),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -37,6 +39,15 @@ impl Actor for MatrixManagerActor {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn post_stop(
|
||||||
|
&self,
|
||||||
|
_myself: ActorRef<Self::Msg>,
|
||||||
|
_state: &mut Self::State,
|
||||||
|
) -> Result<(), ActorProcessingErr> {
|
||||||
|
log::error!("[!] [!] Matrix Manager Actor stopped!");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle(
|
async fn handle(
|
||||||
&self,
|
&self,
|
||||||
myself: ActorRef<Self::Msg>,
|
myself: ActorRef<Self::Msg>,
|
||||||
@@ -113,10 +124,32 @@ impl Actor for MatrixManagerActor {
|
|||||||
// Start thread
|
// Start thread
|
||||||
log::debug!("Starting sync thread for {email:?}");
|
log::debug!("Starting sync thread for {email:?}");
|
||||||
let thread_id =
|
let thread_id =
|
||||||
start_sync_thread(client.clone(), state.broadcast_sender.clone(), myself)
|
match start_sync_thread(client.clone(), state.broadcast_sender.clone(), myself)
|
||||||
.await?;
|
.await
|
||||||
|
{
|
||||||
|
Ok(thread_id) => thread_id,
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to start sync thread! {e}");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
state.running_sync_threads.insert(email, thread_id);
|
state.running_sync_threads.insert(email, thread_id);
|
||||||
}
|
}
|
||||||
|
MatrixManagerMsg::StopSyncThread(email) => {
|
||||||
|
if let Some(thread_id) = state.running_sync_threads.get(&email)
|
||||||
|
&& let Err(e) = state
|
||||||
|
.broadcast_sender
|
||||||
|
.send(BroadcastMessage::StopSyncThread(thread_id.clone()))
|
||||||
|
{
|
||||||
|
log::error!("Failed to request sync thread stop: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MatrixManagerMsg::SyncThreadGetStatus(email, reply) => {
|
||||||
|
let started = state.running_sync_threads.contains_key(&email);
|
||||||
|
if let Err(e) = reply.send(started) {
|
||||||
|
log::error!("Failed to send sync thread status! {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
MatrixManagerMsg::SyncThreadTerminated(email, task_id) => {
|
MatrixManagerMsg::SyncThreadTerminated(email, task_id) => {
|
||||||
if state.running_sync_threads.get(&email) == Some(&task_id) {
|
if state.running_sync_threads.get(&email) == Some(&task_id) {
|
||||||
log::info!(
|
log::info!(
|
||||||
|
|||||||
Reference in New Issue
Block a user