From 09ce13c55423355a26c93d78fbf94ab4e83db3a8 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sat, 12 Mar 2022 07:47:22 +0100 Subject: [PATCH] Turned a lot of function to async mode --- Cargo.lock | 12 +++ Cargo.toml | 3 +- src/cleanup_thread.rs | 17 ++-- src/controllers/account_controller.rs | 6 +- src/controllers/calls_controller.rs | 94 +++++++++----------- src/controllers/comments_controller.rs | 19 ++-- src/controllers/conversations_controller.rs | 39 ++++----- src/controllers/friends_controller.rs | 8 +- src/controllers/groups_controller.rs | 44 +++++----- src/controllers/likes_controller.rs | 13 +-- src/controllers/notifications_controller.rs | 4 +- src/controllers/posts_controller.rs | 8 +- src/controllers/rtc_relay_controller.rs | 37 +++++--- src/controllers/user_ws_controller.rs | 31 ++++--- src/data/comment.rs | 1 + src/helpers/account_helper.rs | 34 ++++---- src/helpers/comments_helper.rs | 25 +++--- src/helpers/conversations_helper.rs | 97 +++++++++++---------- src/helpers/events_helper.rs | 59 +++++++------ src/helpers/groups_helper.rs | 46 +++++----- src/helpers/notifications_helper.rs | 88 +++++++++---------- src/helpers/posts_helper.rs | 18 ++-- src/routes.rs | 2 +- src/user_ws_routes.rs | 20 ++--- 24 files changed, 368 insertions(+), 357 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d148ba..b14d04f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,6 +370,17 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" +[[package]] +name = "async-recursion" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "attohttpc" version = "0.15.0" @@ -700,6 +711,7 @@ dependencies = [ "actix-multipart", "actix-web", "actix-web-actors", + "async-recursion", "bcrypt", "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 3b020d0..f8994e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,4 +41,5 @@ zip = "0.5.10" webpage = "1.2.0" gouth = "0.2.0" webauthn-rs = "0.3.2" -url = "2.2.2" \ No newline at end of file +url = "2.2.2" +async-recursion = "1.0.0" \ No newline at end of file diff --git a/src/cleanup_thread.rs b/src/cleanup_thread.rs index b76d746..0093ef2 100644 --- a/src/cleanup_thread.rs +++ b/src/cleanup_thread.rs @@ -16,7 +16,8 @@ pub fn start() -> Res { /// Clean up thread handler fn clean_up_thread_handler() { - // Let the server start before doing cleanup + // TODO : uncomment + /*// Let the server start before doing cleanup std::thread::sleep(INITIAL_REFRESH_LOAD_INTERVAL); loop { @@ -32,14 +33,14 @@ fn clean_up_thread_handler() { } std::thread::sleep(CLEAN_UP_INTERVAL); - } + }*/ } /// Do the cleanup -fn do_clean() -> Res { +async fn do_clean() -> Res { // Clean old login tokens - account_helper::clean_up_old_access_tokens()?; + account_helper::clean_up_old_access_tokens().await?; // Automatic account cleanup for user in user_helper::get_all_users()? { @@ -51,16 +52,16 @@ fn do_clean() -> Res { notifications_helper::clean_old_user_notifications(&user)?; // Clean old comments - comments_helper::clean_old_comments(&user)?; + comments_helper::clean_old_comments(&user).await?; // Clean old posts - posts_helper::clean_old_posts(&user)?; + posts_helper::clean_old_posts(&user).await?; // Clean old conversation messages - conversations_helper::clean_old_messages(&user)?; + conversations_helper::clean_old_messages(&user).await?; // Remove the account, if it have been inactive for a long time - account_helper::remove_if_inactive_for_too_long_time(&user)?; + account_helper::remove_if_inactive_for_too_long_time(&user).await?; } // Clean up old admin actions diff --git a/src/controllers/account_controller.rs b/src/controllers/account_controller.rs index f77a486..64c2cf6 100644 --- a/src/controllers/account_controller.rs +++ b/src/controllers/account_controller.rs @@ -94,7 +94,7 @@ pub async fn login_user(request: &mut HttpRequestHandler) -> RequestResult { /// Sign out user pub async fn logout_user(request: &mut HttpRequestHandler) -> RequestResult { if let Some(token) = request.user_access_token() { - account_helper::destroy_login_tokens(token)?; + account_helper::destroy_login_tokens(token).await?; } request.success("User disconnected.") @@ -102,7 +102,7 @@ pub async fn logout_user(request: &mut HttpRequestHandler) -> RequestResult { /// Disconnect a user from all his devices pub async fn disconnect_all_devices(r: &mut HttpRequestHandler) -> RequestResult { - account_helper::destroy_all_user_tokens(r.user_id_ref()?)?; + account_helper::destroy_all_user_tokens(r.user_id_ref()?).await?; r.success("Successfully disconnected!") } @@ -208,7 +208,7 @@ pub async fn delete_account(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("You shall not delete MY account (whoever you are, please note that hacking is bad !!!)".to_string())?; } - account_helper::delete(r.user_id_ref()?)?; + account_helper::delete(r.user_id_ref()?).await?; r.success("Account deleted.") } \ No newline at end of file diff --git a/src/controllers/calls_controller.rs b/src/controllers/calls_controller.rs index d502b0a..7bbbb8c 100644 --- a/src/controllers/calls_controller.rs +++ b/src/controllers/calls_controller.rs @@ -88,7 +88,7 @@ impl UserWsRequestHandler { } /// Get calls configuration -pub fn get_config(r: &mut UserWsRequestHandler) -> RequestResult { +pub async fn get_config(r: &mut UserWsRequestHandler) -> RequestResult { // Check whether the user is the member of a call or not if let None = r.get_conn().active_call { r.forbidden("You do not belong to any call yet!".to_string())?; @@ -124,7 +124,7 @@ pub fn is_conversation_having_call(conv_id: &ConvID) -> bool { } /// Join a call -pub fn join_call(r: &mut UserWsRequestHandler) -> RequestResult { +pub async fn join_call(r: &mut UserWsRequestHandler) -> RequestResult { let conv_id = r.post_conv("convID")?.conv_id; // Check if the conversation can have a call @@ -135,23 +135,21 @@ pub fn join_call(r: &mut UserWsRequestHandler) -> RequestResult { // Remove any other active call with current WebSocket if let Some(call) = &r.get_conn().active_call { - make_user_leave_call(&call.conv_id, r.get_conn())?; + make_user_leave_call(&call.conv_id, r.get_conn()).await?; } // Remove any other active connection to current call of current user - user_ws_controller::foreach_connection(|conn| { + for conn in user_ws_controller::get_all_connections()? { if conn.user_id() != r.user_id_ref()? || conn.session.eq(&r.get_conn().session) { return Ok(()); } if let Some(call) = &conn.active_call { if call.conv_id == conv_id { - make_user_leave_call(&call.conv_id, conn)?; + make_user_leave_call(&call.conv_id, &conn).await?; } } - - Ok(()) - })?; + }; r.update_conn(|r| r.active_call = Some(ActiveCall { conv_id, @@ -159,13 +157,13 @@ pub fn join_call(r: &mut UserWsRequestHandler) -> RequestResult { }))?; // Propagate event - events_helper::propagate_event(&Event::UserJoinedCall(&conv_id, r.user_id_ref()?))?; + events_helper::propagate_event(Event::UserJoinedCall(conv_id, r.user_id()?)).await?; Ok(()) } /// Leave a call -pub fn leave_call(r: &mut UserWsRequestHandler) -> RequestResult { +pub async fn leave_call(r: &mut UserWsRequestHandler) -> RequestResult { // Warning ! For some technical reasons, we do not check if the user // really belongs to the conversation, so be careful when manipulating // conversation ID here @@ -177,13 +175,13 @@ pub fn leave_call(r: &mut UserWsRequestHandler) -> RequestResult { } // Make the user leave the call - make_user_leave_call(&conv_id, r.get_conn())?; + make_user_leave_call(&conv_id, r.get_conn()).await?; r.success("Left call!") } /// Get the list of members of a call -pub fn get_members_list(r: &mut UserWsRequestHandler) -> RequestResult { +pub async fn get_members_list(r: &mut UserWsRequestHandler) -> RequestResult { let conv_id = r.post_call_id("callID")?; let mut list = vec![]; @@ -204,7 +202,7 @@ pub fn gen_call_hash(call_id: &ConvID, peer_id: &UserID) -> String { } /// Handles client signal -pub fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult { +pub async fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult { let call_id = r.post_call_id("callID")?; let peer_id = r.post_call_peer_id(&call_id, "peerID")?; let sig_type = r.post_string("type")?; @@ -271,18 +269,18 @@ pub fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult { } }; - events_helper::propagate_event(&Event::NewUserCallSignal(&NewUserCallSignal { + events_helper::propagate_event(Event::NewUserCallSignal(NewUserCallSignal { call_hash: gen_call_hash(&call_id, &peer_id), user_id: if r.user_id_ref()? == &peer_id { None } else { Some(r.user_id()?) }, signal, raw_data: r.post_string("data")?, - }))?; + })).await?; r.success("Signal sent") } /// Mark user ready for streaming -pub fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res { +pub async fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res { let call_id = r.post_call_id("callID")?; let user_id = r.user_id()?; @@ -291,14 +289,13 @@ pub fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res { user_ws_controller::send_message_to_specific_connections( |c| c.user_id() != &user_id && c.is_having_call_with_conversation(&call_id), |_| UserWsMessage::no_id_message("call_peer_ready", CallPeerReadyAPI::new(&call_id, r.user_id_ref()?)), - None:: _>, )?; r.success("Information propagated.") } /// Request an offer from the server -pub fn request_offer(r: &mut UserWsRequestHandler) -> Res { +pub async fn request_offer(r: &mut UserWsRequestHandler) -> Res { let call_id = r.post_call_id("callID")?; // The ID of the user we stream the audio / video from @@ -308,16 +305,16 @@ pub fn request_offer(r: &mut UserWsRequestHandler) -> Res { r.forbidden("You can not request an offer for yourself!".to_string())?; } - events_helper::propagate_event(&Event::UserRequestedCallOffer(&UserCallOfferRequest { + events_helper::propagate_event(Event::UserRequestedCallOffer(UserCallOfferRequest { call_hash: gen_call_hash(&call_id, &peer_id), user_id: r.user_id()?, - }))?; + })).await?; r.success("Request sent") } /// Notify the user stopped to stream -pub fn stop_streaming(r: &mut UserWsRequestHandler) -> Res { +pub async fn stop_streaming(r: &mut UserWsRequestHandler) -> Res { let call_id = r.post_call_id("callID")?; let user_id = r.user_id()?; @@ -329,22 +326,21 @@ pub fn stop_streaming(r: &mut UserWsRequestHandler) -> Res { user_ws_controller::send_message_to_specific_connections( |c| c.is_having_call_with_conversation(&call_id) && c.user_id() != &user_id, |_| UserWsMessage::no_id_message("call_peer_interrupted_streaming", CallPeerInterruptedStreamingAPI::new(&call_id, &user_id)), - None:: _>, )?; } // Notify proxy - events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream { + events_helper::propagate_event(Event::CloseCallStream(CloseCallStream { call_hash: gen_call_hash(&call_id, &user_id), peer_id: None, - }))?; + })).await?; r.success("ok") } /// Make the user leave the call -pub fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) -> Res { +pub async fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) -> Res { connection.clone().replace(|c| c.active_call = None); // Notify user (if possible) @@ -353,39 +349,37 @@ pub fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) -> } // Close main stream (sender) - events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream { + events_helper::propagate_event(Event::CloseCallStream(CloseCallStream { call_hash: gen_call_hash(&conv_id, connection.user_id()), peer_id: None, - }))?; + })).await?; // Close receiver streams (other users streams) - user_ws_controller::foreach_connection( - |peer_conn| { - if peer_conn.is_having_call_with_conversation(conv_id) && peer_conn.user_id() != connection.user_id() { - events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream { - call_hash: gen_call_hash(&conv_id, peer_conn.user_id()), - peer_id: Some(connection.user_id().clone()), - }))?; - } - - Ok(()) - }, - )?; + for peer_conn in user_ws_controller::get_all_connections()? { + if peer_conn.is_having_call_with_conversation(conv_id) && peer_conn.user_id() != connection.user_id() { + events_helper::propagate_event(Event::CloseCallStream(CloseCallStream { + call_hash: gen_call_hash(&conv_id, peer_conn.user_id()), + peer_id: Some(connection.user_id().clone()), + })).await?; + } + }; // Create a notification - events_helper::propagate_event(&Event::UserLeftCall(conv_id, connection.user_id()))?; + events_helper::propagate_event(Event::UserLeftCall( + conv_id.clone(), + connection.user_id().clone(), + )).await?; Ok(()) } /// Events handler -pub fn handle_event(e: &events_helper::Event) -> Res { +pub async fn handle_event(e: &events_helper::Event) -> Res { match e { Event::UserJoinedCall(conv_id, user_id) => { user_ws_controller::send_message_to_specific_connections( |c| c.is_having_call_with_conversation(conv_id) && c.user_id() != user_id, |_| UserWsMessage::no_id_message("user_joined_call", JoinedCallMessage::new(conv_id, user_id)), - None:: _>, )?; } @@ -393,13 +387,12 @@ pub fn handle_event(e: &events_helper::Event) -> Res { user_ws_controller::send_message_to_specific_connections( |c| c.is_having_call_with_conversation(conv_id), |_| UserWsMessage::no_id_message("user_left_call", LeftCallMessage::new(conv_id, user_id)), - None:: _>, )?; } Event::UserWsClosed(c) => { if let Some(call) = c.active_call.clone() { - make_user_leave_call(&call.conv_id, c)?; + make_user_leave_call(&call.conv_id, c).await?; } } @@ -423,20 +416,17 @@ pub fn handle_event(e: &events_helper::Event) -> Res { user_ws_controller::send_message_to_specific_connections( |c| c.user_id() == target_user && c.is_having_call_with_conversation(&call_id), |_| UserWsMessage::no_id_message("new_call_signal", NewCallSignalAPI::new(&call_id, &peer_id, &msg.data)?), - None:: _>, )?; } // Handle proxy disconnect => close all active calls Event::ClosedRTCRelayWebSocket => { - user_ws_controller::foreach_connection(|f| { + for f in user_ws_controller::get_all_connections()? { // Close all active connections if let Some(call) = &f.active_call { - make_user_leave_call(&call.conv_id, f)?; + make_user_leave_call(&call.conv_id, &f).await?; } - - Ok(()) - })?; + }; } // Call active call of user (if any) @@ -450,7 +440,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res { })?; if let Some(c) = conn { - make_user_leave_call(conv_id, &c)?; + make_user_leave_call(conv_id, &c).await?; } } @@ -465,7 +455,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res { })?; for con in connections { - make_user_leave_call(conv_id, &con)?; + make_user_leave_call(conv_id, &con).await?; } } diff --git a/src/controllers/comments_controller.rs b/src/controllers/comments_controller.rs index 36616f2..e626dc7 100644 --- a/src/controllers/comments_controller.rs +++ b/src/controllers/comments_controller.rs @@ -49,13 +49,13 @@ pub async fn create(r: &mut HttpRequestHandler) -> RequestResult { image_path: image, }; - let comment_id = comments_helper::create(&comment)?; + let comment_id = comments_helper::create(&comment).await?; // Create notifications - notifications_helper::create_post_notification(&r.user_id()?, post.id, NotifEventType::COMMENT_CREATED)?; + notifications_helper::create_post_notification(&r.user_id()?, post.id, NotifEventType::COMMENT_CREATED).await?; // Remove notifications targeting current user about the post - notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id)?; + notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id).await?; r.set_response(ResCreateComment::new(comment_id)) } @@ -72,7 +72,7 @@ pub async fn edit(r: &mut HttpRequestHandler) -> RequestResult { let comment = r.post_comment_with_full_access("commentID")?; let new_content = r.post_content("content", 2, true)?; - comments_helper::edit(comment.id, &new_content)?; + comments_helper::edit(comment.id, &new_content).await?; r.success("Content updated.") } @@ -81,13 +81,13 @@ pub async fn edit(r: &mut HttpRequestHandler) -> RequestResult { pub async fn delete(r: &mut HttpRequestHandler) -> RequestResult { let comment = r.post_comment_with_full_access("commentID")?; - comments_helper::delete(&comment)?; + comments_helper::delete(&comment).await?; r.success("Comment deleted.") } /// Events handler -pub fn handle_event(e: &events_helper::Event) -> Res { +pub async fn handle_event(e: &events_helper::Event) -> Res { match e { Event::NewComment(comment) => { user_ws_controller::send_message_to_specific_connections( @@ -95,8 +95,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res { |c| UserWsMessage::no_id_message( "new_comment", CommentAPI::new(comment, &Some(c.user_id().clone()))?, - ), - None:: _>, + ) )?; } @@ -106,8 +105,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res { |c| UserWsMessage::no_id_message( "comment_updated", CommentAPI::new(comment, &Some(c.user_id().clone()))?, - ), - None:: _>, + ) )?; } @@ -118,7 +116,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res { "comment_deleted", comment.id.clone(), ), - None:: _>, )?; } diff --git a/src/controllers/conversations_controller.rs b/src/controllers/conversations_controller.rs index e3fe6f2..eed7ecf 100644 --- a/src/controllers/conversations_controller.rs +++ b/src/controllers/conversations_controller.rs @@ -21,7 +21,6 @@ use crate::data::error::Res; use crate::data::http_request_handler::HttpRequestHandler; use crate::data::new_conversation::NewConversation; use crate::data::new_conversation_message::NewConversationMessage; -use crate::data::user_ws_connection::UserWsConnection; use crate::data::user_ws_message::UserWsMessage; use crate::data::user_ws_request_handler::UserWsRequestHandler; use crate::helpers::{conversations_helper, events_helper}; @@ -50,7 +49,7 @@ pub async fn create(r: &mut HttpRequestHandler) -> RequestResult { }; // Create the conversation - let conv_id = conversations_helper::create(&conv)?; + let conv_id = conversations_helper::create(&conv).await?; r.set_response(ResCreateConversation::new(conv_id)) } @@ -147,7 +146,7 @@ pub async fn add_member(r: &mut HttpRequestHandler) -> RequestResult { r.bad_request("This user is already a member of this conversation!".to_string())?; } - conversations_helper::add_member(conv.id, &user_to_add, true, false, Some(r.user_id_ref()?))?; + conversations_helper::add_member(conv.id, &user_to_add, true, false, Some(r.user_id_ref()?)).await?; r.success("The user was added to the conversation!") } @@ -195,7 +194,7 @@ pub async fn remove_member(r: &mut HttpRequestHandler) -> RequestResult { r.bad_request("This user is not a member of this conversation!".to_string())?; } - conversations_helper::remove_member(&user_to_remove, conv.id, Some(r.user_id_ref()?))?; + conversations_helper::remove_member(&user_to_remove, conv.id, Some(r.user_id_ref()?)).await?; r.ok() } @@ -228,7 +227,7 @@ pub async fn find_private(r: &mut HttpRequestHandler) -> RequestResult { group_id: None, group_min_membership_level: None, }; - let conv_id = conversations_helper::create(&new_conv)?; + let conv_id = conversations_helper::create(&new_conv).await?; list.push(conv_id); } @@ -255,7 +254,7 @@ pub async fn refresh_single(r: &mut HttpRequestHandler) -> RequestResult { conv.conv_id, r.user_id_ref()?, &messages.last().unwrap(), - )?; + ).await?; } r.set_response(ConversationMessageAPI::for_list(&messages)) @@ -395,7 +394,7 @@ pub async fn send_message(r: &mut HttpRequestHandler) -> RequestResult { message, file, server_message: None, - })?; + }).await?; r.success("Conversation message was successfully sent!") } @@ -423,7 +422,7 @@ pub async fn delete_conversation(r: &mut HttpRequestHandler) -> RequestResult { r.bad_request("This conversation is managed, it can not be deleted by this way!".to_string())?; } - conversations_helper::remove_user_from_conversation(&r.user_id()?, &conv, r.user_id_ref()?)?; + conversations_helper::remove_user_from_conversation(&r.user_id()?, &conv, r.user_id_ref()?).await?; r.success("The conversation has been deleted") } @@ -447,7 +446,7 @@ pub async fn update_message(r: &mut HttpRequestHandler) -> RequestResult { r.bad_request("New message is too long!".to_string())?; } - conversations_helper::update_message_content(msg_id, &new_content)?; + conversations_helper::update_message_content(msg_id, &new_content).await?; r.success("Conversation message content successfully updated") } @@ -460,25 +459,25 @@ pub async fn delete_message(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("You are not the owner of this message!".to_string())?; } - conversations_helper::delete_message_by_id(msg_id)?; + conversations_helper::delete_message_by_id(msg_id).await?; r.success("The message has been successfully deleted!") } /// A user is writing a message in a conversation -pub fn member_is_writing(r: &mut UserWsRequestHandler) -> RequestResult { +pub async fn member_is_writing(r: &mut UserWsRequestHandler) -> RequestResult { let conv_id = r.post_registered_conv_id("convID")?; // Propagate event events_helper::propagate_event( - &Event::UserIsWritingMessageInConversation(r.user_id_ref()?, conv_id) - )?; + Event::UserIsWritingMessageInConversation(r.user_id()?, conv_id) + ).await?; r.ok() } /// Events handler -pub fn handle_event(e: &events_helper::Event) -> Res { +pub async fn handle_event(e: &events_helper::Event) -> Res { match e { Event::UpdatedNumberUnreadConversations(users) => { for user in users.iter() { @@ -498,23 +497,22 @@ pub fn handle_event(e: &events_helper::Event) -> Res { user_ws_controller::send_message_to_specific_connections( |s| s.conversations.contains(conv_id) && s.user_id() != user_id, |_| UserWsMessage::no_id_message("writing_message_in_conv", UserIsWritingMessageInConversation::new(user_id, *conv_id)), - None:: _>, )?; } Event::NewConversationMessage(msg) => { - user_ws_controller::send_message_to_specific_connections( + for conn in user_ws_controller::send_message_to_specific_connections( |f| f.conversations.contains(&msg.conv_id), |_| UserWsMessage::no_id_message("new_conv_message", ConversationMessageAPI::new(msg)), - Some(|conn: &UserWsConnection| conversations_helper::mark_user_seen(msg.conv_id, conn.user_id(), msg)), - )?; + )? { + conversations_helper::mark_user_seen(msg.conv_id, conn.user_id(), msg).await?; + } } Event::UpdatedConversationMessage(msg) => { user_ws_controller::send_message_to_specific_connections( |f| f.conversations.contains(&msg.conv_id), |_| UserWsMessage::no_id_message("updated_conv_message", ConversationMessageAPI::new(msg)), - None:: _>, )?; } @@ -522,7 +520,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res { user_ws_controller::send_message_to_specific_connections( |f| f.conversations.contains(&msg.conv_id), |_| UserWsMessage::no_id_message("deleted_conv_message", ConversationMessageAPI::new(msg)), - None:: _>, )?; } @@ -531,7 +528,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res { user_ws_controller::send_message_to_specific_connections( |f| f.conversations.contains(conv_id), |_| UserWsMessage::no_id_message("removed_user_from_conv", RemovedUserFromConversationMessage::new(user_id, *conv_id)), - None:: _>, )?; // Disconnect user from conversation @@ -551,7 +547,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res { user_ws_controller::send_message_to_specific_connections( |f| f.conversations.contains(conv_id), |_| UserWsMessage::no_id_message("deleted_conversation", conv_id.id()), - None:: _>, )?; // Disconnect user from conversation diff --git a/src/controllers/friends_controller.rs b/src/controllers/friends_controller.rs index 2512460..016e1a9 100644 --- a/src/controllers/friends_controller.rs +++ b/src/controllers/friends_controller.rs @@ -85,7 +85,7 @@ pub async fn send_request(r: &mut HttpRequestHandler) -> RequestResult { notifications_helper::create_friends_notification( r.user_id_ref()?, &friend_id, - NotifEventType::SENT_FRIEND_REQUEST)?; + NotifEventType::SENT_FRIEND_REQUEST).await?; r.success("The friendship request was successfully sent!") } @@ -101,7 +101,7 @@ pub async fn cancel_request(r: &mut HttpRequestHandler) -> RequestResult { friends_helper::remove_request(&r.user_id()?, &friend_id)?; // Delete related notifications - notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id)?; + notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id).await?; r.success("Friendship request removed!") } @@ -125,7 +125,7 @@ pub async fn respond_request(r: &mut HttpRequestHandler) -> RequestResult { true => NotifEventType::ACCEPTED_FRIEND_REQUEST, false => NotifEventType::REJECTED_FRIEND_REQUEST }, - )?; + ).await?; r.set_response("Response to the friendship request successfully saved!") } @@ -137,7 +137,7 @@ pub async fn remove_friend(r: &mut HttpRequestHandler) -> RequestResult { friends_helper::remove_friendship(r.user_id_ref()?, &friend_id)?; // Delete any related notification - notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id)?; + notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id).await?; r.success("The friend was removed from the list!") } diff --git a/src/controllers/groups_controller.rs b/src/controllers/groups_controller.rs index 05e0970..f82ad11 100644 --- a/src/controllers/groups_controller.rs +++ b/src/controllers/groups_controller.rs @@ -57,7 +57,7 @@ pub async fn create(r: &mut HttpRequestHandler) -> RequestResult { owner_id: r.user_id()?, }; - let group_id = groups_helper::create(&new_group)?; + let group_id = groups_helper::create(&new_group).await?; r.set_response(GroupCreationResult::new(&group_id)) } @@ -187,7 +187,7 @@ pub async fn create_conversation(r: &mut HttpRequestHandler) -> RequestResult { let min_membership_level = r.post_group_membership_level_for_conversation("min_membership_level")?; let name = r.post_string("name")?; - let conv_id = conversations_helper::create_conversation_for_group(group, min_membership_level, &name)?; + let conv_id = conversations_helper::create_conversation_for_group(group, min_membership_level, &name).await?; r.set_response(ResCreateConversationForGroup::new(conv_id)) } @@ -197,7 +197,7 @@ pub async fn set_conversation_visibility(r: &mut HttpRequestHandler) -> RequestR let conv = r.post_group_conv_admin("conv_id")?; let min_level = r.post_group_membership_level_for_conversation("min_membership_level")?; - conversations_helper::set_min_group_conversation_membership_level(conv.id, min_level)?; + conversations_helper::set_min_group_conversation_membership_level(conv.id, min_level).await?; r.ok() } @@ -206,7 +206,7 @@ pub async fn set_conversation_visibility(r: &mut HttpRequestHandler) -> RequestR pub async fn delete_conversation(r: &mut HttpRequestHandler) -> RequestResult { let conv = r.post_group_conv_admin("conv_id")?; - conversations_helper::delete_conversation(&conv)?; + conversations_helper::delete_conversation(&conv).await?; r.ok() } @@ -236,10 +236,10 @@ pub async fn cancel_invitation(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("This user has not been invited to join this group!".to_string())?; } - groups_helper::delete_member(&group_id, &user_id)?; + groups_helper::delete_member(&group_id, &user_id).await?; // Delete related notifications - notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id)?; + notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id).await?; r.success("Membership invitation has been cancelled!") } @@ -253,11 +253,11 @@ pub async fn invite_user(r: &mut HttpRequestHandler) -> RequestResult { r.bad_request("The user is not a visitor of the group!".to_string())?; } - groups_helper::send_invitation(&group_id, &user_id)?; + groups_helper::send_invitation(&group_id, &user_id).await?; // Send a notification notifications_helper::create_group_membership_notification( - &user_id, Some(r.user_id_ref()?), &group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_INVITATION)?; + &user_id, Some(r.user_id_ref()?), &group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_INVITATION).await?; r.success("The user has been successfully invited to join the group!") } @@ -271,7 +271,7 @@ pub async fn respond_invitation(r: &mut HttpRequestHandler) -> RequestResult { r.not_found("Invitation not found!".to_string())? } - groups_helper::respond_invitation(&group_id, &r.user_id()?, accept)?; + groups_helper::respond_invitation(&group_id, &r.user_id()?, accept).await?; if accept { groups_helper::set_following(&group_id, &r.user_id()?, true)?; @@ -281,7 +281,7 @@ pub async fn respond_invitation(r: &mut HttpRequestHandler) -> RequestResult { notifications_helper::create_group_membership_notification(r.user_id_ref()?, None, &group_id, match accept { true => NotifEventType::ACCEPTED_GROUP_MEMBERSHIP_INVITATION, false => NotifEventType::REJECTED_GROUP_MEMBERSHIP_INVITATION - })?; + }).await?; r.success("Response to the invitation was successfully saved!") } @@ -312,12 +312,12 @@ pub async fn send_request(r: &mut HttpRequestHandler) -> RequestResult { time_create: time(), level, following: true, - })?; + }).await?; // Send a notification, if required if matches!(group.registration_level, GroupRegistrationLevel::MODERATED_REGISTRATION) { notifications_helper::create_group_membership_notification(r.user_id_ref()?, None, - &group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_REQUEST)?; + &group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_REQUEST).await?; } @@ -332,10 +332,10 @@ pub async fn cancel_request(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("You did not send a membership request to this group!".to_string())?; } - groups_helper::delete_member(&group_id, &r.user_id()?)?; + groups_helper::delete_member(&group_id, &r.user_id()?).await?; // Delete any related notification - notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id)?; + notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id).await?; r.success("The request has been successfully cancelled!") } @@ -360,10 +360,10 @@ pub async fn delete_member(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("Only administrators can delete this membership!".to_string())?; } - groups_helper::delete_member(&group_id, &user_id)?; + groups_helper::delete_member(&group_id, &user_id).await?; // Delete related notifications - notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id)?; + notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id).await?; r.success("Membership of the user has been successfully deleted!") } @@ -389,7 +389,7 @@ pub async fn update_membership(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("You can not assign this visibility level!".to_string())?; } - groups_helper::update_membership_level(&group_id, &user_id, new_level)?; + groups_helper::update_membership_level(&group_id, &user_id, new_level).await?; r.success("User membership has been successfully updated!") } @@ -404,13 +404,13 @@ pub async fn respond_request(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("This user has not requested a membership for this group!".to_string())?; } - groups_helper::respond_request(&group_id, &user_id, accept)?; + groups_helper::respond_request(&group_id, &user_id, accept).await?; // Create a notification notifications_helper::create_group_membership_notification(&user_id, Some(r.user_id_ref()?), &group_id, match accept { true => NotifEventType::ACCEPTED_GROUP_MEMBERSHIP_REQUEST, false => NotifEventType::REJECTED_GROUP_MEMBERSHIP_REQUEST - })?; + }).await?; r.success("The response to the request has been successfully saved!") } @@ -433,10 +433,10 @@ pub async fn remove_membership(r: &mut HttpRequestHandler) -> RequestResult { r.forbidden("You are the last administrator of this group!".to_string())?; } - groups_helper::delete_member(&group_id, &r.user_id()?)?; + groups_helper::delete_member(&group_id, &r.user_id()?).await?; // Delete group membership notifications - notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id)?; + notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id).await?; r.success("Your membership has been successfully deleted!") } @@ -456,7 +456,7 @@ pub async fn delete_group(r: &mut HttpRequestHandler) -> RequestResult { let group_id = r.post_group_id_with_access("groupID", GroupAccessLevel::ADMIN_ACCESS)?; r.need_user_password("password")?; - groups_helper::delete(&group_id)?; + groups_helper::delete(&group_id).await?; r.success("Group deleted.") } \ No newline at end of file diff --git a/src/controllers/likes_controller.rs b/src/controllers/likes_controller.rs index 504ef78..9b1cbe5 100644 --- a/src/controllers/likes_controller.rs +++ b/src/controllers/likes_controller.rs @@ -2,23 +2,18 @@ //! //! @author Pierre Hubert -use crate::routes::RequestResult; use crate::data::base_request_handler::BaseRequestHandler; use crate::data::error::ExecError; use crate::data::group::GroupAccessLevel; use crate::data::post::PostAccessLevel; use crate::helpers::{likes_helper, notifications_helper, user_helper}; use crate::helpers::likes_helper::LikeType; +use crate::routes::RequestResult; struct LikeTarget(u64, LikeType); -/// Update like status (async version) -pub async fn update_async(r: &mut H) -> RequestResult { - update(r) -} - /// Update like status -pub fn update(r: &mut H) -> RequestResult { +pub async fn update(r: &mut H) -> RequestResult { let req_type = r.post_string("type")?; let is_liking = r.post_bool("like")?; @@ -41,7 +36,7 @@ pub fn update(r: &mut H) -> RequestResult { let post = r.post_post_with_access("id", PostAccessLevel::BASIC_ACCESS)?; // Delete any notification targeting this user about the post - notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id)?; + notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id).await?; LikeTarget(post.id, LikeType::POST) } @@ -51,7 +46,7 @@ pub fn update(r: &mut H) -> RequestResult { let comment = r.post_comment_with_access("id")?; // Delete any notification targeting this user about the post - notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, comment.post_id)?; + notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, comment.post_id).await?; LikeTarget(comment.id, LikeType::COMMENT) } diff --git a/src/controllers/notifications_controller.rs b/src/controllers/notifications_controller.rs index 9d483a1..5fb4c9a 100644 --- a/src/controllers/notifications_controller.rs +++ b/src/controllers/notifications_controller.rs @@ -73,14 +73,14 @@ pub async fn mark_seen(r: &mut HttpRequestHandler) -> RequestResult { notif.from_user_id = None; } - notifications_helper::delete(¬if)?; + notifications_helper::delete(¬if).await?; r.success("Notification deleted") } /// Delete all the notifications of the current user pub async fn delete_all(r: &mut HttpRequestHandler) -> RequestResult { - notifications_helper::delete_all_user(r.user_id_ref()?)?; + notifications_helper::delete_all_user(r.user_id_ref()?).await?; r.success("Notifications deleted.") } diff --git a/src/controllers/posts_controller.rs b/src/controllers/posts_controller.rs index c8bc16d..c2a5104 100644 --- a/src/controllers/posts_controller.rs +++ b/src/controllers/posts_controller.rs @@ -235,7 +235,7 @@ pub async fn create_post(r: &mut HttpRequestHandler) -> RequestResult { } // Create a notification - notifications_helper::create_post_notification(r.user_id_ref()?, post_id, NotifEventType::ELEM_CREATED)?; + notifications_helper::create_post_notification(r.user_id_ref()?, post_id, NotifEventType::ELEM_CREATED).await?; r.set_response(ResCreatePost::new(post_id)) } @@ -249,7 +249,7 @@ pub async fn set_visibility_level(r: &mut HttpRequestHandler) -> RequestResult { // Depending on new level, delete (or not) notifications about the post if matches!(new_visibility, PostVisibilityLevel::VISIBILITY_USER) { - notifications_helper::delete_all_related_with_post(post.id)?; + notifications_helper::delete_all_related_with_post(post.id).await?; } r.success("Visibility level updated") @@ -263,7 +263,7 @@ pub async fn update_content(r: &mut HttpRequestHandler) -> RequestResult { posts_helper::set_content(post.id, &new_content)?; // Delete the notifications targeting the current user about this post - notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id)?; + notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id).await?; r.success("Content updated") } @@ -272,7 +272,7 @@ pub async fn update_content(r: &mut HttpRequestHandler) -> RequestResult { pub async fn delete(r: &mut HttpRequestHandler) -> RequestResult { let post = r.post_post_with_access("postID", PostAccessLevel::FULL_ACCESS)?; - posts_helper::delete(&post)?; + posts_helper::delete(&post).await?; r.success("Post deleted.") } diff --git a/src/controllers/rtc_relay_controller.rs b/src/controllers/rtc_relay_controller.rs index ebb92ca..50e824a 100644 --- a/src/controllers/rtc_relay_controller.rs +++ b/src/controllers/rtc_relay_controller.rs @@ -99,27 +99,36 @@ impl actix::Actor for RtcRelayActor { }).unwrap()) } - fn stopped(&mut self, _ctx: &mut Self::Context) { + fn stopped(&mut self, ctx: &mut Self::Context) { println!("Closed connection to RTC relay."); + let future = async move { + // Propagate information + if let Err(e) = events_helper::propagate_event(Event::ClosedRTCRelayWebSocket).await { + eprintln!("Failed to propagate rtc closed event! {:#?}", e); + } - // Propagate information - if let Err(e) = events_helper::propagate_event(&Event::ClosedRTCRelayWebSocket) { - eprintln!("Failed to propagate rtc closed event! {:#?}", e); - } + eprintln!("Successfully propagated RTC relay stopped envent!"); + }; + + future.into_actor(self).wait(ctx); } } impl RtcRelayActor { - fn handle_message(&self, txt: &str) { + fn handle_message(&self, txt: &str, ctx: &mut ::Context) { match serde_json::from_str::(&txt) { Err(e) => { eprintln!("Failed to parse a message from RTC proxy! {:#?}", e); } Ok(msg) => { - if let Err(e) = process_message_from_relay(&msg) { - eprintln!("Failed to process signal from RTC Relay! {:#?}", e); - } + let future = async move { + if let Err(e) = process_message_from_relay(&msg).await { + eprintln!("Failed to process signal from RTC Relay! {:#?}", e); + } + }; + + future.into_actor(self).spawn(ctx); } } } @@ -142,7 +151,7 @@ impl StreamHandler { - self.handle_message(&txt); + self.handle_message(&txt, ctx); } Message::Binary(_) => { eprintln!("RTC WS Message::Binary"); @@ -164,7 +173,7 @@ impl StreamHandler { self.recv_buff.extend_from_slice(c.as_ref()); - self.handle_message(&String::from_utf8_lossy(&self.recv_buff)) + self.handle_message(&String::from_utf8_lossy(&self.recv_buff), ctx) } } } @@ -182,15 +191,15 @@ impl StreamHandler Res { +async fn process_message_from_relay(msg: &RTCSocketMessage) -> Res { match msg.title.as_str() { "signal" => { - events_helper::propagate_event(&Event::NewRTCRelayMessage(&NewRtcRelayMessage { + events_helper::propagate_event(Event::NewRTCRelayMessage(NewRtcRelayMessage { call_hash: msg.callHash.clone().unwrap_or(String::new()), peer_id: msg.peerId.clone().unwrap_or("0".to_string()), data: serde_json::to_string(msg.data.as_ref().unwrap_or(&Value::Null)) .unwrap_or("failed to serialize signal data".to_string()), - }))?; + })).await?; } title => { diff --git a/src/controllers/user_ws_controller.rs b/src/controllers/user_ws_controller.rs index 18ee901..cb9142f 100644 --- a/src/controllers/user_ws_controller.rs +++ b/src/controllers/user_ws_controller.rs @@ -317,9 +317,16 @@ impl Actor for WsSession { fn stopping(&mut self, ctx: &mut Self::Context) -> Running { // Send an event (user_ws_closed) if let Some(conn) = find_connection(ctx.address()) { - if let Err(e) = events_helper::propagate_event(&Event::UserWsClosed(&conn)) { - eprintln!("Failed to propagate web socket closed event ! {:#?}", e); - } + let future = async move { + if let Err(e) = events_helper::propagate_event(Event::UserWsClosed(conn)).await { + eprintln!("Failed to propagate web socket closed event ! {:#?}", e); + } + + // TODO : remove + eprintln!("Successfully propagated user ws stopping event!"); + }; + + future.into_actor(self).wait(ctx); } remove_connection(ctx.address()); @@ -495,10 +502,9 @@ pub fn send_to_client(conn: &UserWsConnection, msg: &UserWsMessage) -> Res { } /// Send a message to specific users -pub fn send_message_to_specific_connections(filter: F, msg_generator: M, after_send: Option) -> Res +pub fn send_message_to_specific_connections(filter: F, msg_generator: M) -> Res> where F: Fn(&UserWsConnection) -> bool, - M: Fn(&UserWsConnection) -> Res, - A: Fn(&UserWsConnection) -> Res + M: Fn(&UserWsConnection) -> Res { let connections = get_ws_connections_list() .lock() @@ -508,15 +514,11 @@ pub fn send_message_to_specific_connections(filter: F, msg_generator: M .map(|f| f.clone()) .collect::>(); - for con in connections { + for con in &connections { send_message(con.session.clone(), &msg_generator(&con)?)?; - - if let Some(cb) = &after_send { - cb(&con)?; - } } - Ok(()) + Ok(connections) } /// Check out whether user is connected or not @@ -576,6 +578,11 @@ pub fn foreach_connection(mut f: F) -> Res Ok(()) } +/// Get a copy of the entire list of connections +pub fn get_all_connections() -> Res> { + Ok(get_ws_connections_list().lock().unwrap().clone()) +} + /// Events handler pub fn handle_event(e: &events_helper::Event) -> Res { match e { diff --git a/src/data/comment.rs b/src/data/comment.rs index 2149ee1..6445d9b 100644 --- a/src/data/comment.rs +++ b/src/data/comment.rs @@ -4,6 +4,7 @@ use crate::data::user::UserID; +#[derive(Debug, Clone)] pub struct Comment { pub id: u64, pub time_sent: u64, diff --git a/src/helpers/account_helper.rs b/src/helpers/account_helper.rs index 2c20c1f..615e519 100644 --- a/src/helpers/account_helper.rs +++ b/src/helpers/account_helper.rs @@ -100,7 +100,7 @@ pub fn refresh_access_token(token: &UserAccessToken) -> Res { } /// Destroy a given user login tokens -pub fn destroy_login_tokens(access_tokens: &UserAccessToken) -> Res { +pub async fn destroy_login_tokens(access_tokens: &UserAccessToken) -> Res { // Un-register from independent push notifications service // (continue to destroy token even in case of failure) push_notifications_helper::un_register_from_previous_service(access_tokens)?; @@ -110,20 +110,20 @@ pub fn destroy_login_tokens(access_tokens: &UserAccessToken) -> Res { .exec()?; // Send an event (destroyed_login_tokens) - events_helper::propagate_event(&Event::DestroyedLoginToken(access_tokens))?; + events_helper::propagate_event(Event::DestroyedLoginToken(access_tokens.clone())).await?; Ok(()) } /// Clean up old access tokens -pub fn clean_up_old_access_tokens() -> Res { +pub async fn clean_up_old_access_tokens() -> Res { let to_delete = QueryInfo::new(USER_ACCESS_TOKENS_TABLE) .set_custom_where("last_refresh + timeout < ?") .add_custom_where_argument_u64(time()) .exec(db_to_user_access_token)?; for token in to_delete { - destroy_login_tokens(&token)?; + destroy_login_tokens(&token).await?; } Ok(()) @@ -137,11 +137,11 @@ pub fn get_all_login_tokens(id: &UserID) -> Res> { } /// Destroy all login tokens of a user -pub fn destroy_all_user_tokens(id: &UserID) -> ResultBoxError { +pub async fn destroy_all_user_tokens(id: &UserID) -> ResultBoxError { user_ws_controller::disconnect_user_from_all_sockets(id)?; for token in get_all_login_tokens(id)? { - destroy_login_tokens(&token)?; + destroy_login_tokens(&token).await?; } Ok(()) @@ -365,18 +365,18 @@ pub fn export(user_id: &UserID) -> ResultBoxError { } /// Delete a user's account -pub fn delete(user_id: &UserID) -> ResultBoxError { +pub async fn delete(user_id: &UserID) -> ResultBoxError { // Close all WebSockets of user - destroy_all_user_tokens(user_id)?; + destroy_all_user_tokens(user_id).await?; // Delete all group membership - groups_helper::delete_all_user_groups(user_id)?; + groups_helper::delete_all_user_groups(user_id).await?; // Delete all user comments - comments_helper::delete_all_user(user_id)?; + comments_helper::delete_all_user(user_id).await?; // Delete all user posts - posts_helper::delete_all_user(user_id)?; + posts_helper::delete_all_user(user_id).await?; // Delete all responses of user to surveys survey_helper::delete_all_user_responses(user_id)?; @@ -385,13 +385,13 @@ pub fn delete(user_id: &UserID) -> ResultBoxError { likes_helper::delete_all_user(user_id)?; // Delete all conversation messages - conversations_helper::delete_all_user_messages(user_id)?; + conversations_helper::delete_all_user_messages(user_id).await?; // Remove the user from all its conversations - conversations_helper::delete_all_user_conversations(user_id)?; + conversations_helper::delete_all_user_conversations(user_id).await?; // Delete all the notifications related with the user - notifications_helper::delete_all_related_with_user(user_id)?; + notifications_helper::delete_all_related_with_user(user_id).await?; // Delete all user friends, including friendship requests friends_helper::delete_all_user(user_id)?; @@ -409,7 +409,7 @@ pub fn delete(user_id: &UserID) -> ResultBoxError { forez_presence_helper::delete_all_user(user_id)?; // Delete connections to all services - destroy_all_user_tokens(user_id)?; + destroy_all_user_tokens(user_id).await?; // Remove the user from the database database::DeleteQuery::new(USERS_TABLE) @@ -420,7 +420,7 @@ pub fn delete(user_id: &UserID) -> ResultBoxError { } /// Automatically delete the account, if it have been inactive for a too long time -pub fn remove_if_inactive_for_too_long_time(user: &User) -> Res { +pub async fn remove_if_inactive_for_too_long_time(user: &User) -> Res { let timeout = user.delete_account_after.unwrap_or(0); if timeout < 1 { @@ -428,7 +428,7 @@ pub fn remove_if_inactive_for_too_long_time(user: &User) -> Res { } if user.last_activity < time() - timeout { - delete(&user.id)?; + delete(&user.id).await?; } Ok(()) diff --git a/src/helpers/comments_helper.rs b/src/helpers/comments_helper.rs index 6eb336b..450124c 100644 --- a/src/helpers/comments_helper.rs +++ b/src/helpers/comments_helper.rs @@ -13,7 +13,7 @@ use crate::utils::date_utils::{mysql_date, time}; use crate::utils::user_data_utils::user_data_path; /// Create a new comment. In case of success, this function returns the ID of the created comment -pub fn create(c: &Comment) -> ResultBoxError { +pub async fn create(c: &Comment) -> ResultBoxError { let comment_id = database::InsertQuery::new(COMMENTS_TABLE) .add_u64("ID_texte", c.post_id) .add_user_id("ID_personne", &c.user_id) @@ -25,7 +25,7 @@ pub fn create(c: &Comment) -> ResultBoxError { .ok_or(ExecError::new("No ID returned after comment creation!"))?; // Emit an event - events_helper::propagate_event(&Event::NewComment(&get_single(comment_id)?))?; + events_helper::propagate_event(Event::NewComment(get_single(comment_id)?)).await?; Ok(comment_id) } @@ -67,20 +67,20 @@ fn db_to_comment(row: &database::RowResult) -> ResultBoxError { } /// Update comment content -pub fn edit(comment_id: u64, new_content: &str) -> ResultBoxError { +pub async fn edit(comment_id: u64, new_content: &str) -> ResultBoxError { database::UpdateInfo::new(COMMENTS_TABLE) .cond_u64("ID", comment_id) .set_str("commentaire", new_content) .exec()?; // Emit an event - events_helper::propagate_event(&Event::UpdatedComment(&get_single(comment_id)?))?; + events_helper::propagate_event(Event::UpdatedComment(get_single(comment_id)?)).await?; Ok(()) } /// Delete a single comment -pub fn delete(c: &Comment) -> ResultBoxError { +pub async fn delete(c: &Comment) -> ResultBoxError { // Delete associated image (if any) if let Some(image) = &c.image_path { let path = user_data_path(image.as_ref()); @@ -98,31 +98,32 @@ pub fn delete(c: &Comment) -> ResultBoxError { .exec()?; // Emit an event - events_helper::propagate_event(&Event::DeletedComment(c))?; + let c = (*c).clone(); + events_helper::propagate_event(Event::DeletedComment(c)).await?; Ok(()) } /// Delete all the comments associated to a post -pub fn delete_all(post_id: u64) -> ResultBoxError { +pub async fn delete_all(post_id: u64) -> ResultBoxError { for c in &get(post_id)? { - delete(c)?; + delete(c).await?; } Ok(()) } /// Delete all the comments created by a user -pub fn delete_all_user(user_id: &UserID) -> ResultBoxError { +pub async fn delete_all_user(user_id: &UserID) -> ResultBoxError { for comment in &export_all_user(user_id)? { - delete(comment)?; + delete(comment).await?; } Ok(()) } /// Clean old user comments -pub fn clean_old_comments(user: &User) -> Res { +pub async fn clean_old_comments(user: &User) -> Res { let lifetime = user.delete_comments_after.unwrap_or(0); if lifetime < 1 { return Ok(()); @@ -135,7 +136,7 @@ pub fn clean_old_comments(user: &User) -> Res { .exec(db_to_comment)?; for comment in comments { - delete(&comment)?; + delete(&comment).await?; } Ok(()) diff --git a/src/helpers/conversations_helper.rs b/src/helpers/conversations_helper.rs index 8de301d..f72739a 100644 --- a/src/helpers/conversations_helper.rs +++ b/src/helpers/conversations_helper.rs @@ -18,7 +18,7 @@ use crate::utils::date_utils::time; use crate::utils::user_data_utils::delete_user_data_file_if_exists; /// Create a new conversation. This method returns the ID of the created conversation -pub fn create(conv: &NewConversation) -> Res { +pub async fn create(conv: &NewConversation) -> Res { // Create the conversation in the main table let conv_id = InsertQuery::new(CONV_LIST_TABLE) .add_str("name", conv.name.clone().unwrap_or(String::new()).as_str()) @@ -36,15 +36,16 @@ pub fn create(conv: &NewConversation) -> Res { // Initialize the list of members of the group if conv.group_id.is_some() { - update_members_list_for_group_conversation(conv_id)?; + update_members_list_for_group_conversation(conv_id).await?; } else { // Add the creator of the conversation - add_member(conv_id, &conv.owner_id, conv.owner_following, true, Some(&conv.owner_id))?; + add_member(conv_id, &conv.owner_id, conv.owner_following, true, Some(&conv.owner_id)).await?; // Add other members to the conversation for member in &conv.members { if !member.eq(&conv.owner_id) { - add_member(conv_id, member, true, false, Some(&conv.owner_id))?; + add_member(conv_id, member, true, false, + Some(&conv.owner_id)).await?; } } } @@ -53,7 +54,7 @@ pub fn create(conv: &NewConversation) -> Res { } /// Create a conversation for a group -pub fn create_conversation_for_group(group_id: GroupID, min_membership_level: GroupMembershipLevel, name: &String) -> Res { +pub async fn create_conversation_for_group(group_id: GroupID, min_membership_level: GroupMembershipLevel, name: &String) -> Res { create(&NewConversation { owner_id: UserID::invalid(), name: Some(name.to_string()), @@ -64,11 +65,11 @@ pub fn create_conversation_for_group(group_id: GroupID, min_membership_level: Gr owner_following: false, members: Default::default(), can_everyone_add_members: false, - }) + }).await } /// Add a member to a conversation -pub fn add_member(conv_id: ConvID, user_id: &UserID, following: bool, admin: bool, adder: Option<&UserID>) -> Res { +pub async fn add_member(conv_id: ConvID, user_id: &UserID, following: bool, admin: bool, adder: Option<&UserID>) -> Res { InsertQuery::new(CONV_MEMBERS_TABLE) .add_conv_id("conv_id", conv_id) .add_user_id("user_id", user_id) @@ -91,11 +92,11 @@ pub fn add_member(conv_id: ConvID, user_id: &UserID, following: bool, admin: boo user_added: user_id.clone(), }), ) - )?; + ).await?; } else { send_message(&NewConversationMessage::new_server_message( conv_id, ConversationServerMessageType::UserCreatedConversation(user_id.clone()), - ))?; + )).await?; } } @@ -191,13 +192,13 @@ pub fn set_settings(settings: NewConversationSettings) -> Res { } /// Change minimal membership level to join a group conversation -pub fn set_min_group_conversation_membership_level(conv_id: ConvID, level: GroupMembershipLevel) -> Res { +pub async fn set_min_group_conversation_membership_level(conv_id: ConvID, level: GroupMembershipLevel) -> Res { database::UpdateInfo::new(CONV_LIST_TABLE) .cond_conv_id("id", conv_id) .set_u32("min_group_membership_level", level.to_db()) .exec()?; - update_members_list_for_group_conversation(conv_id) + update_members_list_for_group_conversation(conv_id).await } /// Search for private conversation between two users @@ -274,7 +275,7 @@ pub fn export_all_user_messages(user_id: &UserID) -> ResultBoxError Res { +pub async fn clean_old_messages(user: &User) -> Res { let lifetime = user.delete_conversation_messages_after.unwrap_or(0); if lifetime < 1 { return Ok(()); @@ -287,16 +288,16 @@ pub fn clean_old_messages(user: &User) -> Res { .exec(db_to_conversation_message)?; for message in messages { - delete_message(&message)?; + delete_message(&message).await?; } Ok(()) } /// Delete all the messages of a given user -pub fn delete_all_user_messages(user_id: &UserID) -> ResultBoxError { +pub async fn delete_all_user_messages(user_id: &UserID) -> ResultBoxError { for msg in &export_all_user_messages(user_id)? { - delete_message(msg)?; + delete_message(msg).await?; } // Remove all server messages related with the user @@ -312,9 +313,9 @@ pub fn delete_all_user_messages(user_id: &UserID) -> ResultBoxError { } /// Remove the user from all the conversations he belongs to -pub fn delete_all_user_conversations(user_id: &UserID) -> ResultBoxError { +pub async fn delete_all_user_conversations(user_id: &UserID) -> ResultBoxError { for conversation in &get_list_user(user_id)? { - remove_user_from_conversation(user_id, conversation, user_id)?; + remove_user_from_conversation(user_id, conversation, user_id).await?; } Ok(()) @@ -335,7 +336,7 @@ pub fn get_single_message(msg_id: u64) -> ResultBoxError { } /// Send a new conversation message -pub fn send_message(msg: &NewConversationMessage) -> ResultBoxError<()> { +pub async fn send_message(msg: &NewConversationMessage) -> ResultBoxError<()> { let t = time(); // Insert the message in the database @@ -378,33 +379,33 @@ pub fn send_message(msg: &NewConversationMessage) -> ResultBoxError<()> { // Mark the user has seen his message if let Some(user_id) = &msg.user_id { - mark_user_seen(msg.conv_id, user_id, &new_message)?; + mark_user_seen(msg.conv_id, user_id, &new_message).await?; } // Send an event (updated_number_unread_conversations) - events_helper::propagate_event(&Event::UpdatedNumberUnreadConversations(&list_to_notify))?; + events_helper::propagate_event(Event::UpdatedNumberUnreadConversations(list_to_notify)).await?; // Send an event (sent_conversation_message) - events_helper::propagate_event(&Event::NewConversationMessage(&new_message))?; + events_helper::propagate_event(Event::NewConversationMessage(new_message)).await?; Ok(()) } /// Update message content -pub fn update_message_content(msg_id: u64, new_content: &str) -> ResultBoxError<()> { +pub async fn update_message_content(msg_id: u64, new_content: &str) -> ResultBoxError<()> { database::UpdateInfo::new(CONV_MESSAGES_TABLE) .cond_u64("id", msg_id) .set_str("message", new_content) .exec()?; // Send an event (conv_message_updated) - events_helper::propagate_event(&Event::UpdatedConversationMessage(&get_single_message(msg_id)?))?; + events_helper::propagate_event(Event::UpdatedConversationMessage(get_single_message(msg_id)?)).await?; Ok(()) } /// Remove a message from a conversation -pub fn delete_message(msg: &ConversationMessage) -> ResultBoxError<()> { +pub async fn delete_message(msg: &ConversationMessage) -> ResultBoxError<()> { // Delete associated files if let Some(file) = &msg.file { @@ -420,14 +421,14 @@ pub fn delete_message(msg: &ConversationMessage) -> ResultBoxError<()> { .exec()?; // Send en event (conv_message_deleted) - events_helper::propagate_event(&Event::DeleteConversationMessage(msg))?; + events_helper::propagate_event(Event::DeleteConversationMessage(msg.clone())).await?; Ok(()) } /// Delete a message with a specific ID -pub fn delete_message_by_id(id: u64) -> ResultBoxError<()> { - delete_message(&get_single_message(id)?) +pub async fn delete_message_by_id(id: u64) -> ResultBoxError<()> { + delete_message(&get_single_message(id)?).await } /// Count the number of unread conversation for a specified user @@ -453,7 +454,7 @@ pub fn get_list_unread(user_id: &UserID) -> ResultBoxError> { } /// Indicate that a user has seen the last messages of a conversation -pub fn mark_user_seen(conv_id: ConvID, user_id: &UserID, last_msg: &ConversationMessage) -> ResultBoxError<()> { +pub async fn mark_user_seen(conv_id: ConvID, user_id: &UserID, last_msg: &ConversationMessage) -> Res { database::UpdateInfo::new(CONV_MEMBERS_TABLE) .cond_conv_id("conv_id", conv_id) .cond_user_id("user_id", user_id) @@ -462,34 +463,34 @@ pub fn mark_user_seen(conv_id: ConvID, user_id: &UserID, last_msg: &Conversation .exec()?; // Push an event - events_helper::propagate_event(&Event::SeenLastConversationMessage(user_id, conv_id))?; + events_helper::propagate_event(Event::SeenLastConversationMessage(user_id.clone(), conv_id)).await?; // Push an event (updated_number_unread_conversations) - events_helper::propagate_event(&Event::UpdatedNumberUnreadConversations(&vec![user_id.clone()]))?; + events_helper::propagate_event(Event::UpdatedNumberUnreadConversations(vec![user_id.clone()])).await?; Ok(()) } /// Remove a user from a conversation -pub fn remove_user_from_conversation(user_id: &UserID, conv: &Conversation, remover: &UserID) -> ResultBoxError<()> { +pub async fn remove_user_from_conversation(user_id: &UserID, conv: &Conversation, remover: &UserID) -> ResultBoxError<()> { if conv.is_last_admin(user_id) { - delete_conversation(conv) + delete_conversation(conv).await } else { - remove_member(user_id, conv.id, Some(remover)) + remove_member(user_id, conv.id, Some(remover)).await } } /// Update members list for all the conversations of a given group -pub fn update_members_list_for_group_conversations(group_id: &GroupID) -> Res { +pub async fn update_members_list_for_group_conversations(group_id: &GroupID) -> Res { for conv in get_list_group(group_id)? { - update_members_list_for_group_conversation(conv.id)?; + update_members_list_for_group_conversation(conv.id).await?; } Ok(()) } /// Update the list of members for a group conversation -pub fn update_members_list_for_group_conversation(conv_id: ConvID) -> Res { +pub async fn update_members_list_for_group_conversation(conv_id: ConvID) -> Res { let conv = get_single(conv_id)?; if !conv.is_linked_to_group() { @@ -511,7 +512,7 @@ pub fn update_members_list_for_group_conversation(conv_id: ConvID) -> Res { // Create the member else if conv.min_group_membership_level.as_ref().unwrap() >= &member.level { - add_member(conv_id, &member.user_id, true, member.is_admin(), None)?; + add_member(conv_id, &member.user_id, true, member.is_admin(), None).await?; } } @@ -521,7 +522,7 @@ pub fn update_members_list_for_group_conversation(conv_id: ConvID) -> Res { // Remove the member, if required if member.is_none() || conv.min_group_membership_level.as_ref().unwrap() < &member.unwrap().level { - remove_member(&conv_member.user_id, conv_id, None)?; + remove_member(&conv_member.user_id, conv_id, None).await?; } } @@ -529,10 +530,10 @@ pub fn update_members_list_for_group_conversation(conv_id: ConvID) -> Res { } /// Remove permanently a conversation -pub fn delete_conversation(conv: &Conversation) -> ResultBoxError<()> { +pub async fn delete_conversation(conv: &Conversation) -> ResultBoxError<()> { // Delete all the messages of the conversations for message in get_all_messages(conv.id)? { - delete_message(&message)?; + delete_message(&message).await?; } // Delete all the members of the conversation @@ -551,24 +552,24 @@ pub fn delete_conversation(conv: &Conversation) -> ResultBoxError<()> { .exec()?; // Propagate information - events_helper::propagate_event(&Event::DeletedConversation(conv.id))?; + events_helper::propagate_event(Event::DeletedConversation(conv.id)).await?; Ok(()) } /// Delete all the conversations of a group -pub fn delete_all_group_conversations(group_id: &GroupID) -> Res { +pub async fn delete_all_group_conversations(group_id: &GroupID) -> Res { for conv in get_list_group(group_id)? { - delete_conversation(&conv)?; + delete_conversation(&conv).await?; } Ok(()) } /// Delete a conversation membership -pub fn remove_member(user_id: &UserID, conv_id: ConvID, remover: Option<&UserID>) -> ResultBoxError<()> { +pub async fn remove_member(user_id: &UserID, conv_id: ConvID, remover: Option<&UserID>) -> ResultBoxError<()> { for msg in get_user_messages_for_conversations(conv_id, user_id)? { - delete_message(&msg)?; + delete_message(&msg).await?; } // Delete membership @@ -583,7 +584,7 @@ pub fn remove_member(user_id: &UserID, conv_id: ConvID, remover: Option<&UserID> send_message(&NewConversationMessage::new_server_message( conv_id, ConversationServerMessageType::UserLeftConversation(user_id.clone()), - ))?; + )).await?; } else { send_message(&NewConversationMessage::new_server_message( conv_id, @@ -591,12 +592,12 @@ pub fn remove_member(user_id: &UserID, conv_id: ConvID, remover: Option<&UserID> user_who_removed: remover.clone(), user_removed: user_id.clone(), }), - ))?; + )).await?; } } // Propagate event - events_helper::propagate_event(&Event::RemovedUserFromConversation(user_id, conv_id))?; + events_helper::propagate_event(Event::RemovedUserFromConversation(user_id.clone(), conv_id)).await?; Ok(()) } diff --git a/src/helpers/events_helper.rs b/src/helpers/events_helper.rs index dabfc53..393217f 100644 --- a/src/helpers/events_helper.rs +++ b/src/helpers/events_helper.rs @@ -2,7 +2,7 @@ //! //! @author Pierre Hubert - +use async_recursion::async_recursion; use crate::controllers::{calls_controller, comments_controller, conversations_controller, notifications_controller, rtc_relay_controller, user_ws_controller}; use crate::data::call_signal::{CloseCallStream, NewRtcRelayMessage, NewUserCallSignal, UserCallOfferRequest}; @@ -15,84 +15,85 @@ use crate::data::user_token::UserAccessToken; use crate::data::user_ws_connection::UserWsConnection; use crate::helpers::push_notifications_helper; -pub enum Event<'a> { +pub enum Event { /// Websocket of a user was closed /// /// This event is propagated BEFORE the removal of the connection from the list - UserWsClosed(&'a UserWsConnection), + UserWsClosed(UserWsConnection), /// Destroyed a login token - DestroyedLoginToken(&'a UserAccessToken), + DestroyedLoginToken(UserAccessToken), /// Updated the number of notifications of one of multiple users user - UpdatedNotificationsNumber(&'a Vec), + UpdatedNotificationsNumber(Vec), /// Indicate that a user has seen the last message of a conversation - SeenLastConversationMessage(&'a UserID, ConvID), + SeenLastConversationMessage(UserID, ConvID), /// Updated the number of unread conversations - UpdatedNumberUnreadConversations(&'a Vec), + UpdatedNumberUnreadConversations(Vec), /// Created a new conversation message - NewConversationMessage(&'a ConversationMessage), + NewConversationMessage(ConversationMessage), /// Updated conversation message - UpdatedConversationMessage(&'a ConversationMessage), + UpdatedConversationMessage(ConversationMessage), /// Deleted a conversation message - DeleteConversationMessage(&'a ConversationMessage), + DeleteConversationMessage(ConversationMessage), /// User is writing a message in a conversation - UserIsWritingMessageInConversation(&'a UserID, ConvID), + UserIsWritingMessageInConversation(UserID, ConvID), /// Removed a user from a conversation - RemovedUserFromConversation(&'a UserID, ConvID), + RemovedUserFromConversation(UserID, ConvID), /// Delete a conversation DeletedConversation(ConvID), /// Created a new comment - NewComment(&'a Comment), + NewComment(Comment), /// Updated a comment - UpdatedComment(&'a Comment), + UpdatedComment(Comment), /// Deleted a comment - DeletedComment(&'a Comment), + DeletedComment(Comment), /// Connection to RTC relay was closed ClosedRTCRelayWebSocket, /// User joined call - UserJoinedCall(&'a ConvID, &'a UserID), + UserJoinedCall(ConvID, UserID), /// User left call - UserLeftCall(&'a ConvID, &'a UserID), + UserLeftCall(ConvID, UserID), /// Got a new user call signal - NewUserCallSignal(&'a NewUserCallSignal), + NewUserCallSignal(NewUserCallSignal), /// Got a new RTC relay message - NewRTCRelayMessage(&'a NewRtcRelayMessage), + NewRTCRelayMessage(NewRtcRelayMessage), /// User requested an offer for a call - UserRequestedCallOffer(&'a UserCallOfferRequest), + UserRequestedCallOffer(UserCallOfferRequest), /// Close call stream - CloseCallStream(&'a CloseCallStream), + CloseCallStream(CloseCallStream), /// No event None, } /// Propagate an event through the different components of the application -pub fn propagate_event(e: &Event) -> Res { - conversations_controller::handle_event(e)?; - comments_controller::handle_event(e)?; - notifications_controller::handle_event(e)?; - user_ws_controller::handle_event(e)?; - calls_controller::handle_event(e)?; - rtc_relay_controller::handle_event(e)?; - push_notifications_helper::handle_event(e)?; +#[async_recursion(?Send)] +pub async fn propagate_event(e: Event) -> Res { + conversations_controller::handle_event(&e).await?; + comments_controller::handle_event(&e).await?; + notifications_controller::handle_event(&e)?; + user_ws_controller::handle_event(&e)?; + calls_controller::handle_event(&e).await?; + rtc_relay_controller::handle_event(&e)?; + push_notifications_helper::handle_event(&e)?; Ok(()) } \ No newline at end of file diff --git a/src/helpers/groups_helper.rs b/src/helpers/groups_helper.rs index 942c0be..f020f62 100644 --- a/src/helpers/groups_helper.rs +++ b/src/helpers/groups_helper.rs @@ -94,7 +94,7 @@ impl GroupPostsCreationLevel { } /// Create a new group. Returns the ID of the new group -pub fn create(group: &NewGroup) -> ResultBoxError { +pub async fn create(group: &NewGroup) -> ResultBoxError { // First, create the group let group_id = database::InsertQuery::new(GROUPS_LIST_TABLE) .add_u64("time_create", time()) @@ -111,13 +111,13 @@ pub fn create(group: &NewGroup) -> ResultBoxError { time_create: time(), level: GroupMembershipLevel::ADMINISTRATOR, following: true, - })?; + }).await?; Ok(group_id) } /// Insert a new group into the database -pub fn insert_member(m: &GroupMember) -> ResultBoxError<()> { +pub async fn insert_member(m: &GroupMember) -> ResultBoxError<()> { database::InsertQuery::new(GROUPS_MEMBERS_TABLE) .add_group_id("groups_id", &m.group_id) .add_user_id("user_id", &m.user_id) @@ -126,27 +126,27 @@ pub fn insert_member(m: &GroupMember) -> ResultBoxError<()> { .insert_drop_result()?; if m.level.is_at_least_member() { - conversations_helper::update_members_list_for_group_conversations(&m.group_id)?; + conversations_helper::update_members_list_for_group_conversations(&m.group_id).await?; } Ok(()) } /// Remove a user's membership -pub fn delete_member(group_id: &GroupID, user_id: &UserID) -> ResultBoxError { +pub async fn delete_member(group_id: &GroupID, user_id: &UserID) -> ResultBoxError { database::DeleteQuery::new(GROUPS_MEMBERS_TABLE) .cond_group_id("groups_id", group_id) .cond_user_id("user_id", user_id) .exec()?; // Update access to group's conversations - conversations_helper::update_members_list_for_group_conversations(&group_id)?; + conversations_helper::update_members_list_for_group_conversations(&group_id).await?; Ok(()) } /// Update a user's membership level -pub fn update_membership_level(group_id: &GroupID, user_id: &UserID, new_level: GroupMembershipLevel) -> ResultBoxError { +pub async fn update_membership_level(group_id: &GroupID, user_id: &UserID, new_level: GroupMembershipLevel) -> ResultBoxError { database::UpdateInfo::new(GROUPS_MEMBERS_TABLE) .cond_user_id("user_id", user_id) .cond_group_id("groups_id", group_id) @@ -154,7 +154,7 @@ pub fn update_membership_level(group_id: &GroupID, user_id: &UserID, new_level: .exec()?; // Update access to group's conversations - conversations_helper::update_members_list_for_group_conversations(&group_id)?; + conversations_helper::update_members_list_for_group_conversations(&group_id).await?; Ok(()) } @@ -440,7 +440,7 @@ pub fn get_list_members(g: &GroupID) -> ResultBoxError> { } /// Send an invitation to a user -pub fn send_invitation(group_id: &GroupID, user_id: &UserID) -> ResultBoxError { +pub async fn send_invitation(group_id: &GroupID, user_id: &UserID) -> ResultBoxError { insert_member(&GroupMember { id: 0, user_id: user_id.clone(), @@ -448,7 +448,7 @@ pub fn send_invitation(group_id: &GroupID, user_id: &UserID) -> ResultBoxError { time_create: time(), level: GroupMembershipLevel::INVITED, following: true, - }) + }).await } /// Check out whether a user received an invitation to join a group or not @@ -462,18 +462,18 @@ pub fn received_invitation(group_id: &GroupID, user_id: &UserID) -> ResultBoxErr } /// Respond to a group membership invitation -pub fn respond_invitation(g: &GroupID, u: &UserID, accept: bool) -> ResultBoxError { +pub async fn respond_invitation(g: &GroupID, u: &UserID, accept: bool) -> ResultBoxError { match accept { - true => update_membership_level(g, u, GroupMembershipLevel::MEMBER), - false => delete_member(g, u), + true => update_membership_level(g, u, GroupMembershipLevel::MEMBER).await, + false => delete_member(g, u).await, } } /// Respond to a group membership request -pub fn respond_request(group_id: &GroupID, user_id: &UserID, accept: bool) -> ResultBoxError { +pub async fn respond_request(group_id: &GroupID, user_id: &UserID, accept: bool) -> ResultBoxError { match accept { - true => update_membership_level(&group_id, &user_id, GroupMembershipLevel::MEMBER), - false => delete_member(&group_id, &user_id), + true => update_membership_level(&group_id, &user_id, GroupMembershipLevel::MEMBER).await, + false => delete_member(&group_id, &user_id).await, } } @@ -489,7 +489,7 @@ pub fn can_user_create_posts(group_id: &GroupID, user_id: &UserID) -> ResultBoxE } /// Delete a group -pub fn delete(group_id: &GroupID) -> ResultBoxError { +pub async fn delete(group_id: &GroupID) -> ResultBoxError { // Delete all likes of the group likes_helper::delete_all(group_id.id(), LikeType::GROUP)?; @@ -497,13 +497,13 @@ pub fn delete(group_id: &GroupID) -> ResultBoxError { delete_logo(group_id)?; // Delete all group posts - posts_helper::delete_all_group(group_id)?; + posts_helper::delete_all_group(group_id).await?; // Delete all group related notifications - notifications_helper::delete_all_related_with_group(group_id)?; + notifications_helper::delete_all_related_with_group(group_id).await?; // Delete all conversations related with the group - conversations_helper::delete_all_group_conversations(group_id)?; + conversations_helper::delete_all_group_conversations(group_id).await?; // Delete all Forez presences related with the group forez_presence_helper::delete_all_group(group_id)?; @@ -520,12 +520,12 @@ pub fn delete(group_id: &GroupID) -> ResultBoxError { } /// Delete all the groups a user belongs to -pub fn delete_all_user_groups(user_id: &UserID) -> ResultBoxError { +pub async fn delete_all_user_groups(user_id: &UserID) -> ResultBoxError { for group_id in &get_list_user(user_id, false)? { if is_last_admin(group_id, user_id)? { - delete(group_id)?; + delete(group_id).await?; } else { - delete_member(group_id, user_id)?; + delete_member(group_id, user_id).await?; } } diff --git a/src/helpers/notifications_helper.rs b/src/helpers/notifications_helper.rs index b6f21b4..7146fc3 100644 --- a/src/helpers/notifications_helper.rs +++ b/src/helpers/notifications_helper.rs @@ -17,18 +17,18 @@ use crate::utils::date_utils; use crate::utils::date_utils::time; /// Create post notification -pub fn create_post_notification(from_user: &UserID, post_id: u64, action: NotifEventType) -> ResultBoxError { +pub async fn create_post_notification(from_user: &UserID, post_id: u64, action: NotifEventType) -> ResultBoxError { let mut n = PartialNotification::new() .set_from_user_id(from_user) .set_on_elem_id(post_id) .set_on_elem_type(NotifElemType::POST) .set_type(action); - push(&mut n) + push(&mut n).await } /// Create & push friend notification -pub fn create_friends_notification(from_user: &UserID, dest_user: &UserID, action: NotifEventType) -> ResultBoxError { +pub async fn create_friends_notification(from_user: &UserID, dest_user: &UserID, action: NotifEventType) -> ResultBoxError { let mut n = PartialNotification::new() .set_from_user_id(from_user) .set_dest_user_id(dest_user) @@ -36,13 +36,13 @@ pub fn create_friends_notification(from_user: &UserID, dest_user: &UserID, actio .set_on_elem_type(NotifElemType::FRIENDSHIP_REQUEST) .set_type(action); - push(&mut n) + push(&mut n).await } /// Create & push a group membership notification -pub fn create_group_membership_notification(user_id: &UserID, moderator_id: Option<&UserID>, group_id: &GroupID, kind: NotifEventType) -> ResultBoxError { +pub async fn create_group_membership_notification(user_id: &UserID, moderator_id: Option<&UserID>, group_id: &GroupID, kind: NotifEventType) -> Res { // Delete related group membership notifications - delete_all_related_to_group_membership_notifications(user_id, group_id)?; + delete_all_related_to_group_membership_notifications(user_id, group_id).await?; let mut n = PartialNotification::new() .set_on_elem_id(group_id.id()) @@ -64,11 +64,11 @@ pub fn create_group_membership_notification(user_id: &UserID, moderator_id: Opti } } - push(&mut n) + push(&mut n).await } /// Push a new notification -fn push(n: &mut PartialNotification) -> ResultBoxError +async fn push(n: &mut PartialNotification) -> ResultBoxError { if n.time_create.is_none() { @@ -110,7 +110,7 @@ fn push(n: &mut PartialNotification) -> ResultBoxError n.dest_user_id = Some(post.user_id); } - return push_private(n); + return push_private(n).await; } // Posts on user page @@ -129,12 +129,12 @@ fn push(n: &mut PartialNotification) -> ResultBoxError .map(|f| f.friend_id) .collect(); - return push_public(n, friends); + return push_public(n, friends).await; } // Posts on group pages else if post.is_on_group_page() { - return push_group_members(n, post.group_id().unwrap()); + return push_group_members(n, post.group_id().unwrap()).await; } // Unsupported scenario @@ -148,7 +148,7 @@ fn push(n: &mut PartialNotification) -> ResultBoxError n.container_id = None; n.container_type = None; - return push_private(n); + return push_private(n).await; } @@ -165,9 +165,9 @@ fn push(n: &mut PartialNotification) -> ResultBoxError { // Push the notification in private way (if it has a destination, // generally the target user of the membership request) - push_private(n) + push_private(n).await } else { - push_group_moderators(n, &GroupID::new(n.on_elem_id.unwrap())) + push_group_moderators(n, &GroupID::new(n.on_elem_id.unwrap())).await }; } else { unimplemented!(); @@ -175,32 +175,32 @@ fn push(n: &mut PartialNotification) -> ResultBoxError } /// Push a notification to group members -fn push_group_members(n: &mut PartialNotification, group_id: &GroupID) -> ResultBoxError { +async fn push_group_members(n: &mut PartialNotification, group_id: &GroupID) -> ResultBoxError { let mut list = groups_helper::get_list_followers(group_id)?; list = list.into_iter().filter(|f| f != n.from_user_id.as_ref().unwrap()).collect(); - push_public(n, list) + push_public(n, list).await } /// Push a notification to all the moderators & administrators of a group -fn push_group_moderators(n: &mut PartialNotification, group_id: &GroupID) -> ResultBoxError { +async fn push_group_moderators(n: &mut PartialNotification, group_id: &GroupID) -> ResultBoxError { let list = groups_helper::get_list_members(group_id)?; let list: Vec = list .into_iter() .filter(|e| e.is_moderator()) .map(|f| f.user_id) .collect(); - push_public(n, list) + push_public(n, list).await } /// Push a public notification -fn push_public(n: &mut PartialNotification, users: Vec) -> ResultBoxError { +async fn push_public(n: &mut PartialNotification, users: Vec) -> Res { n.visibility = Some(NotifEventVisibility::EVENT_PUBLIC); for user_id in users { n.dest_user_id = Some(user_id); if !similar_exists(n)? { - create(n)?; + create(n).await?; } } @@ -208,18 +208,18 @@ fn push_public(n: &mut PartialNotification, users: Vec) -> ResultBoxErro } /// Push a private notification (to 1 user) -fn push_private(n: &mut PartialNotification) -> ResultBoxError { +async fn push_private(n: &mut PartialNotification) -> Res { n.visibility = Some(NotifEventVisibility::EVENT_PRIVATE); if !similar_exists(n)? { - create(n)?; + create(n).await?; } Ok(()) } /// Create a new notification -fn create(n: &PartialNotification) -> ResultBoxError { +async fn create(n: &PartialNotification) -> ResultBoxError { if n.dest_user_id.is_none() || n.from_user_id.is_none() { return Err(ExecError::boxed_new("Trying to send a notification without a source or a destination!")); @@ -230,14 +230,14 @@ fn create(n: &PartialNotification) -> ResultBoxError { .insert_drop_result()?; // Send a notification (updated_number_conversations) - events_helper::propagate_event(&Event::UpdatedNotificationsNumber(&vec![n.dest_user_id.clone().unwrap()]))?; + events_helper::propagate_event(Event::UpdatedNotificationsNumber(vec![n.dest_user_id.clone().unwrap()])).await?; Ok(()) } /// Delete notifications -pub fn delete(notification: &PartialNotification) -> ResultBoxError { +pub async fn delete(notification: &PartialNotification) -> ResultBoxError { let conditions = notif_to_db(notification, false); // Get the list of affected users @@ -252,23 +252,23 @@ pub fn delete(notification: &PartialNotification) -> ResultBoxError { .exec()?; // Send a notification (updated_number_conversations) - events_helper::propagate_event(&Event::UpdatedNotificationsNumber(&users))?; + events_helper::propagate_event(Event::UpdatedNotificationsNumber(users)).await?; Ok(()) } /// Delete all the notifications of a given user -pub fn delete_all_user(user_id: &UserID) -> ResultBoxError { - delete(&PartialNotification::new().set_dest_user_id(user_id)) +pub async fn delete_all_user(user_id: &UserID) -> ResultBoxError { + delete(&PartialNotification::new().set_dest_user_id(user_id)).await } /// Delete all the notifications related with a user -pub fn delete_all_related_with_user(user_id: &UserID) -> ResultBoxError { +pub async fn delete_all_related_with_user(user_id: &UserID) -> ResultBoxError { // Delete all the notifications targeting the user - delete_all_user(user_id)?; + delete_all_user(user_id).await?; // Delete all the notifications created by the user - delete(&PartialNotification::new().set_from_user_id(user_id))?; + delete(&PartialNotification::new().set_from_user_id(user_id)).await?; Ok(()) } @@ -288,66 +288,66 @@ pub fn clean_old_user_notifications(user: &User) -> Res { } /// Delete all the notifications related with a group -pub fn delete_all_related_with_group(group_id: &GroupID) -> ResultBoxError { +pub async fn delete_all_related_with_group(group_id: &GroupID) -> ResultBoxError { delete(&PartialNotification::new() .set_on_elem_type(NotifElemType::GROUP_MEMBERSHIP) .set_on_elem_id(group_id.id()) - )?; + ).await?; delete(&PartialNotification::new() .set_on_elem_type(NotifElemType::GROUP_PAGE) .set_on_elem_id(group_id.id()) - ) + ).await } /// Delete all the notifications related to a group membership -pub fn delete_all_related_to_group_membership_notifications(user_id: &UserID, group_id: &GroupID) -> ResultBoxError { +pub async fn delete_all_related_to_group_membership_notifications(user_id: &UserID, group_id: &GroupID) -> ResultBoxError { let mut n = PartialNotification::new() .set_on_elem_type(NotifElemType::GROUP_MEMBERSHIP) .set_on_elem_id(group_id.id()); n.dest_user_id = Some(user_id.clone()); n.from_user_id = None; - delete(&n)?; + delete(&n).await?; n.dest_user_id = None; n.from_user_id = Some(user_id.clone()); - delete(&n)?; + delete(&n).await?; Ok(()) } /// Delete all the notifications about a post targeting a specified user -pub fn delete_all_post_notifications_targeting_user(user_id: &UserID, post_id: PostID) -> ResultBoxError { +pub async fn delete_all_post_notifications_targeting_user(user_id: &UserID, post_id: PostID) -> ResultBoxError { let n = PartialNotification::new() .set_dest_user_id(user_id) .set_on_elem_type(NotifElemType::POST) .set_on_elem_id(post_id); - delete(&n) + delete(&n).await } /// Delete all the notifications related with a post -pub fn delete_all_related_with_post(post_id: PostID) -> ResultBoxError { +pub async fn delete_all_related_with_post(post_id: PostID) -> ResultBoxError { let n = PartialNotification::new() .set_on_elem_type(NotifElemType::POST) .set_on_elem_id(post_id); - delete(&n) + delete(&n).await } /// Delete all the notifications related with a friendship request -pub fn delete_all_related_with_friendship_request(user_one: &UserID, user_two: &UserID) -> ResultBoxError { +pub async fn delete_all_related_with_friendship_request(user_one: &UserID, user_two: &UserID) -> ResultBoxError { let mut n = PartialNotification::new() .set_on_elem_type(NotifElemType::FRIENDSHIP_REQUEST); n.from_user_id = Some(user_one.clone()); n.dest_user_id = Some(user_two.clone()); - delete(&n)?; + delete(&n).await?; n.from_user_id = Some(user_two.clone()); n.dest_user_id = Some(user_one.clone()); - delete(&n) + delete(&n).await } /// Check out whether a similar notification exists for given specifications diff --git a/src/helpers/posts_helper.rs b/src/helpers/posts_helper.rs index 4c14af5..ae8cad9 100644 --- a/src/helpers/posts_helper.rs +++ b/src/helpers/posts_helper.rs @@ -412,15 +412,15 @@ pub fn set_content(post_id: u64, new_content: &str) -> ResultBoxError { } /// Delete a post -pub fn delete(p: &Post) -> ResultBoxError { +pub async fn delete(p: &Post) -> ResultBoxError { // Delete all the notifications related with the post - notifications_helper::delete_all_related_with_post(p.id)?; + notifications_helper::delete_all_related_with_post(p.id).await?; // Delete all the likes associated with the post likes_helper::delete_all(p.id, LikeType::POST)?; // Delete all the comments associated to the post - comments_helper::delete_all(p.id)?; + comments_helper::delete_all(p.id).await?; // Delete associated file / resource (if any) match &p.kind { @@ -452,25 +452,25 @@ pub fn delete(p: &Post) -> ResultBoxError { } /// Delete all the posts related with a group -pub fn delete_all_group(group_id: &GroupID) -> ResultBoxError { +pub async fn delete_all_group(group_id: &GroupID) -> ResultBoxError { for post in export_all_posts_group(group_id)? { - delete(&post)?; + delete(&post).await?; } Ok(()) } /// Delete all the posts of a given user -pub fn delete_all_user(user_id: &UserID) -> ResultBoxError { +pub async fn delete_all_user(user_id: &UserID) -> ResultBoxError { for post in &export_all_posts_user(user_id)? { - delete(&post)?; + delete(&post).await?; } Ok(()) } /// Clean the old posts of a user -pub fn clean_old_posts(user: &User) -> Res { +pub async fn clean_old_posts(user: &User) -> Res { let lifetime = user.delete_posts_after.unwrap_or(0); if lifetime < 1 { return Ok(()); @@ -484,7 +484,7 @@ pub fn clean_old_posts(user: &User) -> Res { .exec(db_to_post)?; for post in posts { - delete(&post)?; + delete(&post).await?; } Ok(()) diff --git a/src/routes.rs b/src/routes.rs index 43172e4..04d22c8 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -413,7 +413,7 @@ pub async fn find_route(req_uri: &str, call: Option<&mut HttpRequestHandler>) -> // Likes controller - route!(req_uri, call, POST_LOGIN, "/likes/update", likes_controller::update_async); + route!(req_uri, call, POST_LOGIN, "/likes/update", likes_controller::update); // Surveys controller diff --git a/src/user_ws_routes.rs b/src/user_ws_routes.rs index 7c5486a..640f26f 100644 --- a/src/user_ws_routes.rs +++ b/src/user_ws_routes.rs @@ -21,20 +21,20 @@ pub async fn exec_user_ws_route(uri: &str, handler: &mut UserWsRequestHandler) - "$main/unregister_post" => Some(user_ws_actions::unregister_post(handler)), // Likes controller - "likes/update" => Some(likes_controller::update(handler)), + "likes/update" => Some(likes_controller::update(handler).await), // Conversations controller - "conversations/is_writing" => Some(conversations_controller::member_is_writing(handler)), + "conversations/is_writing" => Some(conversations_controller::member_is_writing(handler).await), // Calls controller - "calls/config" => Some(calls_controller::get_config(handler)), - "calls/join" => Some(calls_controller::join_call(handler)), - "calls/leave" => Some(calls_controller::leave_call(handler)), - "calls/members" => Some(calls_controller::get_members_list(handler)), - "calls/signal" => Some(calls_controller::on_client_signal(handler)), - "calls/mark_ready" => Some(calls_controller::mark_user_ready(handler)), - "calls/request_offer" => Some(calls_controller::request_offer(handler)), - "calls/stop_streaming" => Some(calls_controller::stop_streaming(handler)), + "calls/config" => Some(calls_controller::get_config(handler).await), + "calls/join" => Some(calls_controller::join_call(handler).await), + "calls/leave" => Some(calls_controller::leave_call(handler).await), + "calls/members" => Some(calls_controller::get_members_list(handler).await), + "calls/signal" => Some(calls_controller::on_client_signal(handler).await), + "calls/mark_ready" => Some(calls_controller::mark_user_ready(handler).await), + "calls/request_offer" => Some(calls_controller::request_offer(handler).await), + "calls/stop_streaming" => Some(calls_controller::stop_streaming(handler).await), // Presence controller "forez_presence/list" => Some(forez_controller::get_list(handler)),