Migrate from actix-web-actor to actix-ws
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-03-20 22:08:02 +01:00
parent c0690d888e
commit 67549d54a3
6 changed files with 178 additions and 235 deletions

View File

@ -1,3 +1,3 @@
pub mod libvirt_actor;
pub mod vnc_actor;
pub mod vnc_handler;
pub mod vnc_tokens_actor;

View File

@ -1,209 +0,0 @@
use actix::{Actor, ActorContext, AsyncContext, Handler, StreamHandler};
use actix_http::ws::Item;
use actix_web_actors::ws;
use actix_web_actors::ws::Message;
use bytes::Bytes;
use image::EncodableLayout;
use std::path::Path;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::UnixStream;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(20);
#[derive(thiserror::Error, Debug)]
enum VNCError {
#[error("Socket file does not exists!")]
SocketDoesNotExists,
}
pub struct VNCActor {
/// Qemu -> WS
read_half: Option<OwnedReadHalf>,
/// WS -> Qemu
write_half: OwnedWriteHalf,
// Client must respond to ping at a specific interval, otherwise we drop connection
hb: Instant,
}
impl VNCActor {
pub async fn new(socket_path: &str) -> anyhow::Result<Self> {
let socket_path = Path::new(socket_path);
if !socket_path.exists() {
return Err(VNCError::SocketDoesNotExists.into());
}
let socket = UnixStream::connect(socket_path).await?;
let (read_half, write_half) = socket.into_split();
Ok(Self {
read_half: Some(read_half),
write_half,
hb: Instant::now(),
})
}
/// helper method that sends ping to client every second.
///
/// also this method checks heartbeats from client
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
log::warn!("WebSocket Client heartbeat failed, disconnecting!");
ctx.stop();
return;
}
ctx.ping(b"");
});
}
fn send_to_socket(&mut self, bytes: Bytes, ctx: &mut ws::WebsocketContext<Self>) {
log::trace!("Received {} bytes for VNC socket", bytes.len());
if let Err(e) = futures::executor::block_on(self.write_half.write(bytes.as_bytes())) {
log::error!("Failed to relay bytes to VNC socket {e}");
ctx.close(None);
}
}
fn start_qemu_to_ws_end(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
let mut read_half = self.read_half.take().unwrap();
let addr = ctx.address();
let future = async move {
let mut buff: [u8; 5000] = [0; 5000];
loop {
match read_half.read(&mut buff).await {
Ok(mut l) => {
if l == 0 {
log::warn!("Got empty read!");
// Ugly hack made to wait for next byte
let mut one_byte_buff: [u8; 1] = [0; 1];
match read_half.read_exact(&mut one_byte_buff).await {
Ok(b) => {
if b == 0 {
log::error!("Did not get a byte !");
let _ = addr.send(CloseWebSocketReq).await;
break;
}
buff[0] = one_byte_buff[0];
l = 1;
}
Err(e) => {
log::error!("Failed to read 1 BYTE from remote socket. Stopping now... {:?}", e);
break;
}
}
}
let to_send = SendBytesReq(Vec::from(&buff[0..l]));
if let Err(e) = addr.send(to_send).await {
log::error!("Failed to send to websocket. Stopping now... {:?}", e);
return;
}
}
Err(e) => {
log::error!("Failed to read from remote socket. Stopping now... {:?}", e);
break;
}
};
}
log::info!("Exited read loop");
};
tokio::spawn(future);
}
}
impl Actor for VNCActor {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
self.start_qemu_to_ws_end(ctx);
}
}
impl StreamHandler<Result<Message, ws::ProtocolError>> for VNCActor {
fn handle(&mut self, msg: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(Message::Ping(msg)) => ctx.pong(&msg),
Ok(Message::Text(_text)) => {
log::error!("Received unexpected text on VNC WebSocket!");
}
Ok(Message::Binary(bin)) => {
log::info!("Forward {} bytes to VNC server", bin.len());
self.send_to_socket(bin, ctx);
}
Ok(Message::Continuation(msg)) => match msg {
Item::FirstText(_) => {
log::error!("Received unexpected split text!");
ctx.close(None);
}
Item::FirstBinary(bin) | Item::Continue(bin) | Item::Last(bin) => {
self.send_to_socket(bin, ctx);
}
},
Ok(Message::Pong(_)) => {
log::trace!("Received PONG message");
self.hb = Instant::now();
}
Ok(Message::Close(r)) => {
log::info!("WebSocket closed. Reason={r:?}");
ctx.close(r);
}
Ok(Message::Nop) => {
log::debug!("Received Nop message")
}
Err(e) => {
log::error!("WebSocket protocol error! {e}");
ctx.close(None)
}
}
}
}
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct SendBytesReq(Vec<u8>);
impl Handler<SendBytesReq> for VNCActor {
type Result = ();
fn handle(&mut self, msg: SendBytesReq, ctx: &mut Self::Context) -> Self::Result {
log::trace!("Send {} bytes to WS", msg.0.len());
ctx.binary(msg.0);
}
}
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct CloseWebSocketReq;
impl Handler<CloseWebSocketReq> for VNCActor {
type Result = ();
fn handle(&mut self, _msg: CloseWebSocketReq, ctx: &mut Self::Context) -> Self::Result {
log::trace!("Close websocket, because VNC socket has terminated");
ctx.close(None);
}
}

View File

@ -0,0 +1,129 @@
use actix_http::ws::Message;
use futures_util::StreamExt as _;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
use tokio::select;
use tokio::time::interval;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(20);
/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn handle(
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
mut socket: UnixStream,
) {
log::info!("Connected to websocket");
let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL);
let mut buf_socket = [0u8; 1024];
let reason = loop {
// waits for either `msg_stream` to receive a message from the client, the broadcast channel
// to send a message, or the heartbeat interval timer to tick, yielding the value of
// whichever one is ready first
select! {
// heartbeat interval ticked
_tick = interval.tick() => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);
break None;
}
// send heartbeat ping
let _ = session.ping(b"").await;
}
msg = msg_stream.next() => {
let msg = match msg {
// received message from WebSocket client
Some(Ok(msg)) => msg,
// client WebSocket stream error
Some(Err(err)) => {
log::error!("{err}");
break None;
}
// client WebSocket stream ended
None => break None
};
log::debug!("msg: {msg:?}");
match msg {
Message::Text(_) => {
log::error!("Received unexpected text on VNC WebSocket!");
}
Message::Binary(bin) => {
log::info!("Forward {} bytes to VNC server", bin.len());
if let Err(e) = socket.write(&bin).await {
log::error!("Failed to relay bytes to VNC socket {e}");
break None;
}
}
Message::Close(reason) => {
break reason;
}
Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {
last_heartbeat = Instant::now();
}
Message::Continuation(_) => {
log::warn!("no support for continuation frames");
}
// no-op; ignore
Message::Nop => {}
};
}
// Forward socket packet to WS client
count = socket.read(&mut buf_socket) => {
let count = match count {
Ok(count) => count,
Err(e) => {
log::error!("[VNC] Failed to read from upstream! {e}");
break None;
}
};
if count == 0 {
log::warn!("[VNC] infinite loop (upstream), closing connection");
break None;
}
if let Err(e)=session.binary(buf_socket[0..count].to_vec()).await{
log::error!("[VNC] Failed to forward messages to upstream, will close connection! {e}");
break None
}
}
}
};
// attempt to close connection gracefully
let _ = session.close(reason).await;
log::info!("Disconnected from websocket");
}