1
0
mirror of https://gitlab.com/comunic/comunicapiv3 synced 2024-11-22 13:29:21 +00:00

Handle Websocket continuation messages

This commit is contained in:
Pierre HUBERT 2021-10-23 16:32:03 +02:00
parent 451e9f702a
commit 92f89ec549
3 changed files with 46 additions and 15 deletions

3
Cargo.lock generated
View File

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3
[[package]] [[package]]
name = "actix" name = "actix"
version = "0.10.0" version = "0.10.0"
@ -765,6 +767,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"actix", "actix",
"actix-files", "actix-files",
"actix-http",
"actix-multipart", "actix-multipart",
"actix-rt", "actix-rt",
"actix-web", "actix-web",

View File

@ -15,6 +15,7 @@ actix-files = "0.5.0"
actix-rt = "1.1.1" actix-rt = "1.1.1"
actix-multipart = "0.3.0" actix-multipart = "0.3.0"
actix-web-actors = "3.0.0" actix-web-actors = "3.0.0"
actix-http = "2.2.0"
serde = { version = "1.0.123", features = ["derive"] } serde = { version = "1.0.123", features = ["derive"] }
serde_json = "1.0.62" serde_json = "1.0.62"
futures = "0.3.12" futures = "0.3.12"

View File

@ -14,8 +14,12 @@ use crate::data::error::{ExecError, Res};
use crate::helpers::events_helper; use crate::helpers::events_helper;
use crate::helpers::events_helper::Event; use crate::helpers::events_helper::Event;
use crate::utils::network_utils::match_ip; use crate::utils::network_utils::match_ip;
use actix_http::ws::Item;
struct RtcRelayActor {} #[derive(Default)]
struct RtcRelayActor {
recv_buff: Vec<u8>,
}
#[allow(non_snake_case)] #[allow(non_snake_case)]
#[derive(Deserialize)] #[derive(Deserialize)]
@ -105,6 +109,22 @@ impl actix::Actor for RtcRelayActor {
} }
} }
impl RtcRelayActor {
fn handle_message(&self, txt: &str) {
match serde_json::from_str::<RTCSocketMessage>(&txt) {
Err(e) => {
eprintln!("Failed to parse a message from RTC proxy! {:#?}", e);
}
Ok(msg) => {
if let Err(e) = process_message_from_relay(&msg) {
eprintln!("Failed to process signal from RTC Relay! {:#?}", e);
}
}
}
}
}
impl StreamHandler<Result<actix_web_actors::ws::Message, actix_web_actors::ws::ProtocolError>> for RtcRelayActor { impl StreamHandler<Result<actix_web_actors::ws::Message, actix_web_actors::ws::ProtocolError>> for RtcRelayActor {
fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) { fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg { let msg = match msg {
@ -122,24 +142,31 @@ impl StreamHandler<Result<actix_web_actors::ws::Message, actix_web_actors::ws::P
// Handle messages // Handle messages
match msg { match msg {
Message::Text(txt) => { Message::Text(txt) => {
match serde_json::from_str::<RTCSocketMessage>(&txt) { self.handle_message(&txt);
Err(e) => {
eprintln!("Failed to parse a message from RTC proxy! {:#?}", e);
}
Ok(msg) => {
if let Err(e) = process_message_from_relay(&msg) {
eprintln!("Failed to process signal from RTC Relay! {:#?}", e);
}
}
}
} }
Message::Binary(_) => { Message::Binary(_) => {
eprintln!("RTC WS Message::Binary"); eprintln!("RTC WS Message::Binary");
ctx.stop(); ctx.stop();
} }
Message::Continuation(_) => { Message::Continuation(c) => {
eprintln!("RTC WS Message::Continuation"); eprintln!("RTC WS Message::Continuation {:?}", c);
match c {
Item::FirstText(c) => {
self.recv_buff = c.as_ref().to_vec();
}
Item::FirstBinary(_) => {
eprintln!("Received binary !!!");
ctx.stop();
}
Item::Continue(c) => {
self.recv_buff.extend_from_slice(c.as_ref());
}
Item::Last(c) => {
self.recv_buff.extend_from_slice(c.as_ref());
self.handle_message(&String::from_utf8_lossy(&self.recv_buff))
}
}
} }
Message::Ping(data) => { Message::Ping(data) => {
ctx.pong(&data); ctx.pong(&data);
@ -262,7 +289,7 @@ pub async fn open_ws(req: actix_web::HttpRequest,
} }
// Start the actor // Start the actor
actix_web_actors::ws::start(RtcRelayActor {}, &req, stream) actix_web_actors::ws::start(RtcRelayActor::default(), &req, stream)
} }
/// Send a message to the relay /// Send a message to the relay