1
0
mirror of https://gitlab.com/comunic/comunicapiv3 synced 2025-06-21 00:45:18 +00:00

Relay messages from RTC proxy to user

This commit is contained in:
2021-02-11 19:07:30 +01:00
parent 4c8d4345a2
commit 50b094acfb
6 changed files with 132 additions and 6 deletions

View File

@ -9,6 +9,7 @@ use webrtc_sdp::attribute_type::SdpAttribute;
use crate::api_data::call_member_info::CallMemberInfo;
use crate::api_data::joined_call_message::JoinedCallMessage;
use crate::api_data::left_call_message::LeftCallMessage;
use crate::api_data::new_call_signal::NewCallSignalAPI;
use crate::api_data::user_calls_config::UserCallsConfig;
use crate::controllers::routes::RequestResult;
use crate::controllers::user_ws_controller;
@ -306,6 +307,30 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
}
}
Event::NewRTCRelayMessage(msg) => {
// Get call hash
let split: Vec<&str> = msg.call_hash.split("-").collect();
if split.len() != 2 {
return Ok(());
}
let call_id = split[0].parse::<u64>()?;
let peer_id = UserID::new(split[1].parse::<u64>()?);
let target_user = UserID::new(msg.peer_id.parse::<u64>()?);
let target_user = match target_user.is_valid() {
true => target_user,
false => peer_id.clone()
};
user_ws_controller::send_message_to_specific_connections(
|c| c.user_id == target_user && c.is_having_call_with_conversation(&call_id),
|_| UserWsMessage::no_id_message("new_call_signal", NewCallSignalAPI::new(&call_id, &peer_id, &msg.data)?),
None::<fn(&_) -> _>,
)?;
}
_ => {}
}

View File

@ -5,9 +5,10 @@
use actix::{ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use actix::prelude::*;
use actix_web_actors::ws::{Message, ProtocolError};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::data::call_signal::CallSignal;
use crate::data::call_signal::{CallSignal, NewRtcRelayMessage};
use crate::data::config::conf;
use crate::data::error::{ExecError, Res};
use crate::helpers::events_helper;
@ -15,6 +16,15 @@ use crate::helpers::events_helper::Event;
struct RtcRelayActor {}
#[allow(non_snake_case)]
#[derive(Deserialize)]
struct RTCSocketMessage {
title: String,
callHash: Option<String>,
peerId: Option<String>,
data: Option<serde_json::Value>,
}
#[derive(Message)]
#[rtype(result = "()")]
enum RTCMessages {
@ -98,10 +108,62 @@ impl StreamHandler<Result<actix_web_actors::ws::Message, actix_web_actors::ws::P
println!("RTC RELAY WS MESSAGE: {:?}", msg);
}
// TODO : handle messages
// Handle messages
match msg {
Message::Text(txt) => {
match serde_json::from_str::<RTCSocketMessage>(&txt) {
Err(e) => {
eprintln!("Failed to parse a message from RTC proxy! {:#?}", e);
}
Ok(msg) => {
if let Err(e) = process_message_from_relay(&msg) {
eprintln!("Failed to process signal from RTC Relay! {:#?}", e);
}
}
}
}
Message::Binary(_) => {
eprintln!("RTC WS Message::Binary");
ctx.stop();
}
Message::Continuation(_) => {
eprintln!("RTC WS Message::Continuation");
ctx.stop();
}
Message::Ping(data) => {
ctx.pong(&data);
}
Message::Pong(_) => {}
Message::Close(_) => {
eprintln!("RTC WS Message::Close");
ctx.stop();
}
Message::Nop => {}
}
}
}
/// Process a message from the server
fn process_message_from_relay(msg: &RTCSocketMessage) -> Res {
match msg.title.as_str() {
"signal" => {
events_helper::propagate_event(&Event::NewRTCRelayMessage(&NewRtcRelayMessage {
call_hash: msg.callHash.clone().unwrap_or(String::new()),
peer_id: msg.peerId.clone().unwrap_or("0".to_string()),
data: serde_json::to_string(msg.data.as_ref().unwrap_or(&Value::Null))
.unwrap_or("failed to serialize signal data".to_string()),
}))?;
}
title => {
eprintln!("Unknown message '{}' from RTC proxy!", title);
}
}
Ok(())
}
impl Handler<RTCMessages> for RtcRelayActor {
type Result = ();