From 67549d54a30d4e63faa8fa6d7c28b7a5a13a457c Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Thu, 20 Mar 2025 22:08:02 +0100 Subject: [PATCH] Migrate from actix-web-actor to actix-ws --- virtweb_backend/Cargo.lock | 46 ++-- virtweb_backend/Cargo.toml | 4 +- virtweb_backend/src/actors/mod.rs | 2 +- virtweb_backend/src/actors/vnc_actor.rs | 209 ------------------ virtweb_backend/src/actors/vnc_handler.rs | 129 +++++++++++ .../src/controllers/vm_controller.rs | 23 +- 6 files changed, 178 insertions(+), 235 deletions(-) delete mode 100644 virtweb_backend/src/actors/vnc_actor.rs create mode 100644 virtweb_backend/src/actors/vnc_handler.rs diff --git a/virtweb_backend/Cargo.lock b/virtweb_backend/Cargo.lock index 1704573..05ae65d 100644 --- a/virtweb_backend/Cargo.lock +++ b/virtweb_backend/Cargo.lock @@ -318,24 +318,6 @@ dependencies = [ "url", ] -[[package]] -name = "actix-web-actors" -version = "4.3.1+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f98c5300b38fd004fe7d2a964f9a90813fdbe8a81fed500587e78b1b71c6f980" -dependencies = [ - "actix", - "actix-codec", - "actix-http", - "actix-web", - "bytes", - "bytestring", - "futures-core", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "actix-web-codegen" version = "4.3.0" @@ -348,6 +330,20 @@ dependencies = [ "syn", ] +[[package]] +name = "actix-ws" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3a1fb4f9f2794b0aadaf2ba5f14a6f034c7e86957b458c506a8cb75953f2d99" +dependencies = [ + "actix-codec", + "actix-http", + "actix-web", + "bytestring", + "futures-core", + "tokio", +] + [[package]] name = "actix_derive" version = "0.6.2" @@ -3428,9 +3424,21 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2", + "tokio-macros", "windows-sys 0.52.0", ] +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -3729,7 +3737,7 @@ dependencies = [ "actix-remote-ip", "actix-session", "actix-web", - "actix-web-actors", + "actix-ws", "anyhow", "basic-jwt", "bytes", diff --git a/virtweb_backend/Cargo.toml b/virtweb_backend/Cargo.toml index 75855c3..9c81981 100644 --- a/virtweb_backend/Cargo.toml +++ b/virtweb_backend/Cargo.toml @@ -18,7 +18,7 @@ actix-session = { version = "0.10.1", features = ["cookie-session"] } actix-identity = "0.8.0" actix-cors = "0.7.1" actix-files = "0.6.6" -actix-web-actors = "4.3.1" +actix-ws = "0.3.0" actix-http = "3.10.0" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" @@ -37,7 +37,7 @@ thiserror = "2.0.12" image = "0.25.5" rand = "0.9.0" bytes = "1.10.1" -tokio = "1.44.1" +tokio = { version = "1.44.1", features = ["rt", "time", "macros"] } futures = "0.3.31" ipnetwork = { version = "0.21.1", features = ["serde"] } num = "0.4.2" diff --git a/virtweb_backend/src/actors/mod.rs b/virtweb_backend/src/actors/mod.rs index 36e3496..56d0677 100644 --- a/virtweb_backend/src/actors/mod.rs +++ b/virtweb_backend/src/actors/mod.rs @@ -1,3 +1,3 @@ pub mod libvirt_actor; -pub mod vnc_actor; +pub mod vnc_handler; pub mod vnc_tokens_actor; diff --git a/virtweb_backend/src/actors/vnc_actor.rs b/virtweb_backend/src/actors/vnc_actor.rs deleted file mode 100644 index fd4814c..0000000 --- a/virtweb_backend/src/actors/vnc_actor.rs +++ /dev/null @@ -1,209 +0,0 @@ -use actix::{Actor, ActorContext, AsyncContext, Handler, StreamHandler}; -use actix_http::ws::Item; -use actix_web_actors::ws; -use actix_web_actors::ws::Message; -use bytes::Bytes; -use image::EncodableLayout; -use std::path::Path; -use std::time::{Duration, Instant}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf}; -use tokio::net::UnixStream; - -/// How often heartbeat pings are sent -const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); - -/// How long before lack of client response causes a timeout -const CLIENT_TIMEOUT: Duration = Duration::from_secs(20); - -#[derive(thiserror::Error, Debug)] -enum VNCError { - #[error("Socket file does not exists!")] - SocketDoesNotExists, -} - -pub struct VNCActor { - /// Qemu -> WS - read_half: Option, - - /// WS -> Qemu - write_half: OwnedWriteHalf, - - // Client must respond to ping at a specific interval, otherwise we drop connection - hb: Instant, -} - -impl VNCActor { - pub async fn new(socket_path: &str) -> anyhow::Result { - let socket_path = Path::new(socket_path); - - if !socket_path.exists() { - return Err(VNCError::SocketDoesNotExists.into()); - } - - let socket = UnixStream::connect(socket_path).await?; - let (read_half, write_half) = socket.into_split(); - - Ok(Self { - read_half: Some(read_half), - write_half, - hb: Instant::now(), - }) - } - - /// helper method that sends ping to client every second. - /// - /// also this method checks heartbeats from client - fn hb(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - // check client heartbeats - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - // heartbeat timed out - log::warn!("WebSocket Client heartbeat failed, disconnecting!"); - ctx.stop(); - return; - } - - ctx.ping(b""); - }); - } - - fn send_to_socket(&mut self, bytes: Bytes, ctx: &mut ws::WebsocketContext) { - log::trace!("Received {} bytes for VNC socket", bytes.len()); - - if let Err(e) = futures::executor::block_on(self.write_half.write(bytes.as_bytes())) { - log::error!("Failed to relay bytes to VNC socket {e}"); - ctx.close(None); - } - } - - fn start_qemu_to_ws_end(&mut self, ctx: &mut ws::WebsocketContext) { - let mut read_half = self.read_half.take().unwrap(); - let addr = ctx.address(); - let future = async move { - let mut buff: [u8; 5000] = [0; 5000]; - loop { - match read_half.read(&mut buff).await { - Ok(mut l) => { - if l == 0 { - log::warn!("Got empty read!"); - - // Ugly hack made to wait for next byte - let mut one_byte_buff: [u8; 1] = [0; 1]; - match read_half.read_exact(&mut one_byte_buff).await { - Ok(b) => { - if b == 0 { - log::error!("Did not get a byte !"); - let _ = addr.send(CloseWebSocketReq).await; - break; - } - buff[0] = one_byte_buff[0]; - l = 1; - } - Err(e) => { - log::error!("Failed to read 1 BYTE from remote socket. Stopping now... {:?}", e); - break; - } - } - } - - let to_send = SendBytesReq(Vec::from(&buff[0..l])); - if let Err(e) = addr.send(to_send).await { - log::error!("Failed to send to websocket. Stopping now... {:?}", e); - return; - } - } - Err(e) => { - log::error!("Failed to read from remote socket. Stopping now... {:?}", e); - break; - } - }; - } - - log::info!("Exited read loop"); - }; - - tokio::spawn(future); - } -} - -impl Actor for VNCActor { - type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - self.hb(ctx); - self.start_qemu_to_ws_end(ctx); - } -} - -impl StreamHandler> for VNCActor { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - match msg { - Ok(Message::Ping(msg)) => ctx.pong(&msg), - - Ok(Message::Text(_text)) => { - log::error!("Received unexpected text on VNC WebSocket!"); - } - - Ok(Message::Binary(bin)) => { - log::info!("Forward {} bytes to VNC server", bin.len()); - self.send_to_socket(bin, ctx); - } - - Ok(Message::Continuation(msg)) => match msg { - Item::FirstText(_) => { - log::error!("Received unexpected split text!"); - ctx.close(None); - } - Item::FirstBinary(bin) | Item::Continue(bin) | Item::Last(bin) => { - self.send_to_socket(bin, ctx); - } - }, - - Ok(Message::Pong(_)) => { - log::trace!("Received PONG message"); - self.hb = Instant::now(); - } - - Ok(Message::Close(r)) => { - log::info!("WebSocket closed. Reason={r:?}"); - ctx.close(r); - } - - Ok(Message::Nop) => { - log::debug!("Received Nop message") - } - - Err(e) => { - log::error!("WebSocket protocol error! {e}"); - ctx.close(None) - } - } - } -} - -#[derive(actix::Message)] -#[rtype(result = "()")] -pub struct SendBytesReq(Vec); - -impl Handler for VNCActor { - type Result = (); - - fn handle(&mut self, msg: SendBytesReq, ctx: &mut Self::Context) -> Self::Result { - log::trace!("Send {} bytes to WS", msg.0.len()); - ctx.binary(msg.0); - } -} - -#[derive(actix::Message)] -#[rtype(result = "()")] -pub struct CloseWebSocketReq; - -impl Handler for VNCActor { - type Result = (); - - fn handle(&mut self, _msg: CloseWebSocketReq, ctx: &mut Self::Context) -> Self::Result { - log::trace!("Close websocket, because VNC socket has terminated"); - ctx.close(None); - } -} diff --git a/virtweb_backend/src/actors/vnc_handler.rs b/virtweb_backend/src/actors/vnc_handler.rs new file mode 100644 index 0000000..c5163ad --- /dev/null +++ b/virtweb_backend/src/actors/vnc_handler.rs @@ -0,0 +1,129 @@ +use actix_http::ws::Message; +use futures_util::StreamExt as _; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::UnixStream; +use tokio::select; +use tokio::time::interval; + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); + +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(20); + +/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor +/// connection health to detect network issues and free up resources. +pub async fn handle( + mut session: actix_ws::Session, + mut msg_stream: actix_ws::MessageStream, + mut socket: UnixStream, +) { + log::info!("Connected to websocket"); + + let mut last_heartbeat = Instant::now(); + let mut interval = interval(HEARTBEAT_INTERVAL); + + let mut buf_socket = [0u8; 1024]; + + let reason = loop { + // waits for either `msg_stream` to receive a message from the client, the broadcast channel + // to send a message, or the heartbeat interval timer to tick, yielding the value of + // whichever one is ready first + select! { + + // heartbeat interval ticked + _tick = interval.tick() => { + // if no heartbeat ping/pong received recently, close the connection + if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { + log::info!( + "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" + ); + + break None; + } + + // send heartbeat ping + let _ = session.ping(b"").await; + } + + msg = msg_stream.next() => { + let msg = match msg { + // received message from WebSocket client + Some(Ok(msg)) => msg, + + // client WebSocket stream error + Some(Err(err)) => { + log::error!("{err}"); + break None; + } + + // client WebSocket stream ended + None => break None + }; + + log::debug!("msg: {msg:?}"); + + match msg { + Message::Text(_) => { + log::error!("Received unexpected text on VNC WebSocket!"); + } + + Message::Binary(bin) => { + log::info!("Forward {} bytes to VNC server", bin.len()); + if let Err(e) = socket.write(&bin).await { + log::error!("Failed to relay bytes to VNC socket {e}"); + break None; + } + } + + Message::Close(reason) => { + break reason; + } + + Message::Ping(bytes) => { + last_heartbeat = Instant::now(); + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => { + last_heartbeat = Instant::now(); + } + + Message::Continuation(_) => { + log::warn!("no support for continuation frames"); + } + + // no-op; ignore + Message::Nop => {} + }; + } + + // Forward socket packet to WS client + count = socket.read(&mut buf_socket) => { + let count = match count { + Ok(count) => count, + Err(e) => { + log::error!("[VNC] Failed to read from upstream! {e}"); + break None; + } + }; + + if count == 0 { + log::warn!("[VNC] infinite loop (upstream), closing connection"); + break None; + } + + if let Err(e)=session.binary(buf_socket[0..count].to_vec()).await{ + log::error!("[VNC] Failed to forward messages to upstream, will close connection! {e}"); + break None + } + } + } + }; + + // attempt to close connection gracefully + let _ = session.close(reason).await; + + log::info!("Disconnected from websocket"); +} diff --git a/virtweb_backend/src/controllers/vm_controller.rs b/virtweb_backend/src/controllers/vm_controller.rs index 2031767..6eb94b2 100644 --- a/virtweb_backend/src/controllers/vm_controller.rs +++ b/virtweb_backend/src/controllers/vm_controller.rs @@ -1,11 +1,12 @@ -use crate::actors::vnc_actor::VNCActor; +use crate::actors::vnc_handler; use crate::actors::vnc_tokens_actor::VNCTokensManager; use crate::controllers::{HttpResult, LibVirtReq}; use crate::libvirt_lib_structures::domain::DomainState; use crate::libvirt_lib_structures::XMLUuid; use crate::libvirt_rest_structures::vm::VMInfo; -use actix_web::{web, HttpRequest, HttpResponse}; -use actix_web_actors::ws; +use actix_web::{rt, web, HttpRequest, HttpResponse}; +use std::path::Path; +use tokio::net::UnixStream; #[derive(serde::Serialize)] struct VMInfoAndState { @@ -324,5 +325,19 @@ pub async fn vnc( }; log::info!("Start VNC connection on socket {socket_path}"); - Ok(ws::start(VNCActor::new(&socket_path).await?, &req, stream)?) + + let socket_path = Path::new(&socket_path); + if !socket_path.exists() { + log::error!("VNC socket path {socket_path:?} does not exist!"); + return Ok(HttpResponse::ServiceUnavailable().json("VNC socket path does not exists!")); + } + + let socket = UnixStream::connect(socket_path).await?; + + let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; + + // spawn websocket handler (and don't await it) so that the response is returned immediately + rt::spawn(vnc_handler::handle(session, msg_stream, socket)); + + Ok(res) }