1
0
mirror of https://gitlab.com/comunic/comunicapiv3 synced 2025-06-20 08:25:16 +00:00

Start user WebSocket implementation

This commit is contained in:
2021-02-05 13:21:10 +01:00
parent 82b845c603
commit 5254524c50
12 changed files with 972 additions and 125 deletions

View File

@ -2,7 +2,7 @@ pub mod routes;
pub mod server;
pub mod server_controller;
pub mod user_web_socket_controller;
pub mod user_ws_controller;
pub mod account_controller;
pub mod user_controller;
pub mod settings_controller;
@ -18,4 +18,5 @@ pub mod notifications_controller;
pub mod movies_controller;
pub mod virtual_directory_controller;
pub mod web_app_controller;
pub mod calls_controller;
pub mod calls_controller;
pub mod ws_routes;

View File

@ -1,6 +1,6 @@
use std::error::Error;
use crate::controllers::{account_controller, calls_controller, comments_controller, conversations_controller, friends_controller, groups_controller, likes_controller, movies_controller, notifications_controller, posts_controller, search_controller, server_controller, settings_controller, surveys_controller, user_controller, user_web_socket_controller, virtual_directory_controller, web_app_controller};
use crate::controllers::{account_controller, calls_controller, comments_controller, conversations_controller, friends_controller, groups_controller, likes_controller, movies_controller, notifications_controller, posts_controller, search_controller, server_controller, settings_controller, surveys_controller, user_controller, user_ws_controller, virtual_directory_controller, web_app_controller};
use crate::controllers::routes::Method::{GET, POST};
use crate::data::http_request_handler::HttpRequestHandler;
@ -129,7 +129,7 @@ pub fn get_routes() -> Vec<Route> {
Route::get_without_login("/", Box::new(server_controller::main_index)),
// Main user WebSocket
Route::post("/ws/token", Box::new(user_web_socket_controller::get_token)),
Route::post("/ws/token", Box::new(user_ws_controller::get_token)),
// Account controller
Route::limited_post_without_login("/account/create", Box::new(account_controller::create), LimitPolicy::SUCCESS(10)),

View File

@ -16,6 +16,7 @@ use crate::api_data::http_error::HttpError;
use crate::constants::MAX_REQUEST_SIZE;
use crate::controllers::routes::{get_routes, RequestResult, Route};
use crate::controllers::routes::Method::{GET, POST};
use crate::controllers::user_ws_controller;
use crate::data::base_request_handler::{BaseRequestHandler, PostFile, RequestValue};
use crate::data::config::Config;
use crate::data::http_request_handler::HttpRequestHandler;
@ -313,6 +314,11 @@ pub async fn start_server(conf: &Config) -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
// User WebSocket route
.service(actix_web::web::resource("/ws").to(user_ws_controller::ws_route))
// API routes
.route("**", web::get().to(process_request))
.route("**", web::post().to(process_request))
}).bind(&addr)?.run().await

View File

@ -1,92 +0,0 @@
//! # User Web Socket controller
//!
//! Handles the WebSocket offered to the users
use crate::api_data::res_get_ws_token::ResGetWsToken;
use crate::constants::WS_ACCESS_TOKEN_LENGTH;
use crate::data::base_request_handler::BaseRequestHandler;
use crate::data::error::ResultBoxError;
use crate::data::http_request_handler::HttpRequestHandler;
use crate::utils::crypt_utils::rand_str;
use crate::utils::date_utils::time;
/// WebSocket access tokens list
mod ws_tokens_list {
use std::sync::Arc;
use std::sync::Mutex;
use crate::constants::WS_ACCESS_TOKEN_LIFETIME;
use crate::data::user::UserID;
use crate::utils::date_utils::time;
#[derive(Debug)]
pub struct WsToken {
pub time: u64,
pub client_id: u32,
pub user_id: UserID,
pub token: String,
pub incognito: bool,
}
lazy_static! {
static ref WS_TOKENS: Arc<Mutex<Vec<WsToken>>> = {
Arc::new(Mutex::new(Vec::new()))
};
}
/// Get the list of WebSocket tokens
fn get_list() -> Arc<Mutex<Vec<WsToken>>> {
(*WS_TOKENS).clone()
}
/// Remove old entries from the list
pub fn clean_list() {
let list = get_list();
let mut list = list.lock().unwrap();
while let Some(first) = list.first() {
if first.time < time() - WS_ACCESS_TOKEN_LIFETIME {
list.remove(0);
} else {
break;
}
}
}
/// Add a new token to the list
pub fn add_token(t: WsToken) {
get_list().lock().unwrap().push(t)
}
/// Remove a specific access token from the list & return it
pub fn take_access_token(t: String) -> Option<WsToken> {
let list = get_list();
let mut list = list.lock().unwrap();
for i in 0..list.len() {
if list[i].token == t {
return Some(list.remove(i));
}
}
None
}
}
/// Get a WebSocket access token
pub fn get_token(r: &mut HttpRequestHandler) -> ResultBoxError {
ws_tokens_list::clean_list();
let access_token = rand_str(WS_ACCESS_TOKEN_LENGTH);
let token = ws_tokens_list::WsToken {
user_id: r.user_id()?,
client_id: r.api_client().id,
time: time(),
token: access_token.to_string(),
incognito: r.post_bool_opt("incognito", false),
};
ws_tokens_list::add_token(token);
r.set_response(ResGetWsToken::new(access_token))
}

View File

@ -0,0 +1,452 @@
//! # User Web Socket controller
//!
//! Handles the WebSocket offered to the users
use std::collections::HashMap;
use std::time::{Duration, Instant};
use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Running, StreamHandler};
use actix::prelude::*;
use actix_web_actors::ws;
use actix_web_actors::ws::ProtocolError;
use serde_json::Value;
use crate::api_data::res_get_ws_token::ResGetWsToken;
use crate::constants::WS_ACCESS_TOKEN_LENGTH;
use crate::controllers::user_ws_controller::ws_connections_list::{add_connection, find_connection, get_ws_connections_list, remove_connection};
pub use crate::controllers::user_ws_controller::ws_connections_list::WsConnection;
use crate::controllers::ws_routes::find_ws_route;
use crate::data::base_request_handler::BaseRequestHandler;
use crate::data::config::conf;
use crate::data::error::{ExecError, Res, ResultBoxError};
use crate::data::http_request_handler::HttpRequestHandler;
use crate::data::user::UserID;
use crate::data::user_ws_request_handler::{WsRequestHandler, WsResponseType};
use crate::data::ws_message::WsMessage;
use crate::utils::crypt_utils::rand_str;
use crate::utils::date_utils::time;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// WebSocket access tokens list
mod ws_tokens_list {
use std::sync::Arc;
use std::sync::Mutex;
use crate::constants::WS_ACCESS_TOKEN_LIFETIME;
use crate::data::user::UserID;
use crate::utils::date_utils::time;
#[derive(Debug)]
pub struct WsToken {
pub time: u64,
pub client_id: u32,
pub user_id: UserID,
pub token: String,
pub incognito: bool,
pub remote_ip: String,
}
lazy_static! {
static ref WS_TOKENS: Arc<Mutex<Vec<WsToken>>> = {
Arc::new(Mutex::new(Vec::new()))
};
}
/// Get the list of WebSocket tokens
fn get_list() -> Arc<Mutex<Vec<WsToken>>> {
(*WS_TOKENS).clone()
}
/// Remove old entries from the list
pub fn clean_list() {
let list = get_list();
let mut list = list.lock().unwrap();
while let Some(first) = list.first() {
if first.time < time() - WS_ACCESS_TOKEN_LIFETIME {
list.remove(0);
} else {
break;
}
}
}
/// Add a new token to the list
pub fn add_token(t: WsToken) {
get_list().lock().unwrap().push(t)
}
/// Remove a specific access token from the list & return it
pub fn take_access_token(t: String) -> Option<WsToken> {
let list = get_list();
let mut list = list.lock().unwrap();
for i in 0..list.len() {
if list[i].token == t {
return Some(list.remove(i));
}
}
None
}
}
/// WebSocket connections list
mod ws_connections_list {
use std::sync::Arc;
use std::sync::Mutex;
use actix::Addr;
use crate::controllers::user_ws_controller::WsSession;
use crate::data::user::UserID;
#[derive(Clone, Debug)]
pub struct WsConnection {
pub user_id: UserID,
pub remote_ip: String,
pub session: actix::Addr<WsSession>,
}
lazy_static! {
static ref WS_CONNECTIONS: Arc<Mutex<Vec<WsConnection>>> = {
Arc::new(Mutex::new(Vec::new()))
};
}
/// Get the list of WebSocket connections
pub fn get_ws_connections_list() -> Arc<Mutex<Vec<WsConnection>>> {
(*WS_CONNECTIONS).clone()
}
/// Add a new token to the list
pub fn add_connection(t: WsConnection) {
get_ws_connections_list().lock().unwrap().push(t)
}
/// Find a connection in the list
pub fn find_connection(t: Addr<WsSession>) -> Option<WsConnection> {
get_ws_connections_list()
.lock()
.unwrap()
.iter()
.find(|f| f.session == t)
.map(|f| f.clone())
}
/// Remove a connection from the list
pub fn remove_connection(t: Addr<WsSession>) -> Option<WsConnection> {
let list = get_ws_connections_list();
let mut list = list.lock().unwrap();
for i in 0..list.len() {
if list[i].session == t {
return Some(list.remove(i));
}
}
None
}
}
/// Get a WebSocket access token
pub fn get_token(r: &mut HttpRequestHandler) -> ResultBoxError {
ws_tokens_list::clean_list();
let access_token = rand_str(WS_ACCESS_TOKEN_LENGTH);
let token = ws_tokens_list::WsToken {
user_id: r.user_id()?,
client_id: r.api_client().id,
time: time(),
token: access_token.to_string(),
incognito: r.post_bool_opt("incognito", false),
remote_ip: r.remote_ip(),
};
ws_tokens_list::add_token(token);
r.set_response(ResGetWsToken::new(access_token))
}
#[derive(Debug)]
pub struct WsSession {
user_id: UserID,
// Client used for the connection
client_id: u32,
// Remote IP address
remote_ip: String,
// Check if the client is in incognito mode
incognito: bool,
// Client must respond to ping at a specific interval, otherwise we drop connection
hb: Instant,
}
impl WsSession {
/// helper method that sends ping to client every second.
///
/// also this method checks heartbeats from client
fn hb(&self, ctx: &mut actix_web_actors::ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("WebSocket Client heartbeat failed, disconnecting!");
// stop actor
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(b"");
});
}
/// Handle incoming message
fn handle_message(&self, ctx: &mut ws::WebsocketContext<Self>, msg: &str) -> Res<WsMessage> {
let incoming_msg: WsMessage = serde_json::from_str(&msg)?;
let data = incoming_msg.data.as_object()
.ok_or(ExecError::boxed_new("Could not parse values!"))?;
let mut args = HashMap::new();
for (k, v) in data {
args.insert(k.to_string(), match v {
Value::Null => "null".to_string(),
Value::Bool(b) => b.to_string(),
Value::Number(n) => n.to_string(),
Value::String(s) => s.to_string(),
_ => "invalid".to_string()
});
}
let mut handler = WsRequestHandler::new(
&find_connection(ctx.address()).ok_or(ExecError::boxed_new("Connection not found!"))?,
args,
);
let result = match find_ws_route(&incoming_msg.title) {
None => {
handler.not_found("Route not found!".to_string())
}
Some(r) => {
(r.handler)(&mut handler)
}
};
if !handler.has_response() {
match result {
Ok(_) => handler.success("Request successful").unwrap(),
Err(e) => {
println!("WS request error: {}", &e);
handler.internal_error(e).unwrap_err();
}
}
}
let response = handler.response();
Ok(WsMessage {
id: incoming_msg.id,
title: match response.r#type {
WsResponseType::SUCCESS => "success".to_string(),
WsResponseType::ERROR => "error".to_string(),
},
data: response.content,
})
}
}
impl Actor for WsSession {
type Context = actix_web_actors::ws::WebsocketContext<Self>;
/// Method is called on actor start.
fn started(&mut self, ctx: &mut Self::Context) {
// we'll start heartbeat process on session start.
self.hb(ctx);
add_connection(WsConnection {
user_id: self.user_id.clone(),
remote_ip: self.remote_ip.clone(),
session: ctx.address(),
})
}
fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
remove_connection(ctx.address());
Running::Stop
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
fn handle(&mut self, msg: Result<ws::Message, ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
Err(_) => {
ctx.stop();
return;
}
Ok(msg) => msg,
};
if conf().database.log_all_queries
{
println!("USER WEBSOCKET MESSAGE: {:?}", msg);
}
match msg {
ws::Message::Ping(msg) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
ws::Message::Pong(_) => {
self.hb = Instant::now();
}
ws::Message::Text(msg) => {
match self.handle_message(ctx, &msg) {
Ok(msg) => {
let response = serde_json::to_string(&msg)
.unwrap_or("Failed to serialize".to_string());
if conf().database.log_all_queries {
println!("USER WEBSOCKET RESPONSE {}", response);
}
ctx.text(response)
}
Err(e) => {
println!("WS processing error: {}", e);
ctx.text("Failed to parse message");
}
}
}
ws::Message::Binary(_) => {
ctx.text("WS is text only!")
}
ws::Message::Close(_) => {
ctx.stop();
}
ws::Message::Continuation(_) => {
ctx.stop();
}
ws::Message::Nop => ()
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct WsQueuedMessage(String);
#[derive(Message)]
#[rtype(result = "()")]
pub struct WsCloseConnection();
impl Handler<WsQueuedMessage> for WsSession {
type Result = ();
fn handle(&mut self, msg: WsQueuedMessage, ctx: &mut Self::Context) -> Self::Result {
ctx.text(msg.0)
}
}
impl Handler<WsCloseConnection> for WsSession {
type Result = ();
fn handle(&mut self, _: WsCloseConnection, ctx: &mut Self::Context) -> Self::Result {
ctx.close(None)
}
}
/// Main WebSocket route
pub async fn ws_route(
req: actix_web::HttpRequest,
stream: actix_web::web::Payload,
) -> Result<actix_web::HttpResponse, actix_web::Error> {
ws_tokens_list::clean_list();
// Extract token
let query = req.uri().query().unwrap_or("");
if !query.starts_with("token=") {
return Ok(actix_web::HttpResponse::BadRequest().body("No token specified!"));
}
let token = query.replace("token=", "");
// Check access token
let token = ws_tokens_list::take_access_token(token)
.ok_or(actix_web::error::ErrorUnauthorized("Invalid access token!"))?;
actix_web_actors::ws::start(
WsSession {
remote_ip: token.remote_ip,
user_id: token.user_id,
hb: std::time::Instant::now(),
incognito: token.incognito,
client_id: token.client_id,
},
&req,
stream,
)
}
/// Send a message to a specific connection
fn send_message(session: Addr<WsSession>, msg: &WsMessage) -> Res {
session.do_send(WsQueuedMessage(serde_json::to_string(msg)?));
Ok(())
}
/// Send a message to specific users
pub fn send_message_to_users(msg: &WsMessage, users: &Vec<UserID>) -> Res {
let connections = get_ws_connections_list()
.lock()
.unwrap()
.iter()
.filter(|f| users.contains(&f.user_id))
.map(|f| f.session.clone())
.collect::<Vec<Addr<WsSession>>>();
for con in connections {
send_message(con, msg)?;
}
Ok(())
}
/// Check out whether user is connected or not
pub fn is_user_connected(user_id: &UserID) -> bool {
get_ws_connections_list().lock().unwrap().iter().any(|c| &c.user_id == user_id)
}
/// Disconnect a user from WebSocket
pub fn disconnect_user_from_all_sockets(user_id: &UserID) -> Res {
let connections = get_ws_connections_list()
.lock()
.unwrap()
.iter()
.filter(|f| &f.user_id == user_id)
.map(|f| f.session.clone())
.collect::<Vec<Addr<WsSession>>>();
for c in connections {
c.do_send(WsCloseConnection {});
}
Ok(())
}

View File

@ -0,0 +1,34 @@
//! # WebSocket routes
//!
//! @author Pierre Hubert
use crate::data::error::Res;
use crate::data::user_ws_request_handler::WsRequestHandler;
pub type WsRequestProcess = Box<dyn Fn(&mut WsRequestHandler) -> Res>;
/// WebSocket route
pub struct WsRoute {
pub route: String,
pub handler: WsRequestProcess,
}
impl WsRoute {
pub fn new<H>(route: &str, handler: H) -> WsRoute
where H: 'static + Fn(&mut WsRequestHandler) -> Res {
WsRoute {
route: route.to_string(),
handler: Box::new(handler),
}
}
}
/// Get the list of available WebSocket routes
pub fn get_ws_routes() -> Vec<WsRoute> {
vec![]
}
/// Search for a route
pub fn find_ws_route(uri: &str) -> Option<WsRoute> {
get_ws_routes().into_iter().find(|r| r.route == uri)
}

View File

@ -12,6 +12,7 @@ use serde::export::Formatter;
/// Simple result type
pub type ResultExecError<E> = Result<E, ExecError>;
pub type ResultBoxError<E = ()> = Result<E, Box<dyn Error>>;
pub type Res<E = ()> = ResultBoxError<E>;
#[derive(Debug, Clone)]
pub struct ExecError(pub String);

View File

@ -3,6 +3,7 @@ pub mod config;
pub mod base_request_handler;
pub mod http_request_handler;
pub mod user_ws_request_handler;
pub mod api_client;
pub mod user;
@ -34,4 +35,5 @@ pub mod survey_response;
pub mod general_settings;
pub mod lang_settings;
pub mod security_settings;
pub mod new_custom_emoji;
pub mod new_custom_emoji;
pub mod ws_message;

View File

@ -0,0 +1,86 @@
//! # User Web Socket Request handler
use std::collections::HashMap;
use serde::Serialize;
use crate::api_data::http_error::HttpError;
use crate::controllers::routes::RequestResult;
use crate::controllers::user_ws_controller::WsConnection;
use crate::data::base_request_handler::{BaseRequestHandler, RequestValue};
use crate::data::error::ResultBoxError;
use crate::data::user::UserID;
pub enum WsResponseType {
SUCCESS,
ERROR,
}
pub struct WsResponse {
pub r#type: WsResponseType,
pub content: serde_json::Value,
}
pub struct WsRequestHandler {
connection: WsConnection,
args: HashMap<String, RequestValue>,
response: Option<WsResponse>,
}
impl WsRequestHandler {
pub fn new(connection: &WsConnection, args: HashMap<String, String>) -> WsRequestHandler {
WsRequestHandler {
connection: connection.clone(),
args: args.into_iter().map(|f| (f.0, RequestValue::String(f.1))).collect(),
response: None,
}
}
/// Check if a response has been set
pub fn has_response(&self) -> bool {
self.response.is_some()
}
/// Get the response to the request
pub fn response(mut self) -> WsResponse {
if !self.has_response() {
self.success("Request done.").unwrap();
}
return self.response.unwrap();
}
}
impl BaseRequestHandler for WsRequestHandler {
fn post_parameter_opt(&self, name: &str) -> Option<&RequestValue> {
self.args.get(name)
}
fn set_response<T: Serialize>(&mut self, response: T) -> RequestResult {
self.response = Some(WsResponse {
r#type: WsResponseType::SUCCESS,
content: serde_json::to_value(response)?,
});
Ok(())
}
fn set_error(&mut self, error: HttpError) {
self.response = Some(WsResponse {
r#type: WsResponseType::ERROR,
content: serde_json::Value::String(error.error.message),
});
}
fn remote_ip(&self) -> String {
self.connection.remote_ip.to_string()
}
fn user_id_opt_ref(&self) -> Option<&UserID> {
Some(&self.connection.user_id)
}
fn user_id(&self) -> ResultBoxError<UserID> {
Ok(self.connection.user_id.clone())
}
}

12
src/data/ws_message.rs Normal file
View File

@ -0,0 +1,12 @@
//! # WebSocket message
//!
//! @author Pierre Hubert
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize)]
pub struct WsMessage {
pub id: Option<String>,
pub title: String,
pub data: serde_json::Value,
}