From 4c8d4345a2c1c877261c7b4a873b39417fc5edd8 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Thu, 11 Feb 2021 18:29:29 +0100 Subject: [PATCH] Relay client call signals to RTC proxy --- Cargo.lock | 11 +++ Cargo.toml | 3 +- src/controllers/calls_controller.rs | 124 ++++++++++++++++++++++++ src/controllers/rtc_relay_controller.rs | 76 ++++++++++++++- src/controllers/user_ws_routes.rs | 1 + src/data/call_signal.rs | 45 +++++++++ src/data/mod.rs | 3 +- src/data/user_ws_request_handler.rs | 14 +-- src/helpers/events_helper.rs | 7 +- 9 files changed, 264 insertions(+), 20 deletions(-) create mode 100644 src/data/call_signal.rs diff --git a/Cargo.lock b/Cargo.lock index bf6c0cf..6be611e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -640,6 +640,7 @@ dependencies = [ "serde", "serde_json", "sha1", + "webrtc-sdp", "yaml-rust", ] @@ -2846,6 +2847,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webrtc-sdp" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ca3fe7f015c28dcfbea0b0e05fbcb1a26ed902f80509dd0a4bf40556e73f58" +dependencies = [ + "log", + "url", +] + [[package]] name = "widestring" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 6066e53..7c6bd73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,4 +31,5 @@ mime_guess = "2.0.3" pdf = "0.6.3" regex = "1.4.2" dashmap = "3.11.10" -reqwest = { version = "0.10.6", features = ["json", "blocking"] } \ No newline at end of file +reqwest = { version = "0.10.6", features = ["json", "blocking"] } +webrtc-sdp = "0.3.6" \ No newline at end of file diff --git a/src/controllers/calls_controller.rs b/src/controllers/calls_controller.rs index 8a7bf4c..1c751f3 100644 --- a/src/controllers/calls_controller.rs +++ b/src/controllers/calls_controller.rs @@ -4,6 +4,8 @@ use std::collections::HashMap; +use webrtc_sdp::attribute_type::SdpAttribute; + use crate::api_data::call_member_info::CallMemberInfo; use crate::api_data::joined_call_message::JoinedCallMessage; use crate::api_data::left_call_message::LeftCallMessage; @@ -11,16 +13,55 @@ use crate::api_data::user_calls_config::UserCallsConfig; use crate::controllers::routes::RequestResult; use crate::controllers::user_ws_controller; use crate::data::base_request_handler::BaseRequestHandler; +use crate::data::call_signal::{CallSignal, IceCandidate, NewUserCallSignal, SdpType}; use crate::data::config::conf; use crate::data::conversation::ConvID; use crate::data::error::{ExecError, Res}; use crate::data::http_request_handler::HttpRequestHandler; +use crate::data::user::UserID; use crate::data::user_ws_connection::{ActiveCall, UserWsConnection}; use crate::data::user_ws_message::UserWsMessage; use crate::data::user_ws_request_handler::UserWsRequestHandler; use crate::helpers::{calls_helper, conversations_helper, events_helper}; use crate::helpers::events_helper::Event; +impl UserWsRequestHandler { + /// Get the ID of a call included in a WebSocket request + fn post_call_id(&mut self, name: &str) -> Res { + let conv_id = self.post_u64(name)?; + + if !self.get_conn().is_having_call_with_conversation(&conv_id) { + self.forbidden("You do not belong to this call!".to_string())?; + } + + Ok(conv_id) + } + + /// Get the ID of a peer for a call included in the WebSocket request + fn post_call_peer_id(&mut self, call_id: &ConvID, name: &str) -> Res { + let peer_id = UserID::new(self.post_u64(name)?); + + if peer_id == self.user_id_ref()? { + return Ok(peer_id); + } + + let mut found = false; + user_ws_controller::foreach_connection(|p| { + if !found && p.user_id == peer_id && p.is_having_call_with_conversation(call_id) { + found = true; + } + + Ok(()) + })?; + + if !found { + self.forbidden("This peer is not a member of the call !".to_string())?; + } + + Ok(peer_id) + } +} + /// Get legacy call configuration pub fn get_legacy_config(r: &mut HttpRequestHandler) -> RequestResult { let mut map = HashMap::new(); @@ -137,6 +178,89 @@ pub fn get_members_list(r: &mut UserWsRequestHandler) -> RequestResult { r.set_response(list) } +/// Get the hash associated to a call +pub fn gen_call_hash(call_id: &ConvID, peer_id: &UserID) -> String { + format!("{}-{}", call_id, peer_id.id()) +} + +/// Handles client signal +pub fn on_client_signal(r: &mut UserWsRequestHandler) -> RequestResult { + let call_id = r.post_call_id("callID")?; + let peer_id = r.post_call_peer_id(&call_id, "peerID")?; + let sig_type = r.post_string("type")?; + + let data: serde_json::Value = serde_json::from_str(&r.post_string("data")?)?; + let data = data + .as_object() + .ok_or(ExecError::boxed_new("Signal data is not an object !"))?; + + let signal = match sig_type.as_str() { + "SDP" => { + // Get SDP type + let sdp_type = SdpType::from_str(data.get("type") + .unwrap_or(&serde_json::Value::Null) + .as_str() + .ok_or(ExecError::boxed_new("Missing SDP type !"))? + )?; + + let sdp = data + .get("sdp") + .unwrap_or(&serde_json::Value::Null) + .as_str() + .ok_or(ExecError::boxed_new("Failed to get sdp message data!"))?; + + let parsed_sdp = webrtc_sdp::parse_sdp(sdp, false)?; + + CallSignal::SDP(sdp.to_string(), sdp_type, parsed_sdp) + } + + "CANDIDATE" => { + let candidate = data + .get("candidate") + .unwrap_or(&serde_json::Value::Null) + .as_str() + .ok_or(ExecError::boxed_new("Failed to get candidate message data!"))? + .to_string(); + + let sdp_m_line_index = data + .get("sdpMLineIndex") + .unwrap_or(&serde_json::Value::Null) + .as_u64() + .ok_or(ExecError::boxed_new("Failed to get sdp_mline_index data!"))?; + + let sdp_mid = data + .get("sdpMid") + .unwrap_or(&serde_json::Value::Null) + .as_str() + .ok_or(ExecError::boxed_new("Failed to get sdpMid message data!"))? + .to_string(); + + + let parsed_candidate: SdpAttribute = candidate.trim().parse()?; + + CallSignal::Candidate(IceCandidate { + candidate, + sdp_m_line_index, + sdp_mid, + }, parsed_candidate) + } + + _ => { + r.forbidden("Invalid signal type!".to_string())?; + unreachable!() + } + }; + + events_helper::propagate_event(&Event::NewUserCallSignal(&NewUserCallSignal { + call_hash: gen_call_hash(&call_id, &peer_id), + user_id: if r.user_id_ref()? == &peer_id { None } else { Some(r.user_id()?) }, + signal, + raw_data: r.post_string("data")?, + }))?; + + r.success("Signal sent") +} + /// Make the user leave the call pub fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) -> Res { diff --git a/src/controllers/rtc_relay_controller.rs b/src/controllers/rtc_relay_controller.rs index 2884411..7774dc2 100644 --- a/src/controllers/rtc_relay_controller.rs +++ b/src/controllers/rtc_relay_controller.rs @@ -7,7 +7,9 @@ use actix::prelude::*; use actix_web_actors::ws::{Message, ProtocolError}; use serde::Serialize; +use crate::data::call_signal::CallSignal; use crate::data::config::conf; +use crate::data::error::{ExecError, Res}; use crate::helpers::events_helper; use crate::helpers::events_helper::Event; @@ -16,7 +18,8 @@ struct RtcRelayActor {} #[derive(Message)] #[rtype(result = "()")] enum RTCMessages { - CLOSE + Close, + SendMessage(serde_json::Value), } #[allow(non_snake_case)] @@ -32,6 +35,21 @@ struct CallsConfigWrapper { data: CallsConfig, } +#[derive(Serialize)] +struct CallUserSignalData { + r#type: String, + data: serde_json::Value, +} + +#[allow(non_snake_case)] +#[derive(Serialize)] +struct CallUserSignal { + title: String, + callHash: String, + peerId: String, + data: CallUserSignalData, +} + /// Current WebSocket connection static mut ACTIVE_RTC_CONNECTION: Option> = None; @@ -87,8 +105,24 @@ impl StreamHandler for RtcRelayActor { type Result = (); - fn handle(&mut self, _msg: RTCMessages, ctx: &mut Self::Context) -> Self::Result { - ctx.close(None) + fn handle(&mut self, msg: RTCMessages, ctx: &mut Self::Context) -> Self::Result { + match msg { + + // Close connection + RTCMessages::Close => { + ctx.close(None) + } + + // Send message + RTCMessages::SendMessage(msg) => { + match serde_json::to_string(&msg) { + Ok(txt) => { ctx.text(txt) } + Err(e) => { + eprintln!("Failed to send message to RTC relay ! {:#?}", e); + } + } + } + } } } @@ -146,9 +180,43 @@ pub async fn open_ws(req: actix_web::HttpRequest, // Close previous connection (if any) if let Some(conn) = get_active_connection() { - conn.do_send(RTCMessages::CLOSE); + conn.do_send(RTCMessages::Close); } // Start the actor actix_web_actors::ws::start(RtcRelayActor {}, &req, stream) +} + +/// Send a message to the relay +fn send_message_to_relay(e: T) -> Res where T: Serialize { + let conn = get_active_connection() + .ok_or(ExecError::boxed_new("Connection to RTC relay missing!"))?; + + conn.do_send(RTCMessages::SendMessage(serde_json::to_value(e)?)); + + Ok(()) +} + +/// Events handler +pub fn handle_event(e: &events_helper::Event) -> Res { + match e { + Event::NewUserCallSignal(signal) => { + send_message_to_relay(CallUserSignal { + title: "signal".to_string(), + callHash: signal.call_hash.to_string(), + peerId: signal.user_id.as_ref().map(|i| i.id()).unwrap_or(0).to_string(), + data: CallUserSignalData { + r#type: match signal.signal { + CallSignal::SDP(_, _, _) => "SDP", + CallSignal::Candidate(_, _) => "CANDIDATE", + }.to_string(), + data: serde_json::from_str(&signal.raw_data)?, + }, + })?; + } + + _ => {} + } + + Ok(()) } \ No newline at end of file diff --git a/src/controllers/user_ws_routes.rs b/src/controllers/user_ws_routes.rs index 0eef054..6df648c 100644 --- a/src/controllers/user_ws_routes.rs +++ b/src/controllers/user_ws_routes.rs @@ -42,6 +42,7 @@ pub fn get_user_ws_routes() -> Vec { UserWsRoute::new("calls/join", calls_controller::join_call), UserWsRoute::new("calls/leave", calls_controller::leave_call), UserWsRoute::new("calls/members", calls_controller::get_members_list), + UserWsRoute::new("calls/signal", calls_controller::on_client_signal), ] } diff --git a/src/data/call_signal.rs b/src/data/call_signal.rs new file mode 100644 index 0000000..79c3581 --- /dev/null +++ b/src/data/call_signal.rs @@ -0,0 +1,45 @@ +//! # Call signal +//! +//! @author Pierre Hubert + +use crate::data::error::{ExecError, Res}; +use crate::data::user::UserID; + +pub enum SdpType { + Offer, + Answer, +} + +pub struct IceCandidate { + pub candidate: String, + pub sdp_m_line_index: u64, + pub sdp_mid: String, +} + +pub enum CallSignal { + /// Session Description Protocol + SDP(String, SdpType, webrtc_sdp::SdpSession), + + /// ICE candidate + Candidate(IceCandidate, webrtc_sdp::attribute_type::SdpAttribute), +} + +pub struct NewUserCallSignal { + pub call_hash: String, + + /// This value is set to none if the user who streams content is the same + /// as the receiver + pub user_id: Option, + pub signal: CallSignal, + pub raw_data: String, +} + +impl SdpType { + pub fn from_str(val: &str) -> Res { + match val { + "offer" => Ok(SdpType::Offer), + "answer" => Ok(SdpType::Answer), + _ => Err(ExecError::boxed_new(&format!("SDP type {} is unknown!", val))) + } + } +} \ No newline at end of file diff --git a/src/data/mod.rs b/src/data/mod.rs index 01d23f7..ab094e0 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -37,4 +37,5 @@ pub mod lang_settings; pub mod security_settings; pub mod new_custom_emoji; pub mod user_ws_message; -pub mod user_ws_connection; \ No newline at end of file +pub mod user_ws_connection; +pub mod call_signal; \ No newline at end of file diff --git a/src/data/user_ws_request_handler.rs b/src/data/user_ws_request_handler.rs index 78f0d7a..6840a8e 100644 --- a/src/data/user_ws_request_handler.rs +++ b/src/data/user_ws_request_handler.rs @@ -7,8 +7,7 @@ use serde::Serialize; use crate::api_data::http_error::HttpError; use crate::controllers::routes::RequestResult; use crate::data::base_request_handler::{BaseRequestHandler, RequestValue}; -use crate::data::conversation::ConvID; -use crate::data::error::{Res, ResultBoxError}; +use crate::data::error::ResultBoxError; use crate::data::user::UserID; use crate::data::user_ws_connection::UserWsConnection; @@ -62,17 +61,6 @@ impl UserWsRequestHandler { Ok(()) } - - /// Get the ID of a call included in a WebSocket request - pub fn post_call_id(&mut self, name: &str) -> Res { - let conv_id = self.post_u64(name)?; - - if !self.connection.is_having_call_with_conversation(&conv_id) { - self.forbidden("You do not belong to this call!".to_string())?; - } - - Ok(conv_id) - } } impl BaseRequestHandler for UserWsRequestHandler { diff --git a/src/helpers/events_helper.rs b/src/helpers/events_helper.rs index 2acbd1a..c744b8b 100644 --- a/src/helpers/events_helper.rs +++ b/src/helpers/events_helper.rs @@ -4,8 +4,9 @@ -use crate::controllers::{calls_controller, comments_controller, conversations_controller, notifications_controller, user_ws_controller}; +use crate::controllers::{calls_controller, comments_controller, conversations_controller, notifications_controller, rtc_relay_controller, user_ws_controller}; use crate::data::api_client::APIClient; +use crate::data::call_signal::NewUserCallSignal; use crate::data::comment::Comment; use crate::data::conversation::ConvID; use crate::data::conversation_message::ConversationMessage; @@ -55,6 +56,9 @@ pub enum Event<'a> { /// User left call UserLeftCall(&'a ConvID, &'a UserID), + /// Got new user call signal + NewUserCallSignal(&'a NewUserCallSignal), + /// No event None, } @@ -66,5 +70,6 @@ pub fn propagate_event(e: &Event) -> Res { notifications_controller::handle_event(e)?; user_ws_controller::handle_event(e)?; calls_controller::handle_event(e)?; + rtc_relay_controller::handle_event(e)?; Ok(()) } \ No newline at end of file