//! # Calls controller //! //! @author Pierre Hubert use webrtc_sdp::attribute_type::SdpAttribute; use crate::api_data::call_member_info::CallMemberInfo; use crate::api_data::call_peer_interrupted_streaming::CallPeerInterruptedStreamingAPI; use crate::api_data::call_peer_ready::CallPeerReadyAPI; use crate::api_data::joined_call_message::JoinedCallMessage; use crate::api_data::left_call_message::LeftCallMessage; use crate::api_data::new_call_signal::NewCallSignalAPI; use crate::api_data::user_calls_config::UserCallsConfig; use crate::controllers::user_ws_controller; use crate::data::base_request_handler::BaseRequestHandler; use crate::data::call_signal::{ CallSignal, CloseCallStream, IceCandidate, NewUserCallSignal, SdpType, UserCallOfferRequest, }; use crate::data::config::conf; use crate::data::conversation::ConvID; use crate::data::error::{ExecError, Res}; use crate::data::user::UserID; use crate::data::user_ws_connection::{ActiveCall, UserWsConnection}; use crate::data::user_ws_message::UserWsMessage; use crate::data::user_ws_request_handler::UserWsRequestHandler; use crate::helpers::events_helper::Event; use crate::helpers::{calls_helper, conversations_helper, events_helper}; use crate::routes::RequestResult; impl UserWsRequestHandler { /// Get the ID of a call included in a WebSocket request fn post_call_id(&mut self, name: &str) -> Res { let conv_id = ConvID::new(self.post_u64(name)?); if !self.get_conn().is_having_call_with_conversation(&conv_id) { self.forbidden("You do not belong to this call!".to_string())?; } Ok(conv_id) } /// Get the ID of a peer for a call included in the WebSocket request fn post_call_peer_id(&mut self, call_id: &ConvID, name: &str) -> Res { let peer_id = UserID::new(self.post_u64(name)?); if peer_id == self.user_id_ref()? { return Ok(peer_id); } let mut found = false; user_ws_controller::foreach_connection(|p| { if !found && p.user_id() == peer_id && p.is_having_call_with_conversation(call_id) { found = true; } Ok(()) })?; if !found { self.forbidden("This peer is not a member of the call !".to_string())?; } Ok(peer_id) } /// Get call information, if available fn call_info(&mut self, call_id: &ConvID) -> Res { let call = self .get_conn() .active_call .clone() .ok_or(ExecError::new("No call found !"))?; if &call.conv_id != call_id { self.bad_request("The call active is not the one requested!".to_string())?; } Ok(call) } /// Update call information fn update_call(&mut self, update: F) -> Res where F: FnOnce(&mut ActiveCall), { self.update_conn(|conn| { if let Some(call) = &mut conn.active_call { update(call); } })?; Ok(()) } } /// Get calls configuration pub async fn get_config(r: &mut UserWsRequestHandler) -> RequestResult { // Check whether the user is the member of a call or not if let None = r.get_conn().active_call { r.forbidden("You do not belong to any call yet!".to_string())?; } if conf().is_rtc_relay_enabled() { if let Some(conf) = conf().rtc_relay.as_ref() { return r.set_response(UserCallsConfig::new(conf)); } } r.internal_error(ExecError::boxed_new("Missing calls configuration!")) } /// Check out whether a conversation is having a call or not pub fn is_conversation_having_call(conv_id: &ConvID) -> bool { let mut found = false; let res = user_ws_controller::foreach_connection(|f| { if found || f.is_having_call_with_conversation(conv_id) { found = true; } Ok(()) }); if let Err(e) = res { eprintln!( "Failed to check if a conversation is having call! Conversation: {} / Error: {:#?}", conv_id.id(), e ); } found } /// Join a call pub async fn join_call(r: &mut UserWsRequestHandler) -> RequestResult { let conv_id = r.post_conv("convID")?.conv_id; // Check if the conversation can have a call let conv = conversations_helper::get_single(conv_id)?; if !calls_helper::can_have_call(&conv) { r.forbidden("This conversation can not be used to make calls!".to_string())?; } // Remove any other active call with current WebSocket if let Some(call) = &r.get_conn().active_call { make_user_leave_call(&call.conv_id, r.get_conn()).await?; } // Remove any other active connection to current call of current user for conn in user_ws_controller::get_all_connections()? { if conn.user_id() != r.user_id_ref()? || conn.session.eq(&r.get_conn().session) { continue; } if let Some(call) = &conn.active_call { if call.conv_id == conv_id { make_user_leave_call(&call.conv_id, &conn).await?; } } } r.update_conn(|r| { r.active_call = Some(ActiveCall { conv_id, ready: false, }) })?; // Propagate event events_helper::propagate_event(Event::UserJoinedCall(conv_id, r.user_id()?)).await?; Ok(()) } /// Leave a call pub async fn leave_call(r: &mut UserWsRequestHandler) -> RequestResult { // Warning ! For some technical reasons, we do not check if the user // really belongs to the conversation, so be careful when manipulating // conversation ID here let conv_id = ConvID::new(r.post_u64("convID")?); // Check if the user was not in the conversation if !r.get_conn().is_having_call_with_conversation(&conv_id) { return r.success("Left call!"); } // Make the user leave the call make_user_leave_call(&conv_id, r.get_conn()).await?; r.success("Left call!") } /// Get the list of members of a call pub async fn get_members_list(r: &mut UserWsRequestHandler) -> RequestResult { let conv_id = r.post_call_id("callID")?; let mut list = vec![]; user_ws_controller::foreach_connection(|conn| { if conn.is_having_call_with_conversation(&conv_id) { list.push(CallMemberInfo::new( conn.user_id(), &conn.active_call.as_ref().unwrap(), )); } Ok(()) })?; r.set_response(list) } /// Get the hash associated to a call pub fn gen_call_hash(call_id: &ConvID, peer_id: &UserID) -> String { format!("{}-{}", call_id.id(), peer_id.id()) } /// Handles client signal pub async fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult { let call_id = r.post_call_id("callID")?; let peer_id = r.post_call_peer_id(&call_id, "peerID")?; let sig_type = r.post_string("type")?; let data: serde_json::Value = serde_json::from_str(&r.post_string("data")?)?; let data = data .as_object() .ok_or(ExecError::boxed_new("Signal data is not an object !"))?; let signal = match sig_type.as_str() { "SDP" => { // Get SDP type let sdp_type = SdpType::from_str( data.get("type") .unwrap_or(&serde_json::Value::Null) .as_str() .ok_or(ExecError::boxed_new("Missing SDP type !"))?, )?; let sdp = data .get("sdp") .unwrap_or(&serde_json::Value::Null) .as_str() .ok_or(ExecError::boxed_new("Failed to get sdp message data!"))?; let parsed_sdp = webrtc_sdp::parse_sdp(sdp, false)?; CallSignal::SDP(sdp.to_string(), sdp_type, parsed_sdp) } "CANDIDATE" => { let candidate = data .get("candidate") .unwrap_or(&serde_json::Value::Null) .as_str() .ok_or(ExecError::boxed_new( "Failed to get candidate message data!", ))? .to_string(); let sdp_m_line_index = data .get("sdpMLineIndex") .unwrap_or(&serde_json::Value::Null) .as_u64() .ok_or(ExecError::boxed_new("Failed to get sdp_mline_index data!"))?; let sdp_mid = data .get("sdpMid") .unwrap_or(&serde_json::Value::Null) .as_str() .ok_or(ExecError::boxed_new("Failed to get sdpMid message data!"))? .to_string(); let parsed_candidate: SdpAttribute = candidate.trim().parse()?; CallSignal::Candidate( IceCandidate { candidate, sdp_m_line_index, sdp_mid, }, parsed_candidate, ) } _ => { r.forbidden("Invalid signal type!".to_string())?; unreachable!() } }; events_helper::propagate_event(Event::NewUserCallSignal(NewUserCallSignal { call_hash: gen_call_hash(&call_id, &peer_id), user_id: if r.user_id_ref()? == &peer_id { None } else { Some(r.user_id()?) }, signal, raw_data: r.post_string("data")?, })) .await?; r.success("Signal sent") } /// Mark user ready for streaming pub async fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res { let call_id = r.post_call_id("callID")?; let user_id = r.user_id()?; r.update_call(|call| call.ready = true)?; user_ws_controller::send_message_to_specific_connections( |c| c.user_id() != &user_id && c.is_having_call_with_conversation(&call_id), |_| { UserWsMessage::no_id_message( "call_peer_ready", CallPeerReadyAPI::new(&call_id, r.user_id_ref()?), ) }, )?; r.success("Information propagated.") } /// Request an offer from the server pub async fn request_offer(r: &mut UserWsRequestHandler) -> Res { let call_id = r.post_call_id("callID")?; // The ID of the user we stream the audio / video from let peer_id = r.post_call_peer_id(&call_id, "peerID")?; if !peer_id.is_valid() || peer_id == r.user_id_ref()? { r.forbidden("You can not request an offer for yourself!".to_string())?; } events_helper::propagate_event(Event::UserRequestedCallOffer(UserCallOfferRequest { call_hash: gen_call_hash(&call_id, &peer_id), user_id: r.user_id()?, })) .await?; r.success("Request sent") } /// Notify the user stopped to stream pub async fn stop_streaming(r: &mut UserWsRequestHandler) -> Res { let call_id = r.post_call_id("callID")?; let user_id = r.user_id()?; // Propagate notification only if required if r.call_info(&call_id)?.ready { r.update_call(|c| c.ready = false)?; // Notify all other users user_ws_controller::send_message_to_specific_connections( |c| c.is_having_call_with_conversation(&call_id) && c.user_id() != &user_id, |_| { UserWsMessage::no_id_message( "call_peer_interrupted_streaming", CallPeerInterruptedStreamingAPI::new(&call_id, &user_id), ) }, )?; } // Notify proxy events_helper::propagate_event(Event::CloseCallStream(CloseCallStream { call_hash: gen_call_hash(&call_id, &user_id), peer_id: None, })) .await?; r.success("ok") } /// Make the user leave the call pub async fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) -> Res { connection.clone().replace(|c| c.active_call = None); // Notify user (if possible) if connection.session.connected() { user_ws_controller::send_to_client( connection, &UserWsMessage::no_id_message("call_closed", conv_id.id())?, )?; } // Close main stream (sender) events_helper::propagate_event(Event::CloseCallStream(CloseCallStream { call_hash: gen_call_hash(&conv_id, connection.user_id()), peer_id: None, })) .await?; // Close receiver streams (other users streams) for peer_conn in user_ws_controller::get_all_connections()? { if peer_conn.is_having_call_with_conversation(conv_id) && peer_conn.user_id() != connection.user_id() { events_helper::propagate_event(Event::CloseCallStream(CloseCallStream { call_hash: gen_call_hash(&conv_id, peer_conn.user_id()), peer_id: Some(connection.user_id().clone()), })) .await?; } } // Create a notification events_helper::propagate_event(Event::UserLeftCall( conv_id.clone(), connection.user_id().clone(), )) .await?; Ok(()) } /// Events handler pub async fn handle_event(e: &events_helper::Event) -> Res { match e { Event::UserJoinedCall(conv_id, user_id) => { user_ws_controller::send_message_to_specific_connections( |c| c.is_having_call_with_conversation(conv_id) && c.user_id() != user_id, |_| { UserWsMessage::no_id_message( "user_joined_call", JoinedCallMessage::new(conv_id, user_id), ) }, )?; } Event::UserLeftCall(conv_id, user_id) => { user_ws_controller::send_message_to_specific_connections( |c| c.is_having_call_with_conversation(conv_id), |_| { UserWsMessage::no_id_message( "user_left_call", LeftCallMessage::new(conv_id, user_id), ) }, )?; } Event::UserWsClosed(c) => { if let Some(call) = c.active_call.clone() { make_user_leave_call(&call.conv_id, c).await?; } } Event::NewRTCRelayMessage(msg) => { // Get call hash let split: Vec<&str> = msg.call_hash.split("-").collect(); if split.len() != 2 { return Ok(()); } let call_id = ConvID::new(split[0].parse::()?); let peer_id = UserID::new(split[1].parse::()?); let target_user = UserID::new(msg.peer_id.parse::()?); let target_user = match target_user.is_valid() { true => target_user, false => peer_id.clone(), }; user_ws_controller::send_message_to_specific_connections( |c| c.user_id() == target_user && c.is_having_call_with_conversation(&call_id), |_| { UserWsMessage::no_id_message( "new_call_signal", NewCallSignalAPI::new(&call_id, &peer_id, &msg.data)?, ) }, )?; } // Handle proxy disconnect => close all active calls Event::ClosedRTCRelayWebSocket => { for f in user_ws_controller::get_all_connections()? { // Close all active connections if let Some(call) = &f.active_call { make_user_leave_call(&call.conv_id, &f).await?; } } } // Call active call of user (if any) Event::RemovedUserFromConversation(user_id, conv_id) => { let mut conn = None; user_ws_controller::foreach_connection(|f| { if f.user_id() == user_id && f.is_having_call_with_conversation(conv_id) { conn = Some(f.clone()); } Ok(()) })?; if let Some(c) = conn { make_user_leave_call(conv_id, &c).await?; } } // Call active call of user (if any) Event::DeletedConversation(conv_id) => { let mut connections = vec![]; user_ws_controller::foreach_connection(|f| { if f.is_having_call_with_conversation(conv_id) { connections.push(f.clone()); } Ok(()) })?; for con in connections { make_user_leave_call(conv_id, &con).await?; } } _ => {} } Ok(()) }