use actix::{Actor, ActorContext, AsyncContext, Handler, StreamHandler}; use actix_http::ws::Item; use actix_web_actors::ws; use actix_web_actors::ws::Message; use bytes::Bytes; use image::EncodableLayout; use std::path::Path; use std::time::{Duration, Instant}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::UnixStream; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(20); #[derive(thiserror::Error, Debug)] enum VNCError { #[error("Socket file does not exists!")] SocketDoesNotExists, } pub struct VNCActor { /// Qemu -> WS read_half: Option, /// WS -> Qemu write_half: OwnedWriteHalf, // Client must respond to ping at a specific interval, otherwise we drop connection hb: Instant, } impl VNCActor { pub async fn new(socket_path: &str) -> anyhow::Result { let socket_path = Path::new(socket_path); if !socket_path.exists() { return Err(VNCError::SocketDoesNotExists.into()); } let socket = UnixStream::connect(socket_path).await?; let (read_half, write_half) = socket.into_split(); Ok(Self { read_half: Some(read_half), write_half, hb: Instant::now(), }) } /// helper method that sends ping to client every second. /// /// also this method checks heartbeats from client fn hb(&self, ctx: &mut ws::WebsocketContext) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // heartbeat timed out log::warn!("WebSocket Client heartbeat failed, disconnecting!"); ctx.stop(); return; } ctx.ping(b""); }); } fn send_to_socket(&mut self, bytes: Bytes, ctx: &mut ws::WebsocketContext) { log::trace!("Received {} bytes for VNC socket", bytes.len()); if let Err(e) = futures::executor::block_on(self.write_half.write(bytes.as_bytes())) { log::error!("Failed to relay bytes to VNC socket {e}"); ctx.close(None); } } fn start_qemu_to_ws_end(&mut self, ctx: &mut ws::WebsocketContext) { let mut read_half = self.read_half.take().unwrap(); let addr = ctx.address(); let future = async move { let mut buff: [u8; 5000] = [0; 5000]; loop { match read_half.read(&mut buff).await { Ok(l) => { if l == 0 { log::error!("Got empty read. Closing read end..."); addr.do_send(CloseWebSocketReq); return; } let to_send = SendBytesReq(Vec::from(&buff[0..l])); if let Err(e) = addr.send(to_send).await { log::error!("Failed to send to websocket. Stopping now... {:?}", e); return; } } Err(e) => { log::error!("Failed to read from remote socket. Stopping now... {:?}", e); break; } }; } log::info!("Exited read loop"); }; tokio::spawn(future); } } impl Actor for VNCActor { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { self.hb(ctx); self.start_qemu_to_ws_end(ctx); } } impl StreamHandler> for VNCActor { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(Message::Ping(msg)) => ctx.pong(&msg), Ok(Message::Text(_text)) => { log::error!("Received unexpected text on VNC WebSocket!"); } Ok(Message::Binary(bin)) => { log::info!("Forward {} bytes to VNC server", bin.len()); self.send_to_socket(bin, ctx); } Ok(Message::Continuation(msg)) => match msg { Item::FirstText(_) => { log::error!("Received unexpected split text!"); ctx.close(None); } Item::FirstBinary(bin) | Item::Continue(bin) | Item::Last(bin) => { self.send_to_socket(bin, ctx); } }, Ok(Message::Pong(_)) => { log::trace!("Received PONG message"); self.hb = Instant::now(); } Ok(Message::Close(r)) => { log::info!("WebSocket closed. Reason={r:?}"); ctx.close(r); } Ok(Message::Nop) => { log::debug!("Received Nop message") } Err(e) => { log::error!("WebSocket protocol error! {e}"); ctx.close(None) } } } } #[derive(actix::Message)] #[rtype(result = "()")] pub struct SendBytesReq(Vec); impl Handler for VNCActor { type Result = (); fn handle(&mut self, msg: SendBytesReq, ctx: &mut Self::Context) -> Self::Result { log::trace!("Send {} bytes to WS", msg.0.len()); ctx.binary(msg.0); } } #[derive(actix::Message)] #[rtype(result = "()")] pub struct CloseWebSocketReq; impl Handler for VNCActor { type Result = (); fn handle(&mut self, _msg: CloseWebSocketReq, ctx: &mut Self::Context) -> Self::Result { log::trace!("Close websocket, because VNC socket has terminated"); ctx.close(None); } }