//! # User Web Socket controller //! //! Handles the WebSocket offered to the users use std::collections::{HashMap, HashSet}; use std::time::{Duration, Instant}; use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Running, StreamHandler}; use actix::prelude::*; use actix_web_actors::ws; use actix_web_actors::ws::ProtocolError; use serde_json::Value; use crate::api_data::res_get_ws_token::ResGetWsToken; use crate::constants::{USER_LAST_ACTIVITY_REFRESH, WS_ACCESS_TOKEN_LENGTH}; use crate::controllers::user_ws_controller::ws_connections_list::{add_connection, find_connection, get_ws_connections_list, remove_connection}; use crate::data::base_request_handler::BaseRequestHandler; use crate::data::config::conf; use crate::data::error::{ExecError, Res, ResultBoxError}; use crate::data::http_request_handler::HttpRequestHandler; use crate::data::user::UserID; use crate::data::user_token::UserAccessToken; use crate::data::user_ws_connection::UserWsConnection; use crate::data::user_ws_message::UserWsMessage; use crate::data::user_ws_request_handler::{UserWsRequestHandler, UserWsResponseType}; use crate::helpers::{account_helper, events_helper}; use crate::helpers::events_helper::Event; use crate::user_ws_routes::find_user_ws_route; use crate::utils::crypt_utils::rand_str; use crate::utils::date_utils::time; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// WebSocket access tokens list mod ws_tokens_list { use std::sync::Arc; use std::sync::Mutex; use crate::constants::WS_ACCESS_TOKEN_LIFETIME; use crate::data::user_token::UserAccessToken; use crate::utils::date_utils::time; #[derive(Debug)] pub struct WsToken { pub time: u64, pub user_token: UserAccessToken, pub ws_token: String, pub incognito: bool, pub remote_ip: String, } lazy_static! { static ref WS_TOKENS: Arc>> = { Arc::new(Mutex::new(Vec::new())) }; } /// Get the list of WebSocket tokens fn get_list() -> Arc>> { (*WS_TOKENS).clone() } /// Remove old entries from the list pub fn clean_list() { let list = get_list(); let mut list = list.lock().unwrap(); while let Some(first) = list.first() { if first.time < time() - WS_ACCESS_TOKEN_LIFETIME { list.remove(0); } else { break; } } } /// Add a new token to the list pub fn add_token(t: WsToken) { get_list().lock().unwrap().push(t) } /// Remove a specific access token from the list & return it pub fn take_access_token(t: String) -> Option { let list = get_list(); let mut list = list.lock().unwrap(); for i in 0..list.len() { if list[i].ws_token == t { return Some(list.remove(i)); } } None } } /// WebSocket connections list mod ws_connections_list { use std::sync::Arc; use std::sync::Mutex; use actix::Addr; use crate::controllers::user_ws_controller::WsSession; use crate::data::user_ws_connection::UserWsConnection; impl UserWsConnection { /// Change some of the properties of the connection pub fn replace(mut self, do_update: H) -> Self where H: FnOnce(&mut Self) { let list = get_ws_connections_list(); let mut list = list.lock().unwrap(); for i in 0..list.len() { if !list[i].session.eq(&self.session) { continue; } do_update(&mut list[i]); self = list[i].clone(); break; } drop(list); self } } lazy_static! { static ref WS_CONNECTIONS: Arc>> = { Arc::new(Mutex::new(Vec::new())) }; } /// Get the list of WebSocket connections pub fn get_ws_connections_list() -> Arc>> { (*WS_CONNECTIONS).clone() } /// Add a new token to the list pub fn add_connection(t: UserWsConnection) { get_ws_connections_list().lock().unwrap().push(t) } /// Find a connection in the list pub fn find_connection(t: Addr) -> Option { get_ws_connections_list() .lock() .unwrap() .iter() .find(|f| f.session == t) .map(|f| f.clone()) } /// Remove a connection from the list pub fn remove_connection(t: Addr) -> Option { let list = get_ws_connections_list(); let mut list = list.lock().unwrap(); for i in 0..list.len() { if list[i].session == t { return Some(list.remove(i)); } } None } } /// Get a WebSocket access token pub fn get_token(r: &mut HttpRequestHandler) -> ResultBoxError { ws_tokens_list::clean_list(); let access_token = rand_str(WS_ACCESS_TOKEN_LENGTH); let token = ws_tokens_list::WsToken { user_token: r.some_or_internal_error(r.user_access_token().cloned(), "No access token!")?, time: time(), ws_token: access_token.to_string(), incognito: r.post_bool_opt("incognito", false), remote_ip: r.remote_ip(), }; ws_tokens_list::add_token(token); r.set_response(ResGetWsToken::new(access_token)) } #[derive(Debug)] pub struct WsSession { // NOTE : apart from hb, the values here won't change ! // Information about user connection user_token: UserAccessToken, // Remote IP address remote_ip: String, // Check if the client is in incognito mode incognito: bool, // Client must respond to ping at a specific interval, otherwise we drop connection hb: Instant, } impl WsSession { /// helper method that sends ping to client every second. /// /// also this method checks heartbeats from client fn hb(&self, ctx: &mut actix_web_actors::ws::WebsocketContext) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // heartbeat timed out println!("WebSocket Client heartbeat failed, disconnecting!"); // stop actor ctx.stop(); // don't try to send a ping return; } ctx.ping(b""); }); } /// Helper method that update user last activity at every specified amount of time fn user_activity(&self, ctx: &mut actix_web_actors::ws::WebsocketContext) { if !self.incognito && account_helper::update_last_activity(&self.user_token.user_id).is_err() { eprintln!("Failed to do initial refresh of last activity for user {} !", self.user_token.user_id.id()); } ctx.run_interval(USER_LAST_ACTIVITY_REFRESH, |_, ctx| { if let Some(conn) = find_connection(ctx.address()) { if !conn.incognito && account_helper::update_last_activity(conn.user_id()).is_err() { eprintln!("Failed to refresh last activity for user {} !", conn.user_id().id()); } } }); } /// Handle incoming message fn handle_message(&self, ctx: &mut ws::WebsocketContext, msg: &str) -> Res { let incoming_msg: UserWsMessage = serde_json::from_str(&msg)?; let data = incoming_msg.data.as_object() .ok_or(ExecError::boxed_new("Could not parse values!"))?; let mut args = HashMap::new(); for (k, v) in data { args.insert(k.to_string(), match v { Value::Null => "null".to_string(), Value::Bool(b) => b.to_string(), Value::Number(n) => n.to_string(), Value::String(s) => s.to_string(), _ => "invalid".to_string() }); } let mut handler = UserWsRequestHandler::new( &find_connection(ctx.address()).ok_or(ExecError::boxed_new("Connection not found!"))?, args, ); let result = match find_user_ws_route(&incoming_msg.title) { None => { handler.not_found("Route not found!".to_string()) } Some(r) => { (r.handler)(&mut handler) } }; if !handler.has_response() { match result { Ok(_) => handler.success("Request successful").unwrap(), Err(e) => { println!("WS request error: {}", &e); handler.internal_error(e).unwrap_err(); } } } let response = handler.response(); Ok(UserWsMessage { id: incoming_msg.id, title: match response.r#type { UserWsResponseType::SUCCESS => "success".to_string(), UserWsResponseType::ERROR => "error".to_string(), }, data: response.content, }) } } impl Actor for WsSession { type Context = actix_web_actors::ws::WebsocketContext; /// Method is called on actor start. fn started(&mut self, ctx: &mut Self::Context) { // we'll start heartbeat process on session start. self.hb(ctx); self.user_activity(ctx); add_connection(UserWsConnection { user_token: self.user_token.clone(), remote_ip: self.remote_ip.clone(), session: ctx.address(), incognito: self.incognito, conversations: HashSet::new(), posts: HashSet::new(), active_call: None, }) } fn stopping(&mut self, ctx: &mut Self::Context) -> Running { // Send an event (user_ws_closed) if let Some(conn) = find_connection(ctx.address()) { if let Err(e) = events_helper::propagate_event(&Event::UserWsClosed(&conn)) { eprintln!("Failed to propagate web socket closed event ! {:#?}", e); } } remove_connection(ctx.address()); Running::Stop } } impl StreamHandler> for WsSession { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Err(_) => { ctx.stop(); return; } Ok(msg) => msg, }; if conf().verbose_mode { println!("USER WEBSOCKET MESSAGE: {:?}", msg); } match msg { ws::Message::Ping(msg) => { self.hb = Instant::now(); ctx.pong(&msg); } ws::Message::Pong(_) => { self.hb = Instant::now(); } ws::Message::Text(msg) => { match self.handle_message(ctx, &msg) { Ok(msg) => { let response = serde_json::to_string(&msg) .unwrap_or("Failed to serialize".to_string()); if conf().verbose_mode { println!("USER WEBSOCKET RESPONSE {}", response); } ctx.text(response) } Err(e) => { println!("WS processing error: {}", e); ctx.text("Failed to parse message"); } } } ws::Message::Binary(_) => { ctx.text("WS is text only!") } ws::Message::Close(_) => { ctx.stop(); } ws::Message::Continuation(_) => { ctx.stop(); } ws::Message::Nop => () } } } #[derive(Message)] #[rtype(result = "()")] pub struct WsQueuedMessage(String); #[derive(Message)] #[rtype(result = "()")] pub struct WsCloseConnection(); impl Handler for WsSession { type Result = (); fn handle(&mut self, msg: WsQueuedMessage, ctx: &mut Self::Context) -> Self::Result { ctx.text(msg.0) } } impl Handler for WsSession { type Result = (); fn handle(&mut self, _: WsCloseConnection, ctx: &mut Self::Context) -> Self::Result { ctx.close(None) } } /// Main WebSocket route pub async fn ws_route( req: actix_web::HttpRequest, stream: actix_web::web::Payload, ) -> Result { ws_tokens_list::clean_list(); // Extract token let query = req.uri().query().unwrap_or(""); if !query.starts_with("token=") { return Ok(actix_web::HttpResponse::BadRequest().body("No token specified!")); } let token = query.replace("token=", ""); // Check access token let token = ws_tokens_list::take_access_token(token) .ok_or(actix_web::error::ErrorUnauthorized("Invalid access token!"))?; actix_web_actors::ws::start( WsSession { remote_ip: token.remote_ip, user_token: token.user_token, hb: std::time::Instant::now(), incognito: token.incognito, }, &req, stream, ) } /// Send a message to a specific connection fn send_message(session: Addr, msg: &UserWsMessage) -> Res { session.do_send(WsQueuedMessage(serde_json::to_string(msg)?)); Ok(()) } /// Send a message to specific users pub fn send_message_to_users(msg: &UserWsMessage, users: &Vec) -> Res { let connections = get_ws_connections_list() .lock() .unwrap() .iter() .filter(|f| users.contains(f.user_id())) .map(|f| f.session.clone()) .collect::>>(); for con in connections { send_message(con, msg)?; } Ok(()) } /// Send a message to a specific user pub fn send_message_to_user(msg: &UserWsMessage, user: &UserID) -> Res { let connections = get_ws_connections_list() .lock() .unwrap() .iter() .filter(|f| user == f.user_id()) .map(|f| f.session.clone()) .collect::>>(); for con in connections { send_message(con, msg)?; } Ok(()) } /// Send a message to a specific connection pub fn send_to_client(conn: &UserWsConnection, msg: &UserWsMessage) -> Res { send_message(conn.session.clone(), msg) } /// Send a message to specific users pub fn send_message_to_specific_connections(filter: F, msg_generator: M, after_send: Option) -> Res where F: Fn(&UserWsConnection) -> bool, M: Fn(&UserWsConnection) -> Res, A: Fn(&UserWsConnection) -> Res { let connections = get_ws_connections_list() .lock() .unwrap() .iter() .filter(|f| filter(f)) .map(|f| f.clone()) .collect::>(); for con in connections { send_message(con.session.clone(), &msg_generator(&con)?)?; if let Some(cb) = &after_send { cb(&con)?; } } Ok(()) } /// Check out whether user is connected or not pub fn is_user_connected(user_id: &UserID) -> bool { get_ws_connections_list().lock().unwrap().iter().any(|c| c.user_id() == user_id) } /// Check out whether user is connected or not and has at list one not incognito connection pub fn is_user_connected_not_incognito(user_id: &UserID) -> bool { get_ws_connections_list().lock().unwrap().iter().any(|c| c.user_id() == user_id && !c.incognito) } /// Disconnect a user from all the WebSockets of a given access token pub fn disconnect_from_user_token(token: &UserAccessToken) -> Res { let connections = get_ws_connections_list() .lock() .unwrap() .iter() .filter(|f| &f.user_token == token) .map(|f| f.session.clone()) .collect::>>(); for c in connections { c.do_send(WsCloseConnection {}); } Ok(()) } /// Disconnect a user from all its WebSocket pub fn disconnect_user_from_all_sockets(user_id: &UserID) -> Res { let connections = get_ws_connections_list() .lock() .unwrap() .iter() .filter(|f| f.user_id() == user_id) .map(|f| f.session.clone()) .collect::>>(); for c in connections { c.do_send(WsCloseConnection {}); } Ok(()) } /// Do something with all active connections pub fn foreach_connection(mut f: F) -> Res where F: FnMut(&UserWsConnection) -> Res { let list = get_ws_connections_list().lock().unwrap().clone(); for conn in list.iter() { f(conn)?; } Ok(()) } /// Events handler pub fn handle_event(e: &events_helper::Event) -> Res { match e { Event::DestroyedLoginToken(token) => { disconnect_from_user_token(token)?; } _ => {} } Ok(()) }