Automatically close unresponsive websockets

This commit is contained in:
Pierre HUBERT 2022-08-30 14:47:16 +02:00
parent dff4384bd8
commit a866deb3e4

View File

@ -1,4 +1,5 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant};
use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler}; use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler};
use actix_web::{Error, HttpRequest, HttpResponse, web}; use actix_web::{Error, HttpRequest, HttpResponse, web};
@ -9,6 +10,12 @@ use tokio::net::TcpStream;
use crate::args::Args; use crate::args::Args;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
#[derive(Message)] #[derive(Message)]
#[rtype(result = "bool")] #[rtype(result = "bool")]
pub struct DataForWebSocket(Vec<u8>); pub struct DataForWebSocket(Vec<u8>);
@ -19,15 +26,42 @@ struct RelayWS {
tcp_read: Option<OwnedReadHalf>, tcp_read: Option<OwnedReadHalf>,
tcp_write: OwnedWriteHalf, tcp_write: OwnedWriteHalf,
// TODO : add disconnect after ping timeout // Client must respond to ping at a specific interval, otherwise we drop connection
hb: Instant,
// TODO : handle socket close // TODO : handle socket close
} }
impl RelayWS {
/// helper method that sends ping to client every second.
///
/// also this method checks heartbeats from client
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
log::warn!("WebSocket Client heartbeat failed, disconnecting!");
// stop actor
ctx.stop();
// don't try to send a ping
return;
}
log::debug!("Send ping message...");
ctx.ping(b"");
});
}
}
impl Actor for RelayWS { impl Actor for RelayWS {
type Context = ws::WebsocketContext<Self>; type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
// Start to read on remote socket // Start to read on remote socket
let mut read_half = self.tcp_read.take().unwrap(); let mut read_half = self.tcp_read.take().unwrap();
let addr = ctx.address(); let addr = ctx.address();
@ -67,6 +101,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for RelayWS {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) { fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg { match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
Ok(ws::Message::Text(text)) => ctx.text(text), Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Close(_reason)) => ctx.stop(), Ok(ws::Message::Close(_reason)) => ctx.stop(),
Ok(ws::Message::Binary(data)) => { Ok(ws::Message::Binary(data)) => {
@ -125,7 +160,7 @@ pub async fn relay_ws(req: HttpRequest, stream: web::Payload,
} }
}; };
let relay = RelayWS { tcp_read: Some(tcp_read), tcp_write }; let relay = RelayWS { tcp_read: Some(tcp_read), tcp_write, hb: Instant::now() };
let resp = ws::start(relay, &req, stream); let resp = ws::start(relay, &req, stream);
log::info!("Opening new WS connection for {:?} to {}", req.peer_addr(), upstream_addr); log::info!("Opening new WS connection for {:?} to {}", req.peer_addr(), upstream_addr);
resp resp