From d440ab514532f56ca54b116ee2ef9d6f8a9f0c52 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sat, 12 Mar 2022 09:10:22 +0100 Subject: [PATCH] Notifications are pushed asynchronously --- Cargo.toml | 4 +- .../push_notifications_controller.rs | 4 +- src/helpers/account_helper.rs | 6 +-- src/helpers/events_helper.rs | 2 +- src/helpers/firebase_notifications_helper.rs | 10 ++--- ...ndent_push_notifications_service_helper.rs | 20 +++++----- src/helpers/push_notifications_helper.rs | 38 ++++++++++--------- 7 files changed, 43 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1554f02..8716967 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ mime_guess = "2.0.3" pdf = "0.7.1" regex = "1.4.3" dashmap = "5.1.0" -reqwest = { version = "0.11.1", features = ["json", "blocking"] } +reqwest = { version = "0.11.1", features = ["json"] } webrtc-sdp = "0.3.8" bcrypt = "0.12.0" mp3-metadata = "0.3.3" @@ -43,4 +43,4 @@ gouth = "0.2.0" webauthn-rs = "0.3.2" url = "2.2.2" async-recursion = "1.0.0" -tokio = "1.17.0" \ No newline at end of file +tokio = { version = "1.17.0", features = ["rt-multi-thread"] } \ No newline at end of file diff --git a/src/controllers/push_notifications_controller.rs b/src/controllers/push_notifications_controller.rs index 46e3b15..74f7b77 100644 --- a/src/controllers/push_notifications_controller.rs +++ b/src/controllers/push_notifications_controller.rs @@ -37,7 +37,7 @@ pub async fn configure(r: &mut HttpRequestHandler) -> RequestResult { return r.bad_request("Independent service is unavailable!".to_string()); } - let token = independent_push_notifications_service_helper::create_token()?; + let token = independent_push_notifications_service_helper::create_token().await?; PushNotificationToken::INDEPENDENT(token) } @@ -47,7 +47,7 @@ pub async fn configure(r: &mut HttpRequestHandler) -> RequestResult { } }; - account_helper::set_push_notification_token(r.user_access_token().unwrap(), status)?; + account_helper::set_push_notification_token(r.user_access_token().unwrap(), status).await?; r.ok() } \ No newline at end of file diff --git a/src/helpers/account_helper.rs b/src/helpers/account_helper.rs index 615e519..dd37d94 100644 --- a/src/helpers/account_helper.rs +++ b/src/helpers/account_helper.rs @@ -103,7 +103,7 @@ pub fn refresh_access_token(token: &UserAccessToken) -> Res { pub async fn destroy_login_tokens(access_tokens: &UserAccessToken) -> Res { // Un-register from independent push notifications service // (continue to destroy token even in case of failure) - push_notifications_helper::un_register_from_previous_service(access_tokens)?; + push_notifications_helper::un_register_from_previous_service(access_tokens).await?; DeleteQuery::new(USER_ACCESS_TOKENS_TABLE) .cond_u64("id", access_tokens.id) @@ -330,9 +330,9 @@ pub fn set_notifications_settings(new_settings: NewNotificationsSettings) -> Res } /// Set new push notification token -pub fn set_push_notification_token(client: &UserAccessToken, new_token: PushNotificationToken) -> Res { +pub async fn set_push_notification_token(client: &UserAccessToken, new_token: PushNotificationToken) -> Res { // In case of independent push service, remove previous client - push_notifications_helper::un_register_from_previous_service(client)?; + push_notifications_helper::un_register_from_previous_service(client).await?; database::UpdateInfo::new(USER_ACCESS_TOKENS_TABLE) .cond_u64("id", client.id) diff --git a/src/helpers/events_helper.rs b/src/helpers/events_helper.rs index 393217f..ff67365 100644 --- a/src/helpers/events_helper.rs +++ b/src/helpers/events_helper.rs @@ -94,6 +94,6 @@ pub async fn propagate_event(e: Event) -> Res { user_ws_controller::handle_event(&e)?; calls_controller::handle_event(&e).await?; rtc_relay_controller::handle_event(&e)?; - push_notifications_helper::handle_event(&e)?; + push_notifications_helper::handle_event(&e).await?; Ok(()) } \ No newline at end of file diff --git a/src/helpers/firebase_notifications_helper.rs b/src/helpers/firebase_notifications_helper.rs index c21da91..273fa3e 100644 --- a/src/helpers/firebase_notifications_helper.rs +++ b/src/helpers/firebase_notifications_helper.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; -use reqwest::blocking::Client; +use reqwest::Client; use reqwest::header::{HeaderMap, HeaderValue}; use serde::Serialize; @@ -74,7 +74,7 @@ fn get_credentials(client: &APIClient) -> Res { } /// Send a single notification through Firebase service -fn send_notification(n: &PushNotification, client_token: &str, access: &FirebaseClientIdentifier) -> Res { +async fn send_notification(n: &PushNotification, client_token: &str, access: &FirebaseClientIdentifier) -> Res { let notif = FirebaseNotificationRequest { validate_only: false, message: FirebaseMessage { @@ -100,13 +100,13 @@ fn send_notification(n: &PushNotification, client_token: &str, access: &Firebase client .post(&FIREBASE_PUSH_MESSAGE_URL.replace("{PROJECT_ID}", &access.project_name)) .json(¬if) - .send()?; + .send().await?; Ok(()) } /// Send a notification -pub fn push_notifications(n: &PushNotification, targets: Vec) -> Res { +pub async fn push_notifications(n: &PushNotification, targets: Vec) -> Res { let mut tokens_cache: HashMap = HashMap::new(); for target in targets { @@ -136,7 +136,7 @@ pub fn push_notifications(n: &PushNotification, targets: Vec) - }; - if let Err(e) = send_notification(n, client_token, authorization_token) { + if let Err(e) = send_notification(n, client_token, authorization_token).await { eprintln!("Failed to send a push notification to a device! Error: {}", e); } } diff --git a/src/helpers/independent_push_notifications_service_helper.rs b/src/helpers/independent_push_notifications_service_helper.rs index e6df8db..08f1b69 100644 --- a/src/helpers/independent_push_notifications_service_helper.rs +++ b/src/helpers/independent_push_notifications_service_helper.rs @@ -5,7 +5,7 @@ use actix_http::header::HeaderValue; use actix_web::http::StatusCode; -use reqwest::blocking::Client; +use reqwest::Client; use reqwest::header::HeaderMap; use serde::{Deserialize, Serialize}; @@ -54,23 +54,23 @@ fn create_client() -> Res { } /// Create a new client token -pub fn create_token() -> Res { +pub async fn create_token() -> Res { let client = create_client()? .get(&format!("{}{}", svc_conf()?.control_url, "create_client")) - .send()?; + .send().await?; - let response: CreateTokenResult = client.json()?; + let response: CreateTokenResult = client.json().await?; Ok(response.token) } /// Destroy a client token -pub fn remove_token(t: &str) -> Res { +pub async fn remove_token(t: &str) -> Res { let client = create_client()? .get(&format!("{}{}", svc_conf()?.control_url, "remove_client")) .json(&RemoveTokenRequest { client: t.to_string() }) - .send()?; + .send().await?; if client.status() == StatusCode::OK { return Ok(()); @@ -81,7 +81,7 @@ pub fn remove_token(t: &str) -> Res { } /// Push notifications -pub fn push_notifications(n: &PushNotification, targets: Vec) -> Res { +pub async fn push_notifications(n: &PushNotification, targets: Vec) -> Res { let client = create_client()? .get(&format!("{}{}", svc_conf()?.control_url, "push_notification")) .json(&PushNotificationRequest { @@ -92,7 +92,7 @@ pub fn push_notifications(n: &PushNotification, targets: Vec) -> Res { image: n.image.clone(), timeout: n.timeout.clone(), }) - .send()?; + .send().await?; if client.status() == StatusCode::OK { return Ok(()); @@ -103,14 +103,14 @@ pub fn push_notifications(n: &PushNotification, targets: Vec) -> Res { } /// Cancel notifications -pub fn cancel_notifications(id: String, targets: Vec) -> Res { +pub async fn cancel_notifications(id: String, targets: Vec) -> Res { let client = create_client()? .get(&format!("{}{}", svc_conf()?.control_url, "remove_notification")) .json(&CancelNotificationRequest { clients: targets, id: id.clone(), }) - .send()?; + .send().await?; if client.status() == StatusCode::OK { return Ok(()); diff --git a/src/helpers/push_notifications_helper.rs b/src/helpers/push_notifications_helper.rs index 733e547..001af0c 100644 --- a/src/helpers/push_notifications_helper.rs +++ b/src/helpers/push_notifications_helper.rs @@ -13,11 +13,11 @@ use crate::helpers::{account_helper, conversations_helper, firebase_notification use crate::helpers::events_helper::Event; /// Un-register for previous push notifications service -pub fn un_register_from_previous_service(client: &UserAccessToken) -> Res { +pub async fn un_register_from_previous_service(client: &UserAccessToken) -> Res { // This method must not fail in case of error if let PushNotificationToken::INDEPENDENT(old_token) = &client.push_notifications_token { - if let Err(e) = independent_push_notifications_service_helper::remove_token(old_token) { + if let Err(e) = independent_push_notifications_service_helper::remove_token(old_token).await { eprintln!("Failed to un-register from independent push notifications service! {}", e); } } @@ -48,35 +48,35 @@ fn split_tokens(targets: Vec) -> (Vec, Vec) -> Res { +async fn push_notification(n: &PushNotification, targets: Vec) -> Res { let (independents, firebase) = split_tokens(targets); // Push independent notifications if !independents.is_empty() { - independent_push_notifications_service_helper::push_notifications(n, independents)?; + independent_push_notifications_service_helper::push_notifications(n, independents).await?; } // Push Firebase notifications if !firebase.is_empty() { - firebase_notifications_helper::push_notifications(n, firebase)?; + firebase_notifications_helper::push_notifications(n, firebase).await?; } Ok(()) } /// Cancel a notification for specific tokens (optional) -fn cancel_notification(id: String, targets: Vec) -> Res { +async fn cancel_notification(id: String, targets: Vec) -> Res { let (independents, _) = split_tokens(targets); if !independents.is_empty() { - independent_push_notifications_service_helper::cancel_notifications(id, independents)?; + independent_push_notifications_service_helper::cancel_notifications(id, independents).await?; } Ok(()) } /// Push a notification to specific users, only if they are not connected -fn push_notification_to_users(n: &PushNotification, users: Vec) -> Res { +async fn push_notification_to_users(n: &PushNotification, users: Vec) -> Res { let dest_users: Vec = users.into_iter() .filter(|u| !user_ws_controller::is_user_connected(u)) .collect(); @@ -89,11 +89,11 @@ fn push_notification_to_users(n: &PushNotification, users: Vec) -> Res { } } - push_notification(n, dest_tokens) + push_notification(n, dest_tokens).await } /// Cancel a notification for one or more users -fn cancel_notification_for_users(id: String, users: Vec) -> Res { +async fn cancel_notification_for_users(id: String, users: Vec) -> Res { let mut dest_tokens = vec![]; for user in users { @@ -102,11 +102,11 @@ fn cancel_notification_for_users(id: String, users: Vec) -> Res { } } - cancel_notification(id, dest_tokens) + cancel_notification(id, dest_tokens).await } /// Create a conversation notification -pub fn create_conversation_notification(msg: &ConversationMessage) -> Res { +pub async fn create_conversation_notification(msg: &ConversationMessage) -> Res { let user_id = match &msg.user_id { Some(id) => id, None => { @@ -150,11 +150,11 @@ pub fn create_conversation_notification(msg: &ConversationMessage) -> Res { } } - push_notification_to_users(¬if, dest_list) + push_notification_to_users(¬if, dest_list).await } /// Dismiss a conversation notification -pub fn cancel_conversation_notification(conv_id: ConvID, users: Option>) -> Res { +pub async fn cancel_conversation_notification(conv_id: ConvID, users: Option>) -> Res { let notif_id = format!("conv-{}", conv_id.id()); let list = match users { @@ -163,17 +163,18 @@ pub fn cancel_conversation_notification(conv_id: ConvID, users: Option Res { +pub async fn handle_event(e: &Event) -> Res { match e { Event::NewConversationMessage(msg) => { let msg = (*msg).clone(); std::thread::spawn(move || { - if let Err(err) = create_conversation_notification(&msg) { + if let Err(err) = tokio::runtime::Runtime::new().unwrap().block_on( + create_conversation_notification(&msg)) { eprintln!("Failed to create push for conversation! {}", err); } }); @@ -183,7 +184,8 @@ pub fn handle_event(e: &Event) -> Res { let user_id = user_id.as_owned(); let conv_id = *conv_id; std::thread::spawn(move || { - if let Err(e) = cancel_conversation_notification(conv_id, Some(vec![user_id])) { + if let Err(e) = tokio::runtime::Runtime::new().unwrap().block_on( + cancel_conversation_notification(conv_id, Some(vec![user_id]))) { eprintln!("Failed to cancel push conversation! {}", e); } });