Add basic ping-pong websocket

This commit is contained in:
2025-02-06 21:40:11 +01:00
parent c573d2f74a
commit 4ff72e073e
6 changed files with 151 additions and 4 deletions

View File

@ -1,3 +1,5 @@
use std::time::Duration;
/// Session key for OpenID login state
pub const STATE_KEY: &str = "oidc-state";
@ -6,3 +8,11 @@ pub const USER_SESSION_KEY: &str = "user";
/// Token length
pub const TOKEN_LEN: usize = 20;
/// How often heartbeat pings are sent.
///
/// Should be half (or less) of the acceptable client timeout.
pub const WS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout.
pub const WS_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

View File

@ -48,6 +48,7 @@ async fn main() -> std::io::Result<()> {
.route("/api", web::get().to(api::api_home))
.route("/api", web::post().to(api::api_home))
.route("/api/account/whoami", web::get().to(api::account::who_am_i))
.service(web::resource("/api/ws").route(web::get().to(api::ws)))
})
.bind(&AppConfig::get().listen_address)?
.run()

View File

@ -1,6 +1,13 @@
use crate::constants::{WS_CLIENT_TIMEOUT, WS_HEARTBEAT_INTERVAL};
use crate::extractors::client_auth::APIClientAuth;
use crate::server::HttpResult;
use actix_web::HttpResponse;
use actix_web::dev::Payload;
use actix_web::{web, FromRequest, HttpRequest, HttpResponse};
use actix_ws::Message;
use futures_util::future::Either;
use futures_util::{future, StreamExt};
use std::time::Instant;
use tokio::{pin, time::interval};
pub mod account;
@ -8,3 +15,98 @@ pub mod account;
pub async fn api_home(auth: APIClientAuth) -> HttpResult {
Ok(HttpResponse::Ok().body(format!("Welcome user {}!", auth.user.user_id.0)))
}
/// Main WS route
pub async fn ws(req: HttpRequest, stream: web::Payload) -> HttpResult {
// Forcefully ignore request payload by manually extracting authentication information
let auth = APIClientAuth::from_request(&req, &mut Payload::None).await?;
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
// spawn websocket handler (and don't await it) so that the response is returned immediately
actix_web::rt::spawn(ws_handler(session, msg_stream));
Ok(res)
}
pub async fn ws_handler(mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream) {
log::info!("WS connected");
let mut last_heartbeat = Instant::now();
let mut interval = interval(WS_HEARTBEAT_INTERVAL);
let reason = loop {
// create "next client timeout check" future
let tick = interval.tick();
// required for select()
pin!(tick);
// waits for either `msg_stream` to receive a message from the client or the heartbeat
// interval timer to tick, yielding the value of whichever one is ready first
match future::select(msg_stream.next(), tick).await {
// received message from WebSocket client
Either::Left((Some(Ok(msg)), _)) => {
log::debug!("msg: {msg:?}");
match msg {
Message::Text(text) => {
session.text(text).await.unwrap();
}
Message::Binary(bin) => {
session.binary(bin).await.unwrap();
}
Message::Close(reason) => {
break reason;
}
Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {
last_heartbeat = Instant::now();
}
Message::Continuation(_) => {
log::warn!("no support for continuation frames");
}
// no-op; ignore
Message::Nop => {}
};
}
// client WebSocket stream error
Either::Left((Some(Err(err)), _)) => {
log::error!("{}", err);
break None;
}
// client WebSocket stream ended
Either::Left((None, _)) => break None,
// heartbeat interval ticked
Either::Right((_inst, _)) => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > WS_CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over {WS_CLIENT_TIMEOUT:?}; disconnecting"
);
break None;
}
// send heartbeat ping
let _ = session.ping(b"").await;
}
}
};
// attempt to close connection gracefully
let _ = session.close(reason).await;
log::info!("WS disconnected");
}

View File

@ -12,6 +12,8 @@ pub enum HttpFailure {
Forbidden,
#[error("this resource was not found")]
NotFound,
#[error("Actix web error")]
ActixError(#[from] actix_web::Error),
#[error("an unhandled session insert error occurred")]
SessionInsertError(#[from] actix_session::SessionInsertError),
#[error("an unhandled session error occurred")]