1
0
mirror of https://gitlab.com/comunic/comunicapiv3 synced 2025-09-25 14:19:45 +00:00

Turned a lot of function to async mode

This commit is contained in:
2022-03-12 07:47:22 +01:00
parent cfaaef68b7
commit 09ce13c554
24 changed files with 368 additions and 357 deletions

View File

@@ -94,7 +94,7 @@ pub async fn login_user(request: &mut HttpRequestHandler) -> RequestResult {
/// Sign out user
pub async fn logout_user(request: &mut HttpRequestHandler) -> RequestResult {
if let Some(token) = request.user_access_token() {
account_helper::destroy_login_tokens(token)?;
account_helper::destroy_login_tokens(token).await?;
}
request.success("User disconnected.")
@@ -102,7 +102,7 @@ pub async fn logout_user(request: &mut HttpRequestHandler) -> RequestResult {
/// Disconnect a user from all his devices
pub async fn disconnect_all_devices(r: &mut HttpRequestHandler) -> RequestResult {
account_helper::destroy_all_user_tokens(r.user_id_ref()?)?;
account_helper::destroy_all_user_tokens(r.user_id_ref()?).await?;
r.success("Successfully disconnected!")
}
@@ -208,7 +208,7 @@ pub async fn delete_account(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("You shall not delete MY account (whoever you are, please note that hacking is bad !!!)".to_string())?;
}
account_helper::delete(r.user_id_ref()?)?;
account_helper::delete(r.user_id_ref()?).await?;
r.success("Account deleted.")
}

View File

@@ -88,7 +88,7 @@ impl UserWsRequestHandler {
}
/// Get calls configuration
pub fn get_config(r: &mut UserWsRequestHandler) -> RequestResult {
pub async fn get_config(r: &mut UserWsRequestHandler) -> RequestResult {
// Check whether the user is the member of a call or not
if let None = r.get_conn().active_call {
r.forbidden("You do not belong to any call yet!".to_string())?;
@@ -124,7 +124,7 @@ pub fn is_conversation_having_call(conv_id: &ConvID) -> bool {
}
/// Join a call
pub fn join_call(r: &mut UserWsRequestHandler) -> RequestResult {
pub async fn join_call(r: &mut UserWsRequestHandler) -> RequestResult {
let conv_id = r.post_conv("convID")?.conv_id;
// Check if the conversation can have a call
@@ -135,23 +135,21 @@ pub fn join_call(r: &mut UserWsRequestHandler) -> RequestResult {
// Remove any other active call with current WebSocket
if let Some(call) = &r.get_conn().active_call {
make_user_leave_call(&call.conv_id, r.get_conn())?;
make_user_leave_call(&call.conv_id, r.get_conn()).await?;
}
// Remove any other active connection to current call of current user
user_ws_controller::foreach_connection(|conn| {
for conn in user_ws_controller::get_all_connections()? {
if conn.user_id() != r.user_id_ref()? || conn.session.eq(&r.get_conn().session) {
return Ok(());
}
if let Some(call) = &conn.active_call {
if call.conv_id == conv_id {
make_user_leave_call(&call.conv_id, conn)?;
make_user_leave_call(&call.conv_id, &conn).await?;
}
}
Ok(())
})?;
};
r.update_conn(|r| r.active_call = Some(ActiveCall {
conv_id,
@@ -159,13 +157,13 @@ pub fn join_call(r: &mut UserWsRequestHandler) -> RequestResult {
}))?;
// Propagate event
events_helper::propagate_event(&Event::UserJoinedCall(&conv_id, r.user_id_ref()?))?;
events_helper::propagate_event(Event::UserJoinedCall(conv_id, r.user_id()?)).await?;
Ok(())
}
/// Leave a call
pub fn leave_call(r: &mut UserWsRequestHandler) -> RequestResult {
pub async fn leave_call(r: &mut UserWsRequestHandler) -> RequestResult {
// Warning ! For some technical reasons, we do not check if the user
// really belongs to the conversation, so be careful when manipulating
// conversation ID here
@@ -177,13 +175,13 @@ pub fn leave_call(r: &mut UserWsRequestHandler) -> RequestResult {
}
// Make the user leave the call
make_user_leave_call(&conv_id, r.get_conn())?;
make_user_leave_call(&conv_id, r.get_conn()).await?;
r.success("Left call!")
}
/// Get the list of members of a call
pub fn get_members_list(r: &mut UserWsRequestHandler) -> RequestResult {
pub async fn get_members_list(r: &mut UserWsRequestHandler) -> RequestResult {
let conv_id = r.post_call_id("callID")?;
let mut list = vec![];
@@ -204,7 +202,7 @@ pub fn gen_call_hash(call_id: &ConvID, peer_id: &UserID) -> String {
}
/// Handles client signal
pub fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult {
pub async fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult {
let call_id = r.post_call_id("callID")?;
let peer_id = r.post_call_peer_id(&call_id, "peerID")?;
let sig_type = r.post_string("type")?;
@@ -271,18 +269,18 @@ pub fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult {
}
};
events_helper::propagate_event(&Event::NewUserCallSignal(&NewUserCallSignal {
events_helper::propagate_event(Event::NewUserCallSignal(NewUserCallSignal {
call_hash: gen_call_hash(&call_id, &peer_id),
user_id: if r.user_id_ref()? == &peer_id { None } else { Some(r.user_id()?) },
signal,
raw_data: r.post_string("data")?,
}))?;
})).await?;
r.success("Signal sent")
}
/// Mark user ready for streaming
pub fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res {
pub async fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res {
let call_id = r.post_call_id("callID")?;
let user_id = r.user_id()?;
@@ -291,14 +289,13 @@ pub fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res {
user_ws_controller::send_message_to_specific_connections(
|c| c.user_id() != &user_id && c.is_having_call_with_conversation(&call_id),
|_| UserWsMessage::no_id_message("call_peer_ready", CallPeerReadyAPI::new(&call_id, r.user_id_ref()?)),
None::<fn(&_) -> _>,
)?;
r.success("Information propagated.")
}
/// Request an offer from the server
pub fn request_offer(r: &mut UserWsRequestHandler) -> Res {
pub async fn request_offer(r: &mut UserWsRequestHandler) -> Res {
let call_id = r.post_call_id("callID")?;
// The ID of the user we stream the audio / video from
@@ -308,16 +305,16 @@ pub fn request_offer(r: &mut UserWsRequestHandler) -> Res {
r.forbidden("You can not request an offer for yourself!".to_string())?;
}
events_helper::propagate_event(&Event::UserRequestedCallOffer(&UserCallOfferRequest {
events_helper::propagate_event(Event::UserRequestedCallOffer(UserCallOfferRequest {
call_hash: gen_call_hash(&call_id, &peer_id),
user_id: r.user_id()?,
}))?;
})).await?;
r.success("Request sent")
}
/// Notify the user stopped to stream
pub fn stop_streaming(r: &mut UserWsRequestHandler) -> Res {
pub async fn stop_streaming(r: &mut UserWsRequestHandler) -> Res {
let call_id = r.post_call_id("callID")?;
let user_id = r.user_id()?;
@@ -329,22 +326,21 @@ pub fn stop_streaming(r: &mut UserWsRequestHandler) -> Res {
user_ws_controller::send_message_to_specific_connections(
|c| c.is_having_call_with_conversation(&call_id) && c.user_id() != &user_id,
|_| UserWsMessage::no_id_message("call_peer_interrupted_streaming", CallPeerInterruptedStreamingAPI::new(&call_id, &user_id)),
None::<fn(&_) -> _>,
)?;
}
// Notify proxy
events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream {
events_helper::propagate_event(Event::CloseCallStream(CloseCallStream {
call_hash: gen_call_hash(&call_id, &user_id),
peer_id: None,
}))?;
})).await?;
r.success("ok")
}
/// Make the user leave the call
pub fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) -> Res {
pub async fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) -> Res {
connection.clone().replace(|c| c.active_call = None);
// Notify user (if possible)
@@ -353,39 +349,37 @@ pub fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) ->
}
// Close main stream (sender)
events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream {
events_helper::propagate_event(Event::CloseCallStream(CloseCallStream {
call_hash: gen_call_hash(&conv_id, connection.user_id()),
peer_id: None,
}))?;
})).await?;
// Close receiver streams (other users streams)
user_ws_controller::foreach_connection(
|peer_conn| {
if peer_conn.is_having_call_with_conversation(conv_id) && peer_conn.user_id() != connection.user_id() {
events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream {
call_hash: gen_call_hash(&conv_id, peer_conn.user_id()),
peer_id: Some(connection.user_id().clone()),
}))?;
}
Ok(())
},
)?;
for peer_conn in user_ws_controller::get_all_connections()? {
if peer_conn.is_having_call_with_conversation(conv_id) && peer_conn.user_id() != connection.user_id() {
events_helper::propagate_event(Event::CloseCallStream(CloseCallStream {
call_hash: gen_call_hash(&conv_id, peer_conn.user_id()),
peer_id: Some(connection.user_id().clone()),
})).await?;
}
};
// Create a notification
events_helper::propagate_event(&Event::UserLeftCall(conv_id, connection.user_id()))?;
events_helper::propagate_event(Event::UserLeftCall(
conv_id.clone(),
connection.user_id().clone(),
)).await?;
Ok(())
}
/// Events handler
pub fn handle_event(e: &events_helper::Event) -> Res {
pub async fn handle_event(e: &events_helper::Event) -> Res {
match e {
Event::UserJoinedCall(conv_id, user_id) => {
user_ws_controller::send_message_to_specific_connections(
|c| c.is_having_call_with_conversation(conv_id) && c.user_id() != user_id,
|_| UserWsMessage::no_id_message("user_joined_call", JoinedCallMessage::new(conv_id, user_id)),
None::<fn(&_) -> _>,
)?;
}
@@ -393,13 +387,12 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
user_ws_controller::send_message_to_specific_connections(
|c| c.is_having_call_with_conversation(conv_id),
|_| UserWsMessage::no_id_message("user_left_call", LeftCallMessage::new(conv_id, user_id)),
None::<fn(&_) -> _>,
)?;
}
Event::UserWsClosed(c) => {
if let Some(call) = c.active_call.clone() {
make_user_leave_call(&call.conv_id, c)?;
make_user_leave_call(&call.conv_id, c).await?;
}
}
@@ -423,20 +416,17 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
user_ws_controller::send_message_to_specific_connections(
|c| c.user_id() == target_user && c.is_having_call_with_conversation(&call_id),
|_| UserWsMessage::no_id_message("new_call_signal", NewCallSignalAPI::new(&call_id, &peer_id, &msg.data)?),
None::<fn(&_) -> _>,
)?;
}
// Handle proxy disconnect => close all active calls
Event::ClosedRTCRelayWebSocket => {
user_ws_controller::foreach_connection(|f| {
for f in user_ws_controller::get_all_connections()? {
// Close all active connections
if let Some(call) = &f.active_call {
make_user_leave_call(&call.conv_id, f)?;
make_user_leave_call(&call.conv_id, &f).await?;
}
Ok(())
})?;
};
}
// Call active call of user (if any)
@@ -450,7 +440,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
})?;
if let Some(c) = conn {
make_user_leave_call(conv_id, &c)?;
make_user_leave_call(conv_id, &c).await?;
}
}
@@ -465,7 +455,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
})?;
for con in connections {
make_user_leave_call(conv_id, &con)?;
make_user_leave_call(conv_id, &con).await?;
}
}

View File

@@ -49,13 +49,13 @@ pub async fn create(r: &mut HttpRequestHandler) -> RequestResult {
image_path: image,
};
let comment_id = comments_helper::create(&comment)?;
let comment_id = comments_helper::create(&comment).await?;
// Create notifications
notifications_helper::create_post_notification(&r.user_id()?, post.id, NotifEventType::COMMENT_CREATED)?;
notifications_helper::create_post_notification(&r.user_id()?, post.id, NotifEventType::COMMENT_CREATED).await?;
// Remove notifications targeting current user about the post
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id)?;
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id).await?;
r.set_response(ResCreateComment::new(comment_id))
}
@@ -72,7 +72,7 @@ pub async fn edit(r: &mut HttpRequestHandler) -> RequestResult {
let comment = r.post_comment_with_full_access("commentID")?;
let new_content = r.post_content("content", 2, true)?;
comments_helper::edit(comment.id, &new_content)?;
comments_helper::edit(comment.id, &new_content).await?;
r.success("Content updated.")
}
@@ -81,13 +81,13 @@ pub async fn edit(r: &mut HttpRequestHandler) -> RequestResult {
pub async fn delete(r: &mut HttpRequestHandler) -> RequestResult {
let comment = r.post_comment_with_full_access("commentID")?;
comments_helper::delete(&comment)?;
comments_helper::delete(&comment).await?;
r.success("Comment deleted.")
}
/// Events handler
pub fn handle_event(e: &events_helper::Event) -> Res {
pub async fn handle_event(e: &events_helper::Event) -> Res {
match e {
Event::NewComment(comment) => {
user_ws_controller::send_message_to_specific_connections(
@@ -95,8 +95,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
|c| UserWsMessage::no_id_message(
"new_comment",
CommentAPI::new(comment, &Some(c.user_id().clone()))?,
),
None::<fn(&_) -> _>,
)
)?;
}
@@ -106,8 +105,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
|c| UserWsMessage::no_id_message(
"comment_updated",
CommentAPI::new(comment, &Some(c.user_id().clone()))?,
),
None::<fn(&_) -> _>,
)
)?;
}
@@ -118,7 +116,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
"comment_deleted",
comment.id.clone(),
),
None::<fn(&_) -> _>,
)?;
}

View File

@@ -21,7 +21,6 @@ use crate::data::error::Res;
use crate::data::http_request_handler::HttpRequestHandler;
use crate::data::new_conversation::NewConversation;
use crate::data::new_conversation_message::NewConversationMessage;
use crate::data::user_ws_connection::UserWsConnection;
use crate::data::user_ws_message::UserWsMessage;
use crate::data::user_ws_request_handler::UserWsRequestHandler;
use crate::helpers::{conversations_helper, events_helper};
@@ -50,7 +49,7 @@ pub async fn create(r: &mut HttpRequestHandler) -> RequestResult {
};
// Create the conversation
let conv_id = conversations_helper::create(&conv)?;
let conv_id = conversations_helper::create(&conv).await?;
r.set_response(ResCreateConversation::new(conv_id))
}
@@ -147,7 +146,7 @@ pub async fn add_member(r: &mut HttpRequestHandler) -> RequestResult {
r.bad_request("This user is already a member of this conversation!".to_string())?;
}
conversations_helper::add_member(conv.id, &user_to_add, true, false, Some(r.user_id_ref()?))?;
conversations_helper::add_member(conv.id, &user_to_add, true, false, Some(r.user_id_ref()?)).await?;
r.success("The user was added to the conversation!")
}
@@ -195,7 +194,7 @@ pub async fn remove_member(r: &mut HttpRequestHandler) -> RequestResult {
r.bad_request("This user is not a member of this conversation!".to_string())?;
}
conversations_helper::remove_member(&user_to_remove, conv.id, Some(r.user_id_ref()?))?;
conversations_helper::remove_member(&user_to_remove, conv.id, Some(r.user_id_ref()?)).await?;
r.ok()
}
@@ -228,7 +227,7 @@ pub async fn find_private(r: &mut HttpRequestHandler) -> RequestResult {
group_id: None,
group_min_membership_level: None,
};
let conv_id = conversations_helper::create(&new_conv)?;
let conv_id = conversations_helper::create(&new_conv).await?;
list.push(conv_id);
}
@@ -255,7 +254,7 @@ pub async fn refresh_single(r: &mut HttpRequestHandler) -> RequestResult {
conv.conv_id,
r.user_id_ref()?,
&messages.last().unwrap(),
)?;
).await?;
}
r.set_response(ConversationMessageAPI::for_list(&messages))
@@ -395,7 +394,7 @@ pub async fn send_message(r: &mut HttpRequestHandler) -> RequestResult {
message,
file,
server_message: None,
})?;
}).await?;
r.success("Conversation message was successfully sent!")
}
@@ -423,7 +422,7 @@ pub async fn delete_conversation(r: &mut HttpRequestHandler) -> RequestResult {
r.bad_request("This conversation is managed, it can not be deleted by this way!".to_string())?;
}
conversations_helper::remove_user_from_conversation(&r.user_id()?, &conv, r.user_id_ref()?)?;
conversations_helper::remove_user_from_conversation(&r.user_id()?, &conv, r.user_id_ref()?).await?;
r.success("The conversation has been deleted")
}
@@ -447,7 +446,7 @@ pub async fn update_message(r: &mut HttpRequestHandler) -> RequestResult {
r.bad_request("New message is too long!".to_string())?;
}
conversations_helper::update_message_content(msg_id, &new_content)?;
conversations_helper::update_message_content(msg_id, &new_content).await?;
r.success("Conversation message content successfully updated")
}
@@ -460,25 +459,25 @@ pub async fn delete_message(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("You are not the owner of this message!".to_string())?;
}
conversations_helper::delete_message_by_id(msg_id)?;
conversations_helper::delete_message_by_id(msg_id).await?;
r.success("The message has been successfully deleted!")
}
/// A user is writing a message in a conversation
pub fn member_is_writing(r: &mut UserWsRequestHandler) -> RequestResult {
pub async fn member_is_writing(r: &mut UserWsRequestHandler) -> RequestResult {
let conv_id = r.post_registered_conv_id("convID")?;
// Propagate event
events_helper::propagate_event(
&Event::UserIsWritingMessageInConversation(r.user_id_ref()?, conv_id)
)?;
Event::UserIsWritingMessageInConversation(r.user_id()?, conv_id)
).await?;
r.ok()
}
/// Events handler
pub fn handle_event(e: &events_helper::Event) -> Res {
pub async fn handle_event(e: &events_helper::Event) -> Res {
match e {
Event::UpdatedNumberUnreadConversations(users) => {
for user in users.iter() {
@@ -498,23 +497,22 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
user_ws_controller::send_message_to_specific_connections(
|s| s.conversations.contains(conv_id) && s.user_id() != user_id,
|_| UserWsMessage::no_id_message("writing_message_in_conv", UserIsWritingMessageInConversation::new(user_id, *conv_id)),
None::<fn(&_) -> _>,
)?;
}
Event::NewConversationMessage(msg) => {
user_ws_controller::send_message_to_specific_connections(
for conn in user_ws_controller::send_message_to_specific_connections(
|f| f.conversations.contains(&msg.conv_id),
|_| UserWsMessage::no_id_message("new_conv_message", ConversationMessageAPI::new(msg)),
Some(|conn: &UserWsConnection| conversations_helper::mark_user_seen(msg.conv_id, conn.user_id(), msg)),
)?;
)? {
conversations_helper::mark_user_seen(msg.conv_id, conn.user_id(), msg).await?;
}
}
Event::UpdatedConversationMessage(msg) => {
user_ws_controller::send_message_to_specific_connections(
|f| f.conversations.contains(&msg.conv_id),
|_| UserWsMessage::no_id_message("updated_conv_message", ConversationMessageAPI::new(msg)),
None::<fn(&_) -> _>,
)?;
}
@@ -522,7 +520,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
user_ws_controller::send_message_to_specific_connections(
|f| f.conversations.contains(&msg.conv_id),
|_| UserWsMessage::no_id_message("deleted_conv_message", ConversationMessageAPI::new(msg)),
None::<fn(&_) -> _>,
)?;
}
@@ -531,7 +528,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
user_ws_controller::send_message_to_specific_connections(
|f| f.conversations.contains(conv_id),
|_| UserWsMessage::no_id_message("removed_user_from_conv", RemovedUserFromConversationMessage::new(user_id, *conv_id)),
None::<fn(&_) -> _>,
)?;
// Disconnect user from conversation
@@ -551,7 +547,6 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
user_ws_controller::send_message_to_specific_connections(
|f| f.conversations.contains(conv_id),
|_| UserWsMessage::no_id_message("deleted_conversation", conv_id.id()),
None::<fn(&_) -> _>,
)?;
// Disconnect user from conversation

View File

@@ -85,7 +85,7 @@ pub async fn send_request(r: &mut HttpRequestHandler) -> RequestResult {
notifications_helper::create_friends_notification(
r.user_id_ref()?,
&friend_id,
NotifEventType::SENT_FRIEND_REQUEST)?;
NotifEventType::SENT_FRIEND_REQUEST).await?;
r.success("The friendship request was successfully sent!")
}
@@ -101,7 +101,7 @@ pub async fn cancel_request(r: &mut HttpRequestHandler) -> RequestResult {
friends_helper::remove_request(&r.user_id()?, &friend_id)?;
// Delete related notifications
notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id)?;
notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id).await?;
r.success("Friendship request removed!")
}
@@ -125,7 +125,7 @@ pub async fn respond_request(r: &mut HttpRequestHandler) -> RequestResult {
true => NotifEventType::ACCEPTED_FRIEND_REQUEST,
false => NotifEventType::REJECTED_FRIEND_REQUEST
},
)?;
).await?;
r.set_response("Response to the friendship request successfully saved!")
}
@@ -137,7 +137,7 @@ pub async fn remove_friend(r: &mut HttpRequestHandler) -> RequestResult {
friends_helper::remove_friendship(r.user_id_ref()?, &friend_id)?;
// Delete any related notification
notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id)?;
notifications_helper::delete_all_related_with_friendship_request(r.user_id_ref()?, &friend_id).await?;
r.success("The friend was removed from the list!")
}

View File

@@ -57,7 +57,7 @@ pub async fn create(r: &mut HttpRequestHandler) -> RequestResult {
owner_id: r.user_id()?,
};
let group_id = groups_helper::create(&new_group)?;
let group_id = groups_helper::create(&new_group).await?;
r.set_response(GroupCreationResult::new(&group_id))
}
@@ -187,7 +187,7 @@ pub async fn create_conversation(r: &mut HttpRequestHandler) -> RequestResult {
let min_membership_level = r.post_group_membership_level_for_conversation("min_membership_level")?;
let name = r.post_string("name")?;
let conv_id = conversations_helper::create_conversation_for_group(group, min_membership_level, &name)?;
let conv_id = conversations_helper::create_conversation_for_group(group, min_membership_level, &name).await?;
r.set_response(ResCreateConversationForGroup::new(conv_id))
}
@@ -197,7 +197,7 @@ pub async fn set_conversation_visibility(r: &mut HttpRequestHandler) -> RequestR
let conv = r.post_group_conv_admin("conv_id")?;
let min_level = r.post_group_membership_level_for_conversation("min_membership_level")?;
conversations_helper::set_min_group_conversation_membership_level(conv.id, min_level)?;
conversations_helper::set_min_group_conversation_membership_level(conv.id, min_level).await?;
r.ok()
}
@@ -206,7 +206,7 @@ pub async fn set_conversation_visibility(r: &mut HttpRequestHandler) -> RequestR
pub async fn delete_conversation(r: &mut HttpRequestHandler) -> RequestResult {
let conv = r.post_group_conv_admin("conv_id")?;
conversations_helper::delete_conversation(&conv)?;
conversations_helper::delete_conversation(&conv).await?;
r.ok()
}
@@ -236,10 +236,10 @@ pub async fn cancel_invitation(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("This user has not been invited to join this group!".to_string())?;
}
groups_helper::delete_member(&group_id, &user_id)?;
groups_helper::delete_member(&group_id, &user_id).await?;
// Delete related notifications
notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id)?;
notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id).await?;
r.success("Membership invitation has been cancelled!")
}
@@ -253,11 +253,11 @@ pub async fn invite_user(r: &mut HttpRequestHandler) -> RequestResult {
r.bad_request("The user is not a visitor of the group!".to_string())?;
}
groups_helper::send_invitation(&group_id, &user_id)?;
groups_helper::send_invitation(&group_id, &user_id).await?;
// Send a notification
notifications_helper::create_group_membership_notification(
&user_id, Some(r.user_id_ref()?), &group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_INVITATION)?;
&user_id, Some(r.user_id_ref()?), &group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_INVITATION).await?;
r.success("The user has been successfully invited to join the group!")
}
@@ -271,7 +271,7 @@ pub async fn respond_invitation(r: &mut HttpRequestHandler) -> RequestResult {
r.not_found("Invitation not found!".to_string())?
}
groups_helper::respond_invitation(&group_id, &r.user_id()?, accept)?;
groups_helper::respond_invitation(&group_id, &r.user_id()?, accept).await?;
if accept {
groups_helper::set_following(&group_id, &r.user_id()?, true)?;
@@ -281,7 +281,7 @@ pub async fn respond_invitation(r: &mut HttpRequestHandler) -> RequestResult {
notifications_helper::create_group_membership_notification(r.user_id_ref()?, None, &group_id, match accept {
true => NotifEventType::ACCEPTED_GROUP_MEMBERSHIP_INVITATION,
false => NotifEventType::REJECTED_GROUP_MEMBERSHIP_INVITATION
})?;
}).await?;
r.success("Response to the invitation was successfully saved!")
}
@@ -312,12 +312,12 @@ pub async fn send_request(r: &mut HttpRequestHandler) -> RequestResult {
time_create: time(),
level,
following: true,
})?;
}).await?;
// Send a notification, if required
if matches!(group.registration_level, GroupRegistrationLevel::MODERATED_REGISTRATION) {
notifications_helper::create_group_membership_notification(r.user_id_ref()?, None,
&group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_REQUEST)?;
&group_id, NotifEventType::SENT_GROUP_MEMBERSHIP_REQUEST).await?;
}
@@ -332,10 +332,10 @@ pub async fn cancel_request(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("You did not send a membership request to this group!".to_string())?;
}
groups_helper::delete_member(&group_id, &r.user_id()?)?;
groups_helper::delete_member(&group_id, &r.user_id()?).await?;
// Delete any related notification
notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id)?;
notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id).await?;
r.success("The request has been successfully cancelled!")
}
@@ -360,10 +360,10 @@ pub async fn delete_member(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("Only administrators can delete this membership!".to_string())?;
}
groups_helper::delete_member(&group_id, &user_id)?;
groups_helper::delete_member(&group_id, &user_id).await?;
// Delete related notifications
notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id)?;
notifications_helper::delete_all_related_to_group_membership_notifications(&user_id, &group_id).await?;
r.success("Membership of the user has been successfully deleted!")
}
@@ -389,7 +389,7 @@ pub async fn update_membership(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("You can not assign this visibility level!".to_string())?;
}
groups_helper::update_membership_level(&group_id, &user_id, new_level)?;
groups_helper::update_membership_level(&group_id, &user_id, new_level).await?;
r.success("User membership has been successfully updated!")
}
@@ -404,13 +404,13 @@ pub async fn respond_request(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("This user has not requested a membership for this group!".to_string())?;
}
groups_helper::respond_request(&group_id, &user_id, accept)?;
groups_helper::respond_request(&group_id, &user_id, accept).await?;
// Create a notification
notifications_helper::create_group_membership_notification(&user_id, Some(r.user_id_ref()?), &group_id, match accept {
true => NotifEventType::ACCEPTED_GROUP_MEMBERSHIP_REQUEST,
false => NotifEventType::REJECTED_GROUP_MEMBERSHIP_REQUEST
})?;
}).await?;
r.success("The response to the request has been successfully saved!")
}
@@ -433,10 +433,10 @@ pub async fn remove_membership(r: &mut HttpRequestHandler) -> RequestResult {
r.forbidden("You are the last administrator of this group!".to_string())?;
}
groups_helper::delete_member(&group_id, &r.user_id()?)?;
groups_helper::delete_member(&group_id, &r.user_id()?).await?;
// Delete group membership notifications
notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id)?;
notifications_helper::delete_all_related_to_group_membership_notifications(r.user_id_ref()?, &group_id).await?;
r.success("Your membership has been successfully deleted!")
}
@@ -456,7 +456,7 @@ pub async fn delete_group(r: &mut HttpRequestHandler) -> RequestResult {
let group_id = r.post_group_id_with_access("groupID", GroupAccessLevel::ADMIN_ACCESS)?;
r.need_user_password("password")?;
groups_helper::delete(&group_id)?;
groups_helper::delete(&group_id).await?;
r.success("Group deleted.")
}

View File

@@ -2,23 +2,18 @@
//!
//! @author Pierre Hubert
use crate::routes::RequestResult;
use crate::data::base_request_handler::BaseRequestHandler;
use crate::data::error::ExecError;
use crate::data::group::GroupAccessLevel;
use crate::data::post::PostAccessLevel;
use crate::helpers::{likes_helper, notifications_helper, user_helper};
use crate::helpers::likes_helper::LikeType;
use crate::routes::RequestResult;
struct LikeTarget(u64, LikeType);
/// Update like status (async version)
pub async fn update_async<H: BaseRequestHandler>(r: &mut H) -> RequestResult {
update(r)
}
/// Update like status
pub fn update<H: BaseRequestHandler>(r: &mut H) -> RequestResult {
pub async fn update<H: BaseRequestHandler>(r: &mut H) -> RequestResult {
let req_type = r.post_string("type")?;
let is_liking = r.post_bool("like")?;
@@ -41,7 +36,7 @@ pub fn update<H: BaseRequestHandler>(r: &mut H) -> RequestResult {
let post = r.post_post_with_access("id", PostAccessLevel::BASIC_ACCESS)?;
// Delete any notification targeting this user about the post
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id)?;
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id).await?;
LikeTarget(post.id, LikeType::POST)
}
@@ -51,7 +46,7 @@ pub fn update<H: BaseRequestHandler>(r: &mut H) -> RequestResult {
let comment = r.post_comment_with_access("id")?;
// Delete any notification targeting this user about the post
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, comment.post_id)?;
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, comment.post_id).await?;
LikeTarget(comment.id, LikeType::COMMENT)
}

View File

@@ -73,14 +73,14 @@ pub async fn mark_seen(r: &mut HttpRequestHandler) -> RequestResult {
notif.from_user_id = None;
}
notifications_helper::delete(&notif)?;
notifications_helper::delete(&notif).await?;
r.success("Notification deleted")
}
/// Delete all the notifications of the current user
pub async fn delete_all(r: &mut HttpRequestHandler) -> RequestResult {
notifications_helper::delete_all_user(r.user_id_ref()?)?;
notifications_helper::delete_all_user(r.user_id_ref()?).await?;
r.success("Notifications deleted.")
}

View File

@@ -235,7 +235,7 @@ pub async fn create_post(r: &mut HttpRequestHandler) -> RequestResult {
}
// Create a notification
notifications_helper::create_post_notification(r.user_id_ref()?, post_id, NotifEventType::ELEM_CREATED)?;
notifications_helper::create_post_notification(r.user_id_ref()?, post_id, NotifEventType::ELEM_CREATED).await?;
r.set_response(ResCreatePost::new(post_id))
}
@@ -249,7 +249,7 @@ pub async fn set_visibility_level(r: &mut HttpRequestHandler) -> RequestResult {
// Depending on new level, delete (or not) notifications about the post
if matches!(new_visibility, PostVisibilityLevel::VISIBILITY_USER) {
notifications_helper::delete_all_related_with_post(post.id)?;
notifications_helper::delete_all_related_with_post(post.id).await?;
}
r.success("Visibility level updated")
@@ -263,7 +263,7 @@ pub async fn update_content(r: &mut HttpRequestHandler) -> RequestResult {
posts_helper::set_content(post.id, &new_content)?;
// Delete the notifications targeting the current user about this post
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id)?;
notifications_helper::delete_all_post_notifications_targeting_user(r.user_id_ref()?, post.id).await?;
r.success("Content updated")
}
@@ -272,7 +272,7 @@ pub async fn update_content(r: &mut HttpRequestHandler) -> RequestResult {
pub async fn delete(r: &mut HttpRequestHandler) -> RequestResult {
let post = r.post_post_with_access("postID", PostAccessLevel::FULL_ACCESS)?;
posts_helper::delete(&post)?;
posts_helper::delete(&post).await?;
r.success("Post deleted.")
}

View File

@@ -99,27 +99,36 @@ impl actix::Actor for RtcRelayActor {
}).unwrap())
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
fn stopped(&mut self, ctx: &mut Self::Context) {
println!("Closed connection to RTC relay.");
let future = async move {
// Propagate information
if let Err(e) = events_helper::propagate_event(Event::ClosedRTCRelayWebSocket).await {
eprintln!("Failed to propagate rtc closed event! {:#?}", e);
}
// Propagate information
if let Err(e) = events_helper::propagate_event(&Event::ClosedRTCRelayWebSocket) {
eprintln!("Failed to propagate rtc closed event! {:#?}", e);
}
eprintln!("Successfully propagated RTC relay stopped envent!");
};
future.into_actor(self).wait(ctx);
}
}
impl RtcRelayActor {
fn handle_message(&self, txt: &str) {
fn handle_message(&self, txt: &str, ctx: &mut <RtcRelayActor as actix::Actor>::Context) {
match serde_json::from_str::<RTCSocketMessage>(&txt) {
Err(e) => {
eprintln!("Failed to parse a message from RTC proxy! {:#?}", e);
}
Ok(msg) => {
if let Err(e) = process_message_from_relay(&msg) {
eprintln!("Failed to process signal from RTC Relay! {:#?}", e);
}
let future = async move {
if let Err(e) = process_message_from_relay(&msg).await {
eprintln!("Failed to process signal from RTC Relay! {:#?}", e);
}
};
future.into_actor(self).spawn(ctx);
}
}
}
@@ -142,7 +151,7 @@ impl StreamHandler<Result<actix_web_actors::ws::Message, actix_web_actors::ws::P
// Handle messages
match msg {
Message::Text(txt) => {
self.handle_message(&txt);
self.handle_message(&txt, ctx);
}
Message::Binary(_) => {
eprintln!("RTC WS Message::Binary");
@@ -164,7 +173,7 @@ impl StreamHandler<Result<actix_web_actors::ws::Message, actix_web_actors::ws::P
}
Item::Last(c) => {
self.recv_buff.extend_from_slice(c.as_ref());
self.handle_message(&String::from_utf8_lossy(&self.recv_buff))
self.handle_message(&String::from_utf8_lossy(&self.recv_buff), ctx)
}
}
}
@@ -182,15 +191,15 @@ impl StreamHandler<Result<actix_web_actors::ws::Message, actix_web_actors::ws::P
}
/// Process a message from the server
fn process_message_from_relay(msg: &RTCSocketMessage) -> Res {
async fn process_message_from_relay(msg: &RTCSocketMessage) -> Res {
match msg.title.as_str() {
"signal" => {
events_helper::propagate_event(&Event::NewRTCRelayMessage(&NewRtcRelayMessage {
events_helper::propagate_event(Event::NewRTCRelayMessage(NewRtcRelayMessage {
call_hash: msg.callHash.clone().unwrap_or(String::new()),
peer_id: msg.peerId.clone().unwrap_or("0".to_string()),
data: serde_json::to_string(msg.data.as_ref().unwrap_or(&Value::Null))
.unwrap_or("failed to serialize signal data".to_string()),
}))?;
})).await?;
}
title => {

View File

@@ -317,9 +317,16 @@ impl Actor for WsSession {
fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
// Send an event (user_ws_closed)
if let Some(conn) = find_connection(ctx.address()) {
if let Err(e) = events_helper::propagate_event(&Event::UserWsClosed(&conn)) {
eprintln!("Failed to propagate web socket closed event ! {:#?}", e);
}
let future = async move {
if let Err(e) = events_helper::propagate_event(Event::UserWsClosed(conn)).await {
eprintln!("Failed to propagate web socket closed event ! {:#?}", e);
}
// TODO : remove
eprintln!("Successfully propagated user ws stopping event!");
};
future.into_actor(self).wait(ctx);
}
remove_connection(ctx.address());
@@ -495,10 +502,9 @@ pub fn send_to_client(conn: &UserWsConnection, msg: &UserWsMessage) -> Res {
}
/// Send a message to specific users
pub fn send_message_to_specific_connections<F, M, A>(filter: F, msg_generator: M, after_send: Option<A>) -> Res
pub fn send_message_to_specific_connections<F, M>(filter: F, msg_generator: M) -> Res<Vec<UserWsConnection>>
where F: Fn(&UserWsConnection) -> bool,
M: Fn(&UserWsConnection) -> Res<UserWsMessage>,
A: Fn(&UserWsConnection) -> Res
M: Fn(&UserWsConnection) -> Res<UserWsMessage>
{
let connections = get_ws_connections_list()
.lock()
@@ -508,15 +514,11 @@ pub fn send_message_to_specific_connections<F, M, A>(filter: F, msg_generator: M
.map(|f| f.clone())
.collect::<Vec<UserWsConnection>>();
for con in connections {
for con in &connections {
send_message(con.session.clone(), &msg_generator(&con)?)?;
if let Some(cb) = &after_send {
cb(&con)?;
}
}
Ok(())
Ok(connections)
}
/// Check out whether user is connected or not
@@ -576,6 +578,11 @@ pub fn foreach_connection<F>(mut f: F) -> Res
Ok(())
}
/// Get a copy of the entire list of connections
pub fn get_all_connections() -> Res<Vec<UserWsConnection>> {
Ok(get_ws_connections_list().lock().unwrap().clone())
}
/// Events handler
pub fn handle_event(e: &events_helper::Event) -> Res {
match e {