From 92f89ec54982d90bcef4896282f05876e6717660 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sat, 23 Oct 2021 16:32:03 +0200 Subject: [PATCH] Handle Websocket continuation messages --- Cargo.lock | 3 ++ Cargo.toml | 1 + src/controllers/rtc_relay_controller.rs | 57 ++++++++++++++++++------- 3 files changed, 46 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 215b4da..e95d15a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "actix" version = "0.10.0" @@ -765,6 +767,7 @@ version = "0.1.0" dependencies = [ "actix", "actix-files", + "actix-http", "actix-multipart", "actix-rt", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index 5917b57..d0f605b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ actix-files = "0.5.0" actix-rt = "1.1.1" actix-multipart = "0.3.0" actix-web-actors = "3.0.0" +actix-http = "2.2.0" serde = { version = "1.0.123", features = ["derive"] } serde_json = "1.0.62" futures = "0.3.12" diff --git a/src/controllers/rtc_relay_controller.rs b/src/controllers/rtc_relay_controller.rs index 4e6e76d..3dce1c9 100644 --- a/src/controllers/rtc_relay_controller.rs +++ b/src/controllers/rtc_relay_controller.rs @@ -14,8 +14,12 @@ use crate::data::error::{ExecError, Res}; use crate::helpers::events_helper; use crate::helpers::events_helper::Event; use crate::utils::network_utils::match_ip; +use actix_http::ws::Item; -struct RtcRelayActor {} +#[derive(Default)] +struct RtcRelayActor { + recv_buff: Vec, +} #[allow(non_snake_case)] #[derive(Deserialize)] @@ -105,6 +109,22 @@ impl actix::Actor for RtcRelayActor { } } +impl RtcRelayActor { + fn handle_message(&self, txt: &str) { + match serde_json::from_str::(&txt) { + Err(e) => { + eprintln!("Failed to parse a message from RTC proxy! {:#?}", e); + } + + Ok(msg) => { + if let Err(e) = process_message_from_relay(&msg) { + eprintln!("Failed to process signal from RTC Relay! {:#?}", e); + } + } + } + } +} + impl StreamHandler> for RtcRelayActor { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { @@ -122,24 +142,31 @@ impl StreamHandler { - match serde_json::from_str::(&txt) { - Err(e) => { - eprintln!("Failed to parse a message from RTC proxy! {:#?}", e); - } - - Ok(msg) => { - if let Err(e) = process_message_from_relay(&msg) { - eprintln!("Failed to process signal from RTC Relay! {:#?}", e); - } - } - } + self.handle_message(&txt); } Message::Binary(_) => { eprintln!("RTC WS Message::Binary"); ctx.stop(); } - Message::Continuation(_) => { - eprintln!("RTC WS Message::Continuation"); + Message::Continuation(c) => { + eprintln!("RTC WS Message::Continuation {:?}", c); + + match c { + Item::FirstText(c) => { + self.recv_buff = c.as_ref().to_vec(); + } + Item::FirstBinary(_) => { + eprintln!("Received binary !!!"); + ctx.stop(); + } + Item::Continue(c) => { + self.recv_buff.extend_from_slice(c.as_ref()); + } + Item::Last(c) => { + self.recv_buff.extend_from_slice(c.as_ref()); + self.handle_message(&String::from_utf8_lossy(&self.recv_buff)) + } + } } Message::Ping(data) => { ctx.pong(&data); @@ -262,7 +289,7 @@ pub async fn open_ws(req: actix_web::HttpRequest, } // Start the actor - actix_web_actors::ws::start(RtcRelayActor {}, &req, stream) + actix_web_actors::ws::start(RtcRelayActor::default(), &req, stream) } /// Send a message to the relay