210 lines
6.7 KiB
Rust
210 lines
6.7 KiB
Rust
use actix::{Actor, ActorContext, AsyncContext, Handler, StreamHandler};
|
|
use actix_http::ws::Item;
|
|
use actix_web_actors::ws;
|
|
use actix_web_actors::ws::Message;
|
|
use bytes::Bytes;
|
|
use image::EncodableLayout;
|
|
use std::path::Path;
|
|
use std::time::{Duration, Instant};
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
|
|
use tokio::net::UnixStream;
|
|
|
|
/// How often heartbeat pings are sent
|
|
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
|
|
|
|
/// How long before lack of client response causes a timeout
|
|
const CLIENT_TIMEOUT: Duration = Duration::from_secs(20);
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
enum VNCError {
|
|
#[error("Socket file does not exists!")]
|
|
SocketDoesNotExists,
|
|
}
|
|
|
|
pub struct VNCActor {
|
|
/// Qemu -> WS
|
|
read_half: Option<OwnedReadHalf>,
|
|
|
|
/// WS -> Qemu
|
|
write_half: OwnedWriteHalf,
|
|
|
|
// Client must respond to ping at a specific interval, otherwise we drop connection
|
|
hb: Instant,
|
|
}
|
|
|
|
impl VNCActor {
|
|
pub async fn new(socket_path: &str) -> anyhow::Result<Self> {
|
|
let socket_path = Path::new(socket_path);
|
|
|
|
if !socket_path.exists() {
|
|
return Err(VNCError::SocketDoesNotExists.into());
|
|
}
|
|
|
|
let socket = UnixStream::connect(socket_path).await?;
|
|
let (read_half, write_half) = socket.into_split();
|
|
|
|
Ok(Self {
|
|
read_half: Some(read_half),
|
|
write_half,
|
|
hb: Instant::now(),
|
|
})
|
|
}
|
|
|
|
/// helper method that sends ping to client every second.
|
|
///
|
|
/// also this method checks heartbeats from client
|
|
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
|
|
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
|
// check client heartbeats
|
|
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
|
// heartbeat timed out
|
|
log::warn!("WebSocket Client heartbeat failed, disconnecting!");
|
|
ctx.stop();
|
|
return;
|
|
}
|
|
|
|
ctx.ping(b"");
|
|
});
|
|
}
|
|
|
|
fn send_to_socket(&mut self, bytes: Bytes, ctx: &mut ws::WebsocketContext<Self>) {
|
|
log::trace!("Received {} bytes for VNC socket", bytes.len());
|
|
|
|
if let Err(e) = futures::executor::block_on(self.write_half.write(bytes.as_bytes())) {
|
|
log::error!("Failed to relay bytes to VNC socket {e}");
|
|
ctx.close(None);
|
|
}
|
|
}
|
|
|
|
fn start_qemu_to_ws_end(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
|
|
let mut read_half = self.read_half.take().unwrap();
|
|
let addr = ctx.address();
|
|
let future = async move {
|
|
let mut buff: [u8; 5000] = [0; 5000];
|
|
loop {
|
|
match read_half.read(&mut buff).await {
|
|
Ok(mut l) => {
|
|
if l == 0 {
|
|
log::warn!("Got empty read!");
|
|
|
|
// Ugly hack made to wait for next byte
|
|
let mut one_byte_buff: [u8; 1] = [0; 1];
|
|
match read_half.read_exact(&mut one_byte_buff).await {
|
|
Ok(b) => {
|
|
if b == 0 {
|
|
log::error!("Did not get a byte !");
|
|
let _ = addr.send(CloseWebSocketReq).await;
|
|
break;
|
|
}
|
|
buff[0] = one_byte_buff[0];
|
|
l = 1;
|
|
}
|
|
Err(e) => {
|
|
log::error!("Failed to read 1 BYTE from remote socket. Stopping now... {:?}", e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
let to_send = SendBytesReq(Vec::from(&buff[0..l]));
|
|
if let Err(e) = addr.send(to_send).await {
|
|
log::error!("Failed to send to websocket. Stopping now... {:?}", e);
|
|
return;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
log::error!("Failed to read from remote socket. Stopping now... {:?}", e);
|
|
break;
|
|
}
|
|
};
|
|
}
|
|
|
|
log::info!("Exited read loop");
|
|
};
|
|
|
|
tokio::spawn(future);
|
|
}
|
|
}
|
|
|
|
impl Actor for VNCActor {
|
|
type Context = ws::WebsocketContext<Self>;
|
|
|
|
fn started(&mut self, ctx: &mut Self::Context) {
|
|
self.hb(ctx);
|
|
self.start_qemu_to_ws_end(ctx);
|
|
}
|
|
}
|
|
|
|
impl StreamHandler<Result<Message, ws::ProtocolError>> for VNCActor {
|
|
fn handle(&mut self, msg: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
|
match msg {
|
|
Ok(Message::Ping(msg)) => ctx.pong(&msg),
|
|
|
|
Ok(Message::Text(_text)) => {
|
|
log::error!("Received unexpected text on VNC WebSocket!");
|
|
}
|
|
|
|
Ok(Message::Binary(bin)) => {
|
|
log::info!("Forward {} bytes to VNC server", bin.len());
|
|
self.send_to_socket(bin, ctx);
|
|
}
|
|
|
|
Ok(Message::Continuation(msg)) => match msg {
|
|
Item::FirstText(_) => {
|
|
log::error!("Received unexpected split text!");
|
|
ctx.close(None);
|
|
}
|
|
Item::FirstBinary(bin) | Item::Continue(bin) | Item::Last(bin) => {
|
|
self.send_to_socket(bin, ctx);
|
|
}
|
|
},
|
|
|
|
Ok(Message::Pong(_)) => {
|
|
log::trace!("Received PONG message");
|
|
self.hb = Instant::now();
|
|
}
|
|
|
|
Ok(Message::Close(r)) => {
|
|
log::info!("WebSocket closed. Reason={r:?}");
|
|
ctx.close(r);
|
|
}
|
|
|
|
Ok(Message::Nop) => {
|
|
log::debug!("Received Nop message")
|
|
}
|
|
|
|
Err(e) => {
|
|
log::error!("WebSocket protocol error! {e}");
|
|
ctx.close(None)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(actix::Message)]
|
|
#[rtype(result = "()")]
|
|
pub struct SendBytesReq(Vec<u8>);
|
|
|
|
impl Handler<SendBytesReq> for VNCActor {
|
|
type Result = ();
|
|
|
|
fn handle(&mut self, msg: SendBytesReq, ctx: &mut Self::Context) -> Self::Result {
|
|
log::trace!("Send {} bytes to WS", msg.0.len());
|
|
ctx.binary(msg.0);
|
|
}
|
|
}
|
|
|
|
#[derive(actix::Message)]
|
|
#[rtype(result = "()")]
|
|
pub struct CloseWebSocketReq;
|
|
|
|
impl Handler<CloseWebSocketReq> for VNCActor {
|
|
type Result = ();
|
|
|
|
fn handle(&mut self, _msg: CloseWebSocketReq, ctx: &mut Self::Context) -> Self::Result {
|
|
log::trace!("Close websocket, because VNC socket has terminated");
|
|
ctx.close(None);
|
|
}
|
|
}
|