From 5254524c50a582fbfcc253415ede1e83ae92fadf Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Fri, 5 Feb 2021 13:21:10 +0100 Subject: [PATCH] Start user WebSocket implementation --- Cargo.lock | 394 ++++++++++++++- Cargo.toml | 7 +- src/controllers/mod.rs | 5 +- src/controllers/routes.rs | 4 +- src/controllers/server.rs | 6 + src/controllers/user_web_socket_controller.rs | 92 ---- src/controllers/user_ws_controller.rs | 452 ++++++++++++++++++ src/controllers/ws_routes.rs | 34 ++ src/data/error.rs | 1 + src/data/mod.rs | 4 +- src/data/user_ws_request_handler.rs | 86 ++++ src/data/ws_message.rs | 12 + 12 files changed, 972 insertions(+), 125 deletions(-) delete mode 100644 src/controllers/user_web_socket_controller.rs create mode 100644 src/controllers/user_ws_controller.rs create mode 100644 src/controllers/ws_routes.rs create mode 100644 src/data/user_ws_request_handler.rs create mode 100644 src/data/ws_message.rs diff --git a/Cargo.lock b/Cargo.lock index ef46413..bf6c0cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,30 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "actix" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4af87564ff659dee8f9981540cac9418c45e910c8072fdedd643a262a38fcaf" +dependencies = [ + "actix-http", + "actix-rt", + "actix_derive", + "bitflags", + "bytes", + "crossbeam-channel", + "derive_more", + "futures", + "lazy_static", + "log", + "parking_lot", + "pin-project 0.4.17", + "smallvec", + "tokio", + "tokio-util 0.2.0", + "trust-dns-proto", + "trust-dns-resolver", +] + [[package]] name = "actix-codec" version = "0.2.0" @@ -46,7 +71,7 @@ dependencies = [ "actix-service", "actix-threadpool", "actix-utils", - "base64", + "base64 0.11.0", "bitflags", "brotli2", "bytes", @@ -70,7 +95,7 @@ dependencies = [ "log", "mime", "percent-encoding", - "pin-project", + "pin-project 0.4.17", "rand", "regex", "serde", @@ -88,7 +113,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a60f9ba7c4e6df97f3aacb14bb5c0cd7d98a49dcbaed0d7f292912ad9a6a3ed2" dependencies = [ "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -165,7 +190,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3e4fc95dfa7e24171b2d0bb46b85f8ab0e8499e4e3caec691fc4ea65c287564" dependencies = [ "futures-util", - "pin-project", + "pin-project 0.4.17", ] [[package]] @@ -227,7 +252,7 @@ dependencies = [ "either", "futures", "log", - "pin-project", + "pin-project 0.4.17", "slab", ] @@ -258,7 +283,7 @@ dependencies = [ "log", "mime", "net2", - "pin-project", + "pin-project 0.4.17", "regex", "serde", "serde_json", @@ -267,6 +292,21 @@ dependencies = [ "url", ] +[[package]] +name = "actix-web-actors" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1bd41bd66c4e9b5274cec87aac30168e63d64e96fd19db38edef6b46ba2982" +dependencies = [ + "actix", + "actix-codec", + "actix-http", + "actix-web", + "bytes", + "futures", + "pin-project 0.4.17", +] + [[package]] name = "actix-web-codegen" version = "0.2.1" @@ -275,7 +315,18 @@ checksum = "4f00371942083469785f7e28c540164af1913ee7c96a4534acb9cea92c39f057" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", +] + +[[package]] +name = "actix_derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95aceadaf327f18f0df5962fedc1bde2f870566a0b9f65c89508a3b1f79334c" +dependencies = [ + "proc-macro2", + "quote 1.0.6", + "syn 1.0.48", ] [[package]] @@ -346,7 +397,7 @@ checksum = "26c4f3195085c36ea8d24d32b2f828d23296a9370a28aa39d111f6f16bef9f3b" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -371,7 +422,7 @@ dependencies = [ "actix-http", "actix-rt", "actix-service", - "base64", + "base64 0.11.0", "bytes", "derive_more", "futures-core", @@ -412,6 +463,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" +[[package]] +name = "base64" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" + [[package]] name = "bigdecimal" version = "0.1.2" @@ -477,6 +534,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8" +[[package]] +name = "bumpalo" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" + [[package]] name = "byte-tools" version = "0.3.1" @@ -553,9 +616,11 @@ checksum = "0dbbb57365263e881e805dc77d94697c9118fd94d8da011240555aa7b23445bd" name = "comunic_server" version = "0.1.0" dependencies = [ + "actix", "actix-multipart", "actix-rt", "actix-web", + "actix-web-actors", "bytes", "chrono", "dashmap", @@ -571,6 +636,7 @@ dependencies = [ "percent-encoding", "rand", "regex", + "reqwest", "serde", "serde_json", "sha1", @@ -628,6 +694,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils", + "maybe-uninit", +] + [[package]] name = "crossbeam-deque" version = "0.7.3" @@ -705,7 +781,7 @@ checksum = "2127768764f1556535c01b5326ef94bd60ff08dcfbdc544d53e69ed155610f5d" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -716,7 +792,7 @@ checksum = "114aa287358087a616096186f3de19525d8083f83d437dc6b583f895316b02e6" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -822,7 +898,7 @@ dependencies = [ "heck", "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -852,7 +928,7 @@ checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", "synstructure", ] @@ -988,7 +1064,7 @@ dependencies = [ "proc-macro-hack", "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -1019,7 +1095,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project", + "pin-project 0.4.17", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1140,12 +1216,65 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" +dependencies = [ + "bytes", + "http", +] + [[package]] name = "httparse" version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" +[[package]] +name = "httpdate" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" + +[[package]] +name = "hyper" +version = "0.13.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project 1.0.2", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tls" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-tls", +] + [[package]] name = "idna" version = "0.2.0" @@ -1198,7 +1327,7 @@ checksum = "285762158c743ac75969bc8e9062ddcb013121d0c8aae32156740e621d2f3194" dependencies = [ "derive_utils", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -1219,9 +1348,15 @@ dependencies = [ "socket2", "widestring", "winapi 0.3.8", - "winreg", + "winreg 0.6.2", ] +[[package]] +name = "ipnet" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135" + [[package]] name = "isatty" version = "0.1.9" @@ -1259,6 +1394,15 @@ dependencies = [ "rayon", ] +[[package]] +name = "js-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "kamadak-exif" version = "0.5.1" @@ -1536,7 +1680,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e3950cf532b88574a05c3fcd4f1b1e0cd0b84883462ae6811298fe7c3c92edc" dependencies = [ - "base64", + "base64 0.11.0", "bigdecimal", "bitflags", "byteorder", @@ -1808,7 +1952,16 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791" dependencies = [ - "pin-project-internal", + "pin-project-internal 0.4.17", +] + +[[package]] +name = "pin-project" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7" +dependencies = [ + "pin-project-internal 1.0.2", ] [[package]] @@ -1819,7 +1972,18 @@ checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f" +dependencies = [ + "proc-macro2", + "quote 1.0.6", + "syn 1.0.48", ] [[package]] @@ -1872,9 +2036,9 @@ checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" [[package]] name = "proc-macro2" -version = "1.0.13" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53f5ffe53a6b28e37c9c1ce74893477864d64f74778a93a4beb43c8fa167f639" +checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" dependencies = [ "unicode-xid 0.2.0", ] @@ -1999,6 +2163,42 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "reqwest" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e" +dependencies = [ + "base64 0.12.3", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "mime_guess", + "native-tls", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tls", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.7.0", +] + [[package]] name = "resolv-conf" version = "0.6.3" @@ -2119,7 +2319,7 @@ checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", ] [[package]] @@ -2216,9 +2416,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.22" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1425de3c33b0941002740a420b1a906a350b88d08b82b2c8a01035a3f9447bac" +checksum = "cc371affeffc477f42a221a1e4297aedcea33d47d19b61455588bd9d8f6b19ac" dependencies = [ "proc-macro2", "quote 1.0.6", @@ -2242,7 +2442,7 @@ checksum = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.22", + "syn 1.0.48", "unicode-xid 0.2.0", ] @@ -2306,6 +2506,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ "bytes", + "fnv", "futures-core", "iovec", "lazy_static", @@ -2313,12 +2514,23 @@ dependencies = [ "memchr", "mio", "mio-uds", + "num_cpus", "pin-project-lite", "signal-hook-registry", "slab", "winapi 0.3.8", ] +[[package]] +name = "tokio-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-util" version = "0.2.0" @@ -2347,6 +2559,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower-service" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" + +[[package]] +name = "tracing" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0987850db3733619253fe60e17cb59b82d37c7e6c0236bb81e4d6b87c879f27" +dependencies = [ + "cfg-if", + "log", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f50de3927f93d202783f4513cda820ab47ef17f624b03c096e86ef00c67e6b5f" +dependencies = [ + "lazy_static", +] + [[package]] name = "trust-dns-proto" version = "0.18.0-alpha.2" @@ -2386,6 +2625,12 @@ dependencies = [ "trust-dns-proto", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "tuple" version = "0.4.2" @@ -2507,12 +2752,100 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "wasm-bindgen" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" +dependencies = [ + "cfg-if", + "serde", + "serde_json", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote 1.0.6", + "syn 1.0.48", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038" +dependencies = [ + "quote 1.0.6", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe" +dependencies = [ + "proc-macro2", + "quote 1.0.6", + "syn 1.0.48", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307" + +[[package]] +name = "web-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "widestring" version = "0.4.0" @@ -2562,6 +2895,15 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi 0.3.8", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index ea5c7b9..6066e53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,12 @@ edition = "2018" [dependencies] yaml-rust = "0.4.3" mysql = "18.2.0" +actix = "0.9.0" actix-web = "2.0.0" actix-rt = "1.1.1" actix-multipart = "0.2.0" -serde = "1.0.110" +actix-web-actors = "2.0.0" +serde = { version = "1.0.110", features = ["derive"] } serde_json = "1.0.53" futures = "0.3.5" encoding_rs = "0.8.23" @@ -28,4 +30,5 @@ lazy_static = "1.4.0" mime_guess = "2.0.3" pdf = "0.6.3" regex = "1.4.2" -dashmap = "3.11.10" \ No newline at end of file +dashmap = "3.11.10" +reqwest = { version = "0.10.6", features = ["json", "blocking"] } \ No newline at end of file diff --git a/src/controllers/mod.rs b/src/controllers/mod.rs index eb3d47d..e3696f5 100644 --- a/src/controllers/mod.rs +++ b/src/controllers/mod.rs @@ -2,7 +2,7 @@ pub mod routes; pub mod server; pub mod server_controller; -pub mod user_web_socket_controller; +pub mod user_ws_controller; pub mod account_controller; pub mod user_controller; pub mod settings_controller; @@ -18,4 +18,5 @@ pub mod notifications_controller; pub mod movies_controller; pub mod virtual_directory_controller; pub mod web_app_controller; -pub mod calls_controller; \ No newline at end of file +pub mod calls_controller; +pub mod ws_routes; \ No newline at end of file diff --git a/src/controllers/routes.rs b/src/controllers/routes.rs index 1acfdff..a0e55a9 100644 --- a/src/controllers/routes.rs +++ b/src/controllers/routes.rs @@ -1,6 +1,6 @@ use std::error::Error; -use crate::controllers::{account_controller, calls_controller, comments_controller, conversations_controller, friends_controller, groups_controller, likes_controller, movies_controller, notifications_controller, posts_controller, search_controller, server_controller, settings_controller, surveys_controller, user_controller, user_web_socket_controller, virtual_directory_controller, web_app_controller}; +use crate::controllers::{account_controller, calls_controller, comments_controller, conversations_controller, friends_controller, groups_controller, likes_controller, movies_controller, notifications_controller, posts_controller, search_controller, server_controller, settings_controller, surveys_controller, user_controller, user_ws_controller, virtual_directory_controller, web_app_controller}; use crate::controllers::routes::Method::{GET, POST}; use crate::data::http_request_handler::HttpRequestHandler; @@ -129,7 +129,7 @@ pub fn get_routes() -> Vec { Route::get_without_login("/", Box::new(server_controller::main_index)), // Main user WebSocket - Route::post("/ws/token", Box::new(user_web_socket_controller::get_token)), + Route::post("/ws/token", Box::new(user_ws_controller::get_token)), // Account controller Route::limited_post_without_login("/account/create", Box::new(account_controller::create), LimitPolicy::SUCCESS(10)), diff --git a/src/controllers/server.rs b/src/controllers/server.rs index 9ff7b2e..d95f9da 100644 --- a/src/controllers/server.rs +++ b/src/controllers/server.rs @@ -16,6 +16,7 @@ use crate::api_data::http_error::HttpError; use crate::constants::MAX_REQUEST_SIZE; use crate::controllers::routes::{get_routes, RequestResult, Route}; use crate::controllers::routes::Method::{GET, POST}; +use crate::controllers::user_ws_controller; use crate::data::base_request_handler::{BaseRequestHandler, PostFile, RequestValue}; use crate::data::config::Config; use crate::data::http_request_handler::HttpRequestHandler; @@ -313,6 +314,11 @@ pub async fn start_server(conf: &Config) -> std::io::Result<()> { HttpServer::new(|| { App::new() + + // User WebSocket route + .service(actix_web::web::resource("/ws").to(user_ws_controller::ws_route)) + + // API routes .route("**", web::get().to(process_request)) .route("**", web::post().to(process_request)) }).bind(&addr)?.run().await diff --git a/src/controllers/user_web_socket_controller.rs b/src/controllers/user_web_socket_controller.rs deleted file mode 100644 index 37193a3..0000000 --- a/src/controllers/user_web_socket_controller.rs +++ /dev/null @@ -1,92 +0,0 @@ -//! # User Web Socket controller -//! -//! Handles the WebSocket offered to the users - -use crate::api_data::res_get_ws_token::ResGetWsToken; -use crate::constants::WS_ACCESS_TOKEN_LENGTH; -use crate::data::base_request_handler::BaseRequestHandler; -use crate::data::error::ResultBoxError; -use crate::data::http_request_handler::HttpRequestHandler; -use crate::utils::crypt_utils::rand_str; -use crate::utils::date_utils::time; - -/// WebSocket access tokens list -mod ws_tokens_list { - use std::sync::Arc; - use std::sync::Mutex; - - use crate::constants::WS_ACCESS_TOKEN_LIFETIME; - use crate::data::user::UserID; - use crate::utils::date_utils::time; - - #[derive(Debug)] - pub struct WsToken { - pub time: u64, - pub client_id: u32, - pub user_id: UserID, - pub token: String, - pub incognito: bool, - } - - lazy_static! { - static ref WS_TOKENS: Arc>> = { - Arc::new(Mutex::new(Vec::new())) - }; - } - - /// Get the list of WebSocket tokens - fn get_list() -> Arc>> { - (*WS_TOKENS).clone() - } - - /// Remove old entries from the list - pub fn clean_list() { - let list = get_list(); - let mut list = list.lock().unwrap(); - - while let Some(first) = list.first() { - if first.time < time() - WS_ACCESS_TOKEN_LIFETIME { - list.remove(0); - } else { - break; - } - } - } - - /// Add a new token to the list - pub fn add_token(t: WsToken) { - get_list().lock().unwrap().push(t) - } - - /// Remove a specific access token from the list & return it - pub fn take_access_token(t: String) -> Option { - let list = get_list(); - let mut list = list.lock().unwrap(); - for i in 0..list.len() { - if list[i].token == t { - return Some(list.remove(i)); - } - } - - None - } -} - -/// Get a WebSocket access token -pub fn get_token(r: &mut HttpRequestHandler) -> ResultBoxError { - ws_tokens_list::clean_list(); - - let access_token = rand_str(WS_ACCESS_TOKEN_LENGTH); - - let token = ws_tokens_list::WsToken { - user_id: r.user_id()?, - client_id: r.api_client().id, - time: time(), - token: access_token.to_string(), - incognito: r.post_bool_opt("incognito", false), - }; - - ws_tokens_list::add_token(token); - - r.set_response(ResGetWsToken::new(access_token)) -} \ No newline at end of file diff --git a/src/controllers/user_ws_controller.rs b/src/controllers/user_ws_controller.rs new file mode 100644 index 0000000..26d1be3 --- /dev/null +++ b/src/controllers/user_ws_controller.rs @@ -0,0 +1,452 @@ +//! # User Web Socket controller +//! +//! Handles the WebSocket offered to the users + +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Running, StreamHandler}; +use actix::prelude::*; +use actix_web_actors::ws; +use actix_web_actors::ws::ProtocolError; +use serde_json::Value; + +use crate::api_data::res_get_ws_token::ResGetWsToken; +use crate::constants::WS_ACCESS_TOKEN_LENGTH; +use crate::controllers::user_ws_controller::ws_connections_list::{add_connection, find_connection, get_ws_connections_list, remove_connection}; +pub use crate::controllers::user_ws_controller::ws_connections_list::WsConnection; +use crate::controllers::ws_routes::find_ws_route; +use crate::data::base_request_handler::BaseRequestHandler; +use crate::data::config::conf; +use crate::data::error::{ExecError, Res, ResultBoxError}; +use crate::data::http_request_handler::HttpRequestHandler; +use crate::data::user::UserID; +use crate::data::user_ws_request_handler::{WsRequestHandler, WsResponseType}; +use crate::data::ws_message::WsMessage; +use crate::utils::crypt_utils::rand_str; +use crate::utils::date_utils::time; + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); + +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +/// WebSocket access tokens list +mod ws_tokens_list { + use std::sync::Arc; + use std::sync::Mutex; + + use crate::constants::WS_ACCESS_TOKEN_LIFETIME; + use crate::data::user::UserID; + use crate::utils::date_utils::time; + + #[derive(Debug)] + pub struct WsToken { + pub time: u64, + pub client_id: u32, + pub user_id: UserID, + pub token: String, + pub incognito: bool, + pub remote_ip: String, + } + + lazy_static! { + static ref WS_TOKENS: Arc>> = { + Arc::new(Mutex::new(Vec::new())) + }; + } + + /// Get the list of WebSocket tokens + fn get_list() -> Arc>> { + (*WS_TOKENS).clone() + } + + /// Remove old entries from the list + pub fn clean_list() { + let list = get_list(); + let mut list = list.lock().unwrap(); + + while let Some(first) = list.first() { + if first.time < time() - WS_ACCESS_TOKEN_LIFETIME { + list.remove(0); + } else { + break; + } + } + } + + /// Add a new token to the list + pub fn add_token(t: WsToken) { + get_list().lock().unwrap().push(t) + } + + /// Remove a specific access token from the list & return it + pub fn take_access_token(t: String) -> Option { + let list = get_list(); + let mut list = list.lock().unwrap(); + for i in 0..list.len() { + if list[i].token == t { + return Some(list.remove(i)); + } + } + + None + } +} + +/// WebSocket connections list +mod ws_connections_list { + use std::sync::Arc; + use std::sync::Mutex; + + use actix::Addr; + + use crate::controllers::user_ws_controller::WsSession; + use crate::data::user::UserID; + + #[derive(Clone, Debug)] + pub struct WsConnection { + pub user_id: UserID, + pub remote_ip: String, + pub session: actix::Addr, + } + + lazy_static! { + static ref WS_CONNECTIONS: Arc>> = { + Arc::new(Mutex::new(Vec::new())) + }; + } + + /// Get the list of WebSocket connections + pub fn get_ws_connections_list() -> Arc>> { + (*WS_CONNECTIONS).clone() + } + + /// Add a new token to the list + pub fn add_connection(t: WsConnection) { + get_ws_connections_list().lock().unwrap().push(t) + } + + /// Find a connection in the list + pub fn find_connection(t: Addr) -> Option { + get_ws_connections_list() + .lock() + .unwrap() + .iter() + .find(|f| f.session == t) + .map(|f| f.clone()) + } + + /// Remove a connection from the list + pub fn remove_connection(t: Addr) -> Option { + let list = get_ws_connections_list(); + let mut list = list.lock().unwrap(); + for i in 0..list.len() { + if list[i].session == t { + return Some(list.remove(i)); + } + } + + None + } +} + +/// Get a WebSocket access token +pub fn get_token(r: &mut HttpRequestHandler) -> ResultBoxError { + ws_tokens_list::clean_list(); + + let access_token = rand_str(WS_ACCESS_TOKEN_LENGTH); + + let token = ws_tokens_list::WsToken { + user_id: r.user_id()?, + client_id: r.api_client().id, + time: time(), + token: access_token.to_string(), + incognito: r.post_bool_opt("incognito", false), + remote_ip: r.remote_ip(), + }; + + ws_tokens_list::add_token(token); + + r.set_response(ResGetWsToken::new(access_token)) +} + +#[derive(Debug)] +pub struct WsSession { + user_id: UserID, + + // Client used for the connection + client_id: u32, + + // Remote IP address + remote_ip: String, + + // Check if the client is in incognito mode + incognito: bool, + + // Client must respond to ping at a specific interval, otherwise we drop connection + hb: Instant, +} + +impl WsSession { + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut actix_web_actors::ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("WebSocket Client heartbeat failed, disconnecting!"); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(b""); + }); + } + + /// Handle incoming message + fn handle_message(&self, ctx: &mut ws::WebsocketContext, msg: &str) -> Res { + let incoming_msg: WsMessage = serde_json::from_str(&msg)?; + + let data = incoming_msg.data.as_object() + .ok_or(ExecError::boxed_new("Could not parse values!"))?; + + let mut args = HashMap::new(); + for (k, v) in data { + args.insert(k.to_string(), match v { + Value::Null => "null".to_string(), + Value::Bool(b) => b.to_string(), + Value::Number(n) => n.to_string(), + Value::String(s) => s.to_string(), + _ => "invalid".to_string() + }); + } + + let mut handler = WsRequestHandler::new( + &find_connection(ctx.address()).ok_or(ExecError::boxed_new("Connection not found!"))?, + args, + ); + + let result = match find_ws_route(&incoming_msg.title) { + None => { + handler.not_found("Route not found!".to_string()) + } + Some(r) => { + (r.handler)(&mut handler) + } + }; + + if !handler.has_response() { + match result { + Ok(_) => handler.success("Request successful").unwrap(), + Err(e) => { + println!("WS request error: {}", &e); + handler.internal_error(e).unwrap_err(); + } + } + } + + let response = handler.response(); + + Ok(WsMessage { + id: incoming_msg.id, + title: match response.r#type { + WsResponseType::SUCCESS => "success".to_string(), + WsResponseType::ERROR => "error".to_string(), + }, + data: response.content, + }) + } +} + +impl Actor for WsSession { + type Context = actix_web_actors::ws::WebsocketContext; + + /// Method is called on actor start. + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + add_connection(WsConnection { + user_id: self.user_id.clone(), + remote_ip: self.remote_ip.clone(), + session: ctx.address(), + }) + } + + fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + remove_connection(ctx.address()); + Running::Stop + } +} + +impl StreamHandler> for WsSession { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + let msg = match msg { + Err(_) => { + ctx.stop(); + return; + } + Ok(msg) => msg, + }; + + if conf().database.log_all_queries + { + println!("USER WEBSOCKET MESSAGE: {:?}", msg); + } + + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + + ws::Message::Pong(_) => { + self.hb = Instant::now(); + } + + ws::Message::Text(msg) => { + match self.handle_message(ctx, &msg) { + Ok(msg) => { + let response = serde_json::to_string(&msg) + .unwrap_or("Failed to serialize".to_string()); + + if conf().database.log_all_queries { + println!("USER WEBSOCKET RESPONSE {}", response); + } + + ctx.text(response) + } + + Err(e) => { + println!("WS processing error: {}", e); + ctx.text("Failed to parse message"); + } + } + } + + ws::Message::Binary(_) => { + ctx.text("WS is text only!") + } + + ws::Message::Close(_) => { + ctx.stop(); + } + + ws::Message::Continuation(_) => { + ctx.stop(); + } + + ws::Message::Nop => () + } + } +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct WsQueuedMessage(String); + +#[derive(Message)] +#[rtype(result = "()")] +pub struct WsCloseConnection(); + + +impl Handler for WsSession { + type Result = (); + + fn handle(&mut self, msg: WsQueuedMessage, ctx: &mut Self::Context) -> Self::Result { + ctx.text(msg.0) + } +} + +impl Handler for WsSession { + type Result = (); + + fn handle(&mut self, _: WsCloseConnection, ctx: &mut Self::Context) -> Self::Result { + ctx.close(None) + } +} + + +/// Main WebSocket route +pub async fn ws_route( + req: actix_web::HttpRequest, + stream: actix_web::web::Payload, +) -> Result { + ws_tokens_list::clean_list(); + + // Extract token + let query = req.uri().query().unwrap_or(""); + if !query.starts_with("token=") { + return Ok(actix_web::HttpResponse::BadRequest().body("No token specified!")); + } + let token = query.replace("token=", ""); + + // Check access token + let token = ws_tokens_list::take_access_token(token) + .ok_or(actix_web::error::ErrorUnauthorized("Invalid access token!"))?; + + actix_web_actors::ws::start( + WsSession { + remote_ip: token.remote_ip, + user_id: token.user_id, + hb: std::time::Instant::now(), + incognito: token.incognito, + client_id: token.client_id, + }, + &req, + stream, + ) +} + +/// Send a message to a specific connection +fn send_message(session: Addr, msg: &WsMessage) -> Res { + session.do_send(WsQueuedMessage(serde_json::to_string(msg)?)); + Ok(()) +} + +/// Send a message to specific users +pub fn send_message_to_users(msg: &WsMessage, users: &Vec) -> Res { + let connections = get_ws_connections_list() + .lock() + .unwrap() + .iter() + .filter(|f| users.contains(&f.user_id)) + .map(|f| f.session.clone()) + .collect::>>(); + + for con in connections { + send_message(con, msg)?; + } + + Ok(()) +} + +/// Check out whether user is connected or not +pub fn is_user_connected(user_id: &UserID) -> bool { + get_ws_connections_list().lock().unwrap().iter().any(|c| &c.user_id == user_id) +} + +/// Disconnect a user from WebSocket +pub fn disconnect_user_from_all_sockets(user_id: &UserID) -> Res { + let connections = get_ws_connections_list() + .lock() + .unwrap() + .iter() + .filter(|f| &f.user_id == user_id) + .map(|f| f.session.clone()) + .collect::>>(); + + for c in connections { + c.do_send(WsCloseConnection {}); + } + + Ok(()) +} \ No newline at end of file diff --git a/src/controllers/ws_routes.rs b/src/controllers/ws_routes.rs new file mode 100644 index 0000000..2f887e1 --- /dev/null +++ b/src/controllers/ws_routes.rs @@ -0,0 +1,34 @@ +//! # WebSocket routes +//! +//! @author Pierre Hubert + +use crate::data::error::Res; +use crate::data::user_ws_request_handler::WsRequestHandler; + +pub type WsRequestProcess = Box Res>; + +/// WebSocket route +pub struct WsRoute { + pub route: String, + pub handler: WsRequestProcess, +} + +impl WsRoute { + pub fn new(route: &str, handler: H) -> WsRoute + where H: 'static + Fn(&mut WsRequestHandler) -> Res { + WsRoute { + route: route.to_string(), + handler: Box::new(handler), + } + } +} + +/// Get the list of available WebSocket routes +pub fn get_ws_routes() -> Vec { + vec![] +} + +/// Search for a route +pub fn find_ws_route(uri: &str) -> Option { + get_ws_routes().into_iter().find(|r| r.route == uri) +} \ No newline at end of file diff --git a/src/data/error.rs b/src/data/error.rs index 269c1b6..dee74c8 100644 --- a/src/data/error.rs +++ b/src/data/error.rs @@ -12,6 +12,7 @@ use serde::export::Formatter; /// Simple result type pub type ResultExecError = Result; pub type ResultBoxError = Result>; +pub type Res = ResultBoxError; #[derive(Debug, Clone)] pub struct ExecError(pub String); diff --git a/src/data/mod.rs b/src/data/mod.rs index a899163..dfd8178 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -3,6 +3,7 @@ pub mod config; pub mod base_request_handler; pub mod http_request_handler; +pub mod user_ws_request_handler; pub mod api_client; pub mod user; @@ -34,4 +35,5 @@ pub mod survey_response; pub mod general_settings; pub mod lang_settings; pub mod security_settings; -pub mod new_custom_emoji; \ No newline at end of file +pub mod new_custom_emoji; +pub mod ws_message; \ No newline at end of file diff --git a/src/data/user_ws_request_handler.rs b/src/data/user_ws_request_handler.rs new file mode 100644 index 0000000..57b5b62 --- /dev/null +++ b/src/data/user_ws_request_handler.rs @@ -0,0 +1,86 @@ +//! # User Web Socket Request handler + +use std::collections::HashMap; + +use serde::Serialize; + +use crate::api_data::http_error::HttpError; +use crate::controllers::routes::RequestResult; +use crate::controllers::user_ws_controller::WsConnection; +use crate::data::base_request_handler::{BaseRequestHandler, RequestValue}; +use crate::data::error::ResultBoxError; +use crate::data::user::UserID; + +pub enum WsResponseType { + SUCCESS, + ERROR, +} + +pub struct WsResponse { + pub r#type: WsResponseType, + pub content: serde_json::Value, +} + +pub struct WsRequestHandler { + connection: WsConnection, + args: HashMap, + response: Option, +} + +impl WsRequestHandler { + pub fn new(connection: &WsConnection, args: HashMap) -> WsRequestHandler { + WsRequestHandler { + connection: connection.clone(), + args: args.into_iter().map(|f| (f.0, RequestValue::String(f.1))).collect(), + response: None, + } + } + + /// Check if a response has been set + pub fn has_response(&self) -> bool { + self.response.is_some() + } + + /// Get the response to the request + pub fn response(mut self) -> WsResponse { + if !self.has_response() { + self.success("Request done.").unwrap(); + } + + return self.response.unwrap(); + } +} + +impl BaseRequestHandler for WsRequestHandler { + fn post_parameter_opt(&self, name: &str) -> Option<&RequestValue> { + self.args.get(name) + } + + fn set_response(&mut self, response: T) -> RequestResult { + self.response = Some(WsResponse { + r#type: WsResponseType::SUCCESS, + content: serde_json::to_value(response)?, + }); + + Ok(()) + } + + fn set_error(&mut self, error: HttpError) { + self.response = Some(WsResponse { + r#type: WsResponseType::ERROR, + content: serde_json::Value::String(error.error.message), + }); + } + + fn remote_ip(&self) -> String { + self.connection.remote_ip.to_string() + } + + fn user_id_opt_ref(&self) -> Option<&UserID> { + Some(&self.connection.user_id) + } + + fn user_id(&self) -> ResultBoxError { + Ok(self.connection.user_id.clone()) + } +} \ No newline at end of file diff --git a/src/data/ws_message.rs b/src/data/ws_message.rs new file mode 100644 index 0000000..47a81ee --- /dev/null +++ b/src/data/ws_message.rs @@ -0,0 +1,12 @@ +//! # WebSocket message +//! +//! @author Pierre Hubert + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize)] +pub struct WsMessage { + pub id: Option, + pub title: String, + pub data: serde_json::Value, +} \ No newline at end of file