//! # RTC Relay controller //! //! @author Pierre Hubert use actix::{ActorContext, Addr, AsyncContext, Handler, StreamHandler}; use actix::prelude::*; use actix_web_actors::ws::{Message, ProtocolError}; use serde::Serialize; use crate::data::config::conf; use crate::helpers::events_helper; use crate::helpers::events_helper::Event; struct RtcRelayActor {} #[derive(Message)] #[rtype(result = "()")] enum RTCMessages { CLOSE } #[allow(non_snake_case)] #[derive(Serialize)] struct CallsConfig { allowVideo: bool, iceServers: Vec, } #[derive(Serialize)] struct CallsConfigWrapper { title: String, data: CallsConfig, } /// Current WebSocket connection static mut ACTIVE_RTC_CONNECTION: Option> = None; impl actix::Actor for RtcRelayActor { type Context = actix_web_actors::ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { // Replace known address of actor unsafe { ACTIVE_RTC_CONNECTION = Some(ctx.address()); } println!("Started new WebSocket connection to RTC relay!"); // Send calls configuration to server ctx.text(serde_json::to_string(&CallsConfigWrapper { title: "config".to_string(), data: CallsConfig { allowVideo: conf().rtc_relay.as_ref().unwrap().allow_video, iceServers: conf().rtc_relay.as_ref().unwrap().ice_servers.clone(), }, }).unwrap()) } fn stopped(&mut self, _ctx: &mut Self::Context) { println!("Closed connection to RTC relay."); // Propagate information if let Err(e) = events_helper::propagate_event(&Event::ClosedRTCRelayWebSocket) { eprintln!("Failed to propagate rtc closed event! {:#?}", e); } } } impl StreamHandler> for RtcRelayActor { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Err(_) => { ctx.stop(); return; } Ok(msg) => msg }; if conf().verbose_mode { println!("RTC RELAY WS MESSAGE: {:?}", msg); } // TODO : handle messages } } impl Handler for RtcRelayActor { type Result = (); fn handle(&mut self, _msg: RTCMessages, ctx: &mut Self::Context) -> Self::Result { ctx.close(None) } } /// Get current actix connection fn get_active_connection() -> Option> { let conn; unsafe { conn = ACTIVE_RTC_CONNECTION.clone(); } conn } /// Check out whether a relay is currently connected to the API or not pub fn is_connected() -> bool { if let Some(conn) = get_active_connection() { return conn.connected(); } false } /// Establish a new connection with the RTC relay /// /// Debug with /// ```js /// ws = new WebSocket("ws://0.0.0.0:3000/rtc_proxy/ws"); /// ws.onmessage = (msg) => console.log("WS msg", msg); /// ws.onopen = () => console.log("Socket is open !"); /// ws.onerror = (e) => console.log("WS ERROR !", e); /// ws.onclose = (e) => console.log("WS CLOSED!"); /// ``` pub async fn open_ws(req: actix_web::HttpRequest, stream: actix_web::web::Payload) -> Result { let ip = req.peer_addr().unwrap(); // Check if video calls are enabled if conf().rtc_relay.is_none() { eprintln!("A relay from {} tried to connect to the server but the relay is disabled!", ip); return Ok(actix_web::HttpResponse::BadRequest().body("RTC Relay not configured!")); } let conf = conf().rtc_relay.as_ref().unwrap(); // Check remote IP address if !ip.ip().to_string().eq(&conf.ip) { eprintln!("A relay from {} tried to connect to the server but the IP address is not authorized!", ip); return Ok(actix_web::HttpResponse::Unauthorized().body("Access denied!")); } // Check the token if !req.query_string().eq(&format!("token={}", &conf.token)) { eprintln!("A relay from {} tried to connect with an invalid access token!", ip); return Ok(actix_web::HttpResponse::Unauthorized().body("Invalid token!")); } // Close previous connection (if any) if let Some(conn) = get_active_connection() { conn.do_send(RTCMessages::CLOSE); } // Start the actor actix_web_actors::ws::start(RtcRelayActor {}, &req, stream) }