use actix_http::ws::Message; use futures_util::StreamExt as _; use std::time::{Duration, Instant}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::UnixStream; use tokio::select; use tokio::time::interval; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(20); /// Broadcast text & binary messages received from a client, respond to ping messages, and monitor /// connection health to detect network issues and free up resources. pub async fn handle( mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream, mut socket: UnixStream, ) { log::info!("Connected to websocket"); let mut last_heartbeat = Instant::now(); let mut interval = interval(HEARTBEAT_INTERVAL); let mut buf_socket = [0u8; 1024]; let reason = loop { // waits for either `msg_stream` to receive a message from the client, the broadcast channel // to send a message, or the heartbeat interval timer to tick, yielding the value of // whichever one is ready first select! { // heartbeat interval ticked _tick = interval.tick() => { // if no heartbeat ping/pong received recently, close the connection if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { log::info!( "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" ); break None; } // send heartbeat ping let _ = session.ping(b"").await; } msg = msg_stream.next() => { let msg = match msg { // received message from WebSocket client Some(Ok(msg)) => msg, // client WebSocket stream error Some(Err(err)) => { log::error!("{err}"); break None; } // client WebSocket stream ended None => break None }; log::debug!("msg: {msg:?}"); match msg { Message::Text(_) => { log::error!("Received unexpected text on VNC WebSocket!"); } Message::Binary(bin) => { log::info!("Forward {} bytes to VNC server", bin.len()); if let Err(e) = socket.write(&bin).await { log::error!("Failed to relay bytes to VNC socket {e}"); break None; } } Message::Close(reason) => { break reason; } Message::Ping(bytes) => { last_heartbeat = Instant::now(); let _ = session.pong(&bytes).await; } Message::Pong(_) => { last_heartbeat = Instant::now(); } Message::Continuation(_) => { log::warn!("no support for continuation frames"); } // no-op; ignore Message::Nop => {} }; } // Forward socket packet to WS client count = socket.read(&mut buf_socket) => { let count = match count { Ok(count) => count, Err(e) => { log::error!("[VNC] Failed to read from upstream! {e}"); break None; } }; if count == 0 { log::warn!("[VNC] infinite loop (upstream), closing connection"); break None; } if let Err(e)=session.binary(buf_socket[0..count].to_vec()).await{ log::error!("[VNC] Failed to forward messages to upstream, will close connection! {e}"); break None } } } }; // attempt to close connection gracefully let _ = session.close(reason).await; log::info!("Disconnected from websocket"); }