diff --git a/sea_battle_backend/src/human_player_ws.rs b/sea_battle_backend/src/human_player_ws.rs index 74328fa..9a7de2c 100644 --- a/sea_battle_backend/src/human_player_ws.rs +++ b/sea_battle_backend/src/human_player_ws.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use std::time::{Duration, Instant}; -use actix::prelude::*; use actix::{Actor, Handler, StreamHandler}; +use actix::prelude::*; use actix_web_actors::ws; use actix_web_actors::ws::{CloseCode, CloseReason, Message, ProtocolError, WebsocketContext}; use uuid::Uuid; @@ -13,6 +14,11 @@ use crate::game::{AddPlayer, Game}; use crate::human_player::HumanPlayer; use crate::random_bot::RandomBot; +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(120); + #[derive(Default, Debug)] pub enum StartMode { Bot(GameRules), @@ -65,14 +71,45 @@ pub enum ServerMessage { }, } -#[derive(Default)] pub struct HumanPlayerWS { inner: Option>, pub start_mode: StartMode, - // TODO : add heartbeat stuff + hb: Instant, } +impl Default for HumanPlayerWS { + fn default() -> Self { + Self { + inner: None, + start_mode: Default::default(), + hb: Instant::now(), + } + } +} + + impl HumanPlayerWS { + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ::Context) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(b""); + }); + } + fn send_message(&self, msg: ServerMessage, ctx: &mut ::Context) { ctx.text(serde_json::to_string(&msg).unwrap()); } @@ -82,6 +119,8 @@ impl Actor for HumanPlayerWS { type Context = WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { + self.hb(ctx); + self.send_message(ServerMessage::WaitingForAnotherPlayer, ctx); // Start game, according to appropriate start mode @@ -122,7 +161,10 @@ impl Actor for HumanPlayerWS { impl StreamHandler> for HumanPlayerWS { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { - Ok(Message::Ping(msg)) => ctx.pong(&msg), + Ok(Message::Ping(msg)) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } Ok(Message::Binary(_bin)) => log::warn!("Got unsupported binary message!"), Ok(Message::Text(msg)) => match serde_json::from_str::(&msg) { Ok(msg) => match &self.inner { @@ -141,7 +183,7 @@ impl StreamHandler> for HumanPlayerWS { } Ok(Message::Pong(_)) => { log::info!("Got pong message"); - // TODO : handle pong message + self.hb = Instant::now(); } Ok(Message::Close(reason)) => { log::info!("Client asked to close this socket! reason={:?}", reason);