First successful VNC connection
This commit is contained in:
@ -1,2 +1,3 @@
|
||||
pub mod libvirt_actor;
|
||||
pub mod vnc_actor;
|
||||
pub mod vnc_tokens_actor;
|
||||
|
193
virtweb_backend/src/actors/vnc_actor.rs
Normal file
193
virtweb_backend/src/actors/vnc_actor.rs
Normal file
@ -0,0 +1,193 @@
|
||||
use actix::{Actor, ActorContext, AsyncContext, Handler, StreamHandler};
|
||||
use actix_http::ws::Item;
|
||||
use actix_web_actors::ws;
|
||||
use actix_web_actors::ws::Message;
|
||||
use bytes::Bytes;
|
||||
use image::EncodableLayout;
|
||||
use std::path::Path;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
/// How often heartbeat pings are sent
|
||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
/// How long before lack of client response causes a timeout
|
||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum VNCError {
|
||||
#[error("Socket file does not exists!")]
|
||||
SocketDoesNotExists,
|
||||
}
|
||||
|
||||
pub struct VNCActor {
|
||||
/// Qemu -> WS
|
||||
read_half: Option<OwnedReadHalf>,
|
||||
|
||||
/// WS -> Qemu
|
||||
write_half: OwnedWriteHalf,
|
||||
|
||||
// Client must respond to ping at a specific interval, otherwise we drop connection
|
||||
hb: Instant,
|
||||
}
|
||||
|
||||
impl VNCActor {
|
||||
pub async fn new(socket_path: &str) -> anyhow::Result<Self> {
|
||||
let socket_path = Path::new(socket_path);
|
||||
|
||||
if !socket_path.exists() {
|
||||
return Err(VNCError::SocketDoesNotExists.into());
|
||||
}
|
||||
|
||||
let socket = UnixStream::connect(socket_path).await?;
|
||||
let (read_half, write_half) = socket.into_split();
|
||||
|
||||
Ok(Self {
|
||||
read_half: Some(read_half),
|
||||
write_half,
|
||||
hb: Instant::now(),
|
||||
})
|
||||
}
|
||||
|
||||
/// helper method that sends ping to client every second.
|
||||
///
|
||||
/// also this method checks heartbeats from client
|
||||
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
|
||||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
||||
// check client heartbeats
|
||||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
||||
// heartbeat timed out
|
||||
log::warn!("WebSocket Client heartbeat failed, disconnecting!");
|
||||
ctx.stop();
|
||||
return;
|
||||
}
|
||||
|
||||
ctx.ping(b"");
|
||||
});
|
||||
}
|
||||
|
||||
fn send_to_socket(&mut self, bytes: Bytes, ctx: &mut ws::WebsocketContext<Self>) {
|
||||
log::trace!("Received {} bytes for VNC socket", bytes.len());
|
||||
|
||||
if let Err(e) = futures::executor::block_on(self.write_half.write(bytes.as_bytes())) {
|
||||
log::error!("Failed to relay bytes to VNC socket {e}");
|
||||
ctx.close(None);
|
||||
}
|
||||
}
|
||||
|
||||
fn start_qemu_to_ws_end(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
|
||||
let mut read_half = self.read_half.take().unwrap();
|
||||
let addr = ctx.address();
|
||||
let future = async move {
|
||||
let mut buff: [u8; 5000] = [0; 5000];
|
||||
loop {
|
||||
match read_half.read(&mut buff).await {
|
||||
Ok(l) => {
|
||||
if l == 0 {
|
||||
log::error!("Got empty read. Closing read end...");
|
||||
addr.do_send(CloseWebSocketReq);
|
||||
return;
|
||||
}
|
||||
|
||||
let to_send = SendBytesReq(Vec::from(&buff[0..l]));
|
||||
if let Err(e) = addr.send(to_send).await {
|
||||
log::error!("Failed to send to websocket. Stopping now... {:?}", e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to read from remote socket. Stopping now... {:?}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
log::info!("Exited read loop");
|
||||
};
|
||||
|
||||
tokio::spawn(future);
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for VNCActor {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.hb(ctx);
|
||||
self.start_qemu_to_ws_end(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamHandler<Result<Message, ws::ProtocolError>> for VNCActor {
|
||||
fn handle(&mut self, msg: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
match msg {
|
||||
Ok(Message::Ping(msg)) => ctx.pong(&msg),
|
||||
|
||||
Ok(Message::Text(_text)) => {
|
||||
log::error!("Received unexpected text on VNC WebSocket!");
|
||||
}
|
||||
|
||||
Ok(Message::Binary(bin)) => {
|
||||
log::info!("Forward {} bytes to VNC server", bin.len());
|
||||
self.send_to_socket(bin, ctx);
|
||||
}
|
||||
|
||||
Ok(Message::Continuation(msg)) => match msg {
|
||||
Item::FirstText(_) => {
|
||||
log::error!("Received unexpected split text!");
|
||||
ctx.close(None);
|
||||
}
|
||||
Item::FirstBinary(bin) | Item::Continue(bin) | Item::Last(bin) => {
|
||||
self.send_to_socket(bin, ctx);
|
||||
}
|
||||
},
|
||||
|
||||
Ok(Message::Pong(_)) => {
|
||||
log::trace!("Received PONG message");
|
||||
self.hb = Instant::now();
|
||||
}
|
||||
|
||||
Ok(Message::Close(r)) => {
|
||||
log::info!("WebSocket closed. Reason={r:?}");
|
||||
ctx.close(r);
|
||||
}
|
||||
|
||||
Ok(Message::Nop) => {
|
||||
log::debug!("Received Nop message")
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
log::error!("WebSocket protocol error! {e}");
|
||||
ctx.close(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(actix::Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct SendBytesReq(Vec<u8>);
|
||||
|
||||
impl Handler<SendBytesReq> for VNCActor {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: SendBytesReq, ctx: &mut Self::Context) -> Self::Result {
|
||||
log::trace!("Send {} bytes to WS", msg.0.len());
|
||||
ctx.binary(msg.0);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(actix::Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct CloseWebSocketReq;
|
||||
|
||||
impl Handler<CloseWebSocketReq> for VNCActor {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, _msg: CloseWebSocketReq, ctx: &mut Self::Context) -> Self::Result {
|
||||
log::trace!("Close websocket, because VNC socket has terminated");
|
||||
ctx.close(None);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user