1
0
mirror of https://gitlab.com/comunic/comunicapiv3 synced 2025-01-26 20:13:00 +00:00

Notifications are pushed asynchronously

This commit is contained in:
Pierre HUBERT 2022-03-12 09:10:22 +01:00
parent 2bb6a48545
commit d440ab5145
7 changed files with 43 additions and 41 deletions

View File

@ -32,7 +32,7 @@ mime_guess = "2.0.3"
pdf = "0.7.1"
regex = "1.4.3"
dashmap = "5.1.0"
reqwest = { version = "0.11.1", features = ["json", "blocking"] }
reqwest = { version = "0.11.1", features = ["json"] }
webrtc-sdp = "0.3.8"
bcrypt = "0.12.0"
mp3-metadata = "0.3.3"
@ -43,4 +43,4 @@ gouth = "0.2.0"
webauthn-rs = "0.3.2"
url = "2.2.2"
async-recursion = "1.0.0"
tokio = "1.17.0"
tokio = { version = "1.17.0", features = ["rt-multi-thread"] }

View File

@ -37,7 +37,7 @@ pub async fn configure(r: &mut HttpRequestHandler) -> RequestResult {
return r.bad_request("Independent service is unavailable!".to_string());
}
let token = independent_push_notifications_service_helper::create_token()?;
let token = independent_push_notifications_service_helper::create_token().await?;
PushNotificationToken::INDEPENDENT(token)
}
@ -47,7 +47,7 @@ pub async fn configure(r: &mut HttpRequestHandler) -> RequestResult {
}
};
account_helper::set_push_notification_token(r.user_access_token().unwrap(), status)?;
account_helper::set_push_notification_token(r.user_access_token().unwrap(), status).await?;
r.ok()
}

View File

@ -103,7 +103,7 @@ pub fn refresh_access_token(token: &UserAccessToken) -> Res {
pub async fn destroy_login_tokens(access_tokens: &UserAccessToken) -> Res {
// Un-register from independent push notifications service
// (continue to destroy token even in case of failure)
push_notifications_helper::un_register_from_previous_service(access_tokens)?;
push_notifications_helper::un_register_from_previous_service(access_tokens).await?;
DeleteQuery::new(USER_ACCESS_TOKENS_TABLE)
.cond_u64("id", access_tokens.id)
@ -330,9 +330,9 @@ pub fn set_notifications_settings(new_settings: NewNotificationsSettings) -> Res
}
/// Set new push notification token
pub fn set_push_notification_token(client: &UserAccessToken, new_token: PushNotificationToken) -> Res {
pub async fn set_push_notification_token(client: &UserAccessToken, new_token: PushNotificationToken) -> Res {
// In case of independent push service, remove previous client
push_notifications_helper::un_register_from_previous_service(client)?;
push_notifications_helper::un_register_from_previous_service(client).await?;
database::UpdateInfo::new(USER_ACCESS_TOKENS_TABLE)
.cond_u64("id", client.id)

View File

@ -94,6 +94,6 @@ pub async fn propagate_event(e: Event) -> Res {
user_ws_controller::handle_event(&e)?;
calls_controller::handle_event(&e).await?;
rtc_relay_controller::handle_event(&e)?;
push_notifications_helper::handle_event(&e)?;
push_notifications_helper::handle_event(&e).await?;
Ok(())
}

View File

@ -6,7 +6,7 @@
use std::collections::HashMap;
use reqwest::blocking::Client;
use reqwest::Client;
use reqwest::header::{HeaderMap, HeaderValue};
use serde::Serialize;
@ -74,7 +74,7 @@ fn get_credentials(client: &APIClient) -> Res<String> {
}
/// Send a single notification through Firebase service
fn send_notification(n: &PushNotification, client_token: &str, access: &FirebaseClientIdentifier) -> Res {
async fn send_notification(n: &PushNotification, client_token: &str, access: &FirebaseClientIdentifier) -> Res {
let notif = FirebaseNotificationRequest {
validate_only: false,
message: FirebaseMessage {
@ -100,13 +100,13 @@ fn send_notification(n: &PushNotification, client_token: &str, access: &Firebase
client
.post(&FIREBASE_PUSH_MESSAGE_URL.replace("{PROJECT_ID}", &access.project_name))
.json(&notif)
.send()?;
.send().await?;
Ok(())
}
/// Send a notification
pub fn push_notifications(n: &PushNotification, targets: Vec<UserAccessToken>) -> Res {
pub async fn push_notifications(n: &PushNotification, targets: Vec<UserAccessToken>) -> Res {
let mut tokens_cache: HashMap<u64, FirebaseClientIdentifier> = HashMap::new();
for target in targets {
@ -136,7 +136,7 @@ pub fn push_notifications(n: &PushNotification, targets: Vec<UserAccessToken>) -
};
if let Err(e) = send_notification(n, client_token, authorization_token) {
if let Err(e) = send_notification(n, client_token, authorization_token).await {
eprintln!("Failed to send a push notification to a device! Error: {}", e);
}
}

View File

@ -5,7 +5,7 @@
use actix_http::header::HeaderValue;
use actix_web::http::StatusCode;
use reqwest::blocking::Client;
use reqwest::Client;
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
@ -54,23 +54,23 @@ fn create_client() -> Res<Client> {
}
/// Create a new client token
pub fn create_token() -> Res<String> {
pub async fn create_token() -> Res<String> {
let client = create_client()?
.get(&format!("{}{}", svc_conf()?.control_url, "create_client"))
.send()?;
.send().await?;
let response: CreateTokenResult = client.json()?;
let response: CreateTokenResult = client.json().await?;
Ok(response.token)
}
/// Destroy a client token
pub fn remove_token(t: &str) -> Res {
pub async fn remove_token(t: &str) -> Res {
let client = create_client()?
.get(&format!("{}{}", svc_conf()?.control_url, "remove_client"))
.json(&RemoveTokenRequest { client: t.to_string() })
.send()?;
.send().await?;
if client.status() == StatusCode::OK {
return Ok(());
@ -81,7 +81,7 @@ pub fn remove_token(t: &str) -> Res {
}
/// Push notifications
pub fn push_notifications(n: &PushNotification, targets: Vec<String>) -> Res {
pub async fn push_notifications(n: &PushNotification, targets: Vec<String>) -> Res {
let client = create_client()?
.get(&format!("{}{}", svc_conf()?.control_url, "push_notification"))
.json(&PushNotificationRequest {
@ -92,7 +92,7 @@ pub fn push_notifications(n: &PushNotification, targets: Vec<String>) -> Res {
image: n.image.clone(),
timeout: n.timeout.clone(),
})
.send()?;
.send().await?;
if client.status() == StatusCode::OK {
return Ok(());
@ -103,14 +103,14 @@ pub fn push_notifications(n: &PushNotification, targets: Vec<String>) -> Res {
}
/// Cancel notifications
pub fn cancel_notifications(id: String, targets: Vec<String>) -> Res {
pub async fn cancel_notifications(id: String, targets: Vec<String>) -> Res {
let client = create_client()?
.get(&format!("{}{}", svc_conf()?.control_url, "remove_notification"))
.json(&CancelNotificationRequest {
clients: targets,
id: id.clone(),
})
.send()?;
.send().await?;
if client.status() == StatusCode::OK {
return Ok(());

View File

@ -13,11 +13,11 @@ use crate::helpers::{account_helper, conversations_helper, firebase_notification
use crate::helpers::events_helper::Event;
/// Un-register for previous push notifications service
pub fn un_register_from_previous_service(client: &UserAccessToken) -> Res {
pub async fn un_register_from_previous_service(client: &UserAccessToken) -> Res {
// This method must not fail in case of error
if let PushNotificationToken::INDEPENDENT(old_token) = &client.push_notifications_token {
if let Err(e) = independent_push_notifications_service_helper::remove_token(old_token) {
if let Err(e) = independent_push_notifications_service_helper::remove_token(old_token).await {
eprintln!("Failed to un-register from independent push notifications service! {}", e);
}
}
@ -48,35 +48,35 @@ fn split_tokens(targets: Vec<UserAccessToken>) -> (Vec<String>, Vec<UserAccessTo
}
/// Push a notification to specific tokens
fn push_notification(n: &PushNotification, targets: Vec<UserAccessToken>) -> Res {
async fn push_notification(n: &PushNotification, targets: Vec<UserAccessToken>) -> Res {
let (independents, firebase) = split_tokens(targets);
// Push independent notifications
if !independents.is_empty() {
independent_push_notifications_service_helper::push_notifications(n, independents)?;
independent_push_notifications_service_helper::push_notifications(n, independents).await?;
}
// Push Firebase notifications
if !firebase.is_empty() {
firebase_notifications_helper::push_notifications(n, firebase)?;
firebase_notifications_helper::push_notifications(n, firebase).await?;
}
Ok(())
}
/// Cancel a notification for specific tokens (optional)
fn cancel_notification(id: String, targets: Vec<UserAccessToken>) -> Res {
async fn cancel_notification(id: String, targets: Vec<UserAccessToken>) -> Res {
let (independents, _) = split_tokens(targets);
if !independents.is_empty() {
independent_push_notifications_service_helper::cancel_notifications(id, independents)?;
independent_push_notifications_service_helper::cancel_notifications(id, independents).await?;
}
Ok(())
}
/// Push a notification to specific users, only if they are not connected
fn push_notification_to_users(n: &PushNotification, users: Vec<UserID>) -> Res {
async fn push_notification_to_users(n: &PushNotification, users: Vec<UserID>) -> Res {
let dest_users: Vec<UserID> = users.into_iter()
.filter(|u| !user_ws_controller::is_user_connected(u))
.collect();
@ -89,11 +89,11 @@ fn push_notification_to_users(n: &PushNotification, users: Vec<UserID>) -> Res {
}
}
push_notification(n, dest_tokens)
push_notification(n, dest_tokens).await
}
/// Cancel a notification for one or more users
fn cancel_notification_for_users(id: String, users: Vec<UserID>) -> Res {
async fn cancel_notification_for_users(id: String, users: Vec<UserID>) -> Res {
let mut dest_tokens = vec![];
for user in users {
@ -102,11 +102,11 @@ fn cancel_notification_for_users(id: String, users: Vec<UserID>) -> Res {
}
}
cancel_notification(id, dest_tokens)
cancel_notification(id, dest_tokens).await
}
/// Create a conversation notification
pub fn create_conversation_notification(msg: &ConversationMessage) -> Res {
pub async fn create_conversation_notification(msg: &ConversationMessage) -> Res {
let user_id = match &msg.user_id {
Some(id) => id,
None => {
@ -150,11 +150,11 @@ pub fn create_conversation_notification(msg: &ConversationMessage) -> Res {
}
}
push_notification_to_users(&notif, dest_list)
push_notification_to_users(&notif, dest_list).await
}
/// Dismiss a conversation notification
pub fn cancel_conversation_notification(conv_id: ConvID, users: Option<Vec<UserID>>) -> Res {
pub async fn cancel_conversation_notification(conv_id: ConvID, users: Option<Vec<UserID>>) -> Res {
let notif_id = format!("conv-{}", conv_id.id());
let list = match users {
@ -163,17 +163,18 @@ pub fn cancel_conversation_notification(conv_id: ConvID, users: Option<Vec<UserI
.into_iter().map(|m| m.user_id).collect()
};
cancel_notification_for_users(notif_id, list)
cancel_notification_for_users(notif_id, list).await
}
/// Handle event. This method NEVER returns Err
pub fn handle_event(e: &Event) -> Res {
pub async fn handle_event(e: &Event) -> Res {
match e {
Event::NewConversationMessage(msg) => {
let msg = (*msg).clone();
std::thread::spawn(move || {
if let Err(err) = create_conversation_notification(&msg) {
if let Err(err) = tokio::runtime::Runtime::new().unwrap().block_on(
create_conversation_notification(&msg)) {
eprintln!("Failed to create push for conversation! {}", err);
}
});
@ -183,7 +184,8 @@ pub fn handle_event(e: &Event) -> Res {
let user_id = user_id.as_owned();
let conv_id = *conv_id;
std::thread::spawn(move || {
if let Err(e) = cancel_conversation_notification(conv_id, Some(vec![user_id])) {
if let Err(e) = tokio::runtime::Runtime::new().unwrap().block_on(
cancel_conversation_notification(conv_id, Some(vec![user_id]))) {
eprintln!("Failed to cancel push conversation! {}", e);
}
});