From 9c8cff9357e5976fec529921372f81a4a9a40c1f Mon Sep 17 00:00:00 2001 From: Pierre Hubert Date: Tue, 30 Aug 2022 11:46:21 +0200 Subject: [PATCH] Start to listen to websocket incoming connections --- Cargo.lock | 81 +++++++++++++++++++++++++++----- tcp_relay_server/Cargo.toml | 4 +- tcp_relay_server/src/lib.rs | 1 + tcp_relay_server/src/main.rs | 35 ++++++-------- tcp_relay_server/src/relay_ws.rs | 52 ++++++++++++++++++++ 5 files changed, 141 insertions(+), 32 deletions(-) create mode 100644 tcp_relay_server/src/relay_ws.rs diff --git a/Cargo.lock b/Cargo.lock index a8bc3ce..6ca9533 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,30 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "actix" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f728064aca1c318585bf4bb04ffcfac9e75e508ab4e8b1bd9ba5dfe04e2cbed5" +dependencies = [ + "actix-rt", + "actix_derive", + "bitflags", + "bytes", + "crossbeam-channel", + "futures-core", + "futures-sink", + "futures-task", + "futures-util", + "log", + "once_cell", + "parking_lot", + "pin-project-lite", + "smallvec", + "tokio", + "tokio-util", +] + [[package]] name = "actix-codec" version = "0.5.0" @@ -169,6 +193,23 @@ dependencies = [ "url", ] +[[package]] +name = "actix-web-actors" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31efe7896f3933ce03dd4710be560254272334bb321a18fd8ff62b1a557d9d19" +dependencies = [ + "actix", + "actix-codec", + "actix-http", + "actix-web", + "bytes", + "bytestring", + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "actix-web-codegen" version = "4.0.1" @@ -182,18 +223,14 @@ dependencies = [ ] [[package]] -name = "actix-web-httpauth" -version = "0.8.0" +name = "actix_derive" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dda62cf04bc3a9ad2ea8f314f721951cfdb4cdacec4e984d20e77c7bb170991" +checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7" dependencies = [ - "actix-utils", - "actix-web", - "base64", - "futures-core", - "futures-util", - "log", - "pin-project-lite", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -429,6 +466,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -1400,12 +1457,14 @@ dependencies = [ name = "tcp_relay_server" version = "0.1.0" dependencies = [ + "actix", "actix-web", - "actix-web-httpauth", + "actix-web-actors", "base", "clap", "env_logger", "log", + "serde", ] [[package]] diff --git a/tcp_relay_server/Cargo.toml b/tcp_relay_server/Cargo.toml index 8cc61b1..b7a4d3f 100644 --- a/tcp_relay_server/Cargo.toml +++ b/tcp_relay_server/Cargo.toml @@ -8,5 +8,7 @@ base = { path = "../base" } clap = { version = "3.2.18", features = ["derive", "env"] } log = "0.4.17" env_logger = "0.9.0" +actix = "0.13.0" actix-web = "4" -actix-web-httpauth = "0.8.0" \ No newline at end of file +actix-web-actors = "4.1.0" +serde = { version = "1.0.144", features = ["derive"] } \ No newline at end of file diff --git a/tcp_relay_server/src/lib.rs b/tcp_relay_server/src/lib.rs index 6e10f4a..4e996c7 100644 --- a/tcp_relay_server/src/lib.rs +++ b/tcp_relay_server/src/lib.rs @@ -1 +1,2 @@ pub mod args; +pub mod relay_ws; \ No newline at end of file diff --git a/tcp_relay_server/src/main.rs b/tcp_relay_server/src/main.rs index 3ae8256..f615f0b 100644 --- a/tcp_relay_server/src/main.rs +++ b/tcp_relay_server/src/main.rs @@ -1,33 +1,28 @@ use std::sync::Arc; -use actix_web::{App, Error, HttpResponse, HttpServer, middleware, Responder, web}; -use actix_web::dev::ServiceRequest; -use actix_web::error::ErrorUnauthorized; +use actix_web::{App, HttpRequest, HttpResponse, HttpServer, middleware, Responder, web}; use actix_web::web::Data; -use actix_web_httpauth::extractors::bearer::BearerAuth; -use actix_web_httpauth::middleware::HttpAuthentication; use clap::Parser; use base::RelayedPort; use tcp_relay_server::args::Args; - -async fn auth_validator( - req: ServiceRequest, - creds: BearerAuth, -) -> Result { - let args: &Data> = req.app_data().unwrap(); - if args.tokens.iter().any(|t| t == creds.token()) { - Ok(req) - } else { - Err((ErrorUnauthorized("invalid token"), req)) - } -} +use tcp_relay_server::relay_ws::relay_ws; pub async fn hello_route() -> &'static str { - "TCP relay. Hello world!" + "Hello world!" } -pub async fn config_route(data: Data>) -> impl Responder { +pub async fn config_route(req: HttpRequest, data: Data>) -> impl Responder { + let token = req.headers().get("Authorization") + .map(|t| t.to_str().unwrap_or_default()) + .unwrap_or_default() + .strip_prefix("Bearer ") + .unwrap_or_default(); + + if !data.tokens.iter().any(|t| t.eq(token)) { + return HttpResponse::Unauthorized().json("Missing / invalid token"); + } + HttpResponse::Ok().json( data.ports.iter() .enumerate() @@ -59,10 +54,10 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { App::new() .wrap(middleware::Logger::default()) - .wrap(HttpAuthentication::bearer(auth_validator)) .app_data(Data::new(args_clone.clone())) .route("/", web::get().to(hello_route)) .route("/config", web::get().to(config_route)) + .route("/ws", web::get().to(relay_ws)) }) .bind(&args.listen_address)? .run() diff --git a/tcp_relay_server/src/relay_ws.rs b/tcp_relay_server/src/relay_ws.rs new file mode 100644 index 0000000..1bc2907 --- /dev/null +++ b/tcp_relay_server/src/relay_ws.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; + +use actix::{Actor, StreamHandler}; +use actix_web::{Error, HttpRequest, HttpResponse, web}; +use actix_web_actors::ws; + +use crate::args::Args; + +/// Define HTTP actor +struct RelayWS; + +impl Actor for RelayWS { + type Context = ws::WebsocketContext; +} + +/// Handler for ws::Message message +impl StreamHandler> for RelayWS { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Text(text)) => ctx.text(text), + Ok(ws::Message::Binary(bin)) => ctx.binary(bin), + _ => (), + } + } +} + +#[derive(serde::Deserialize)] +pub struct WebSocketQuery { + id: usize, + token: String, +} + +pub async fn relay_ws(req: HttpRequest, stream: web::Payload, + query: web::Query, + conf: web::Data>) -> Result { + if !conf.tokens.contains(&query.token) { + log::error!("Rejected WS request from {:?} due to invalid token!", req.peer_addr()); + return Ok(HttpResponse::Unauthorized().json("Invalid / missing token!")); + } + + if conf.ports.len() <= query.id { + log::error!("Rejected WS request from {:?} due to invalid port number!", req.peer_addr()); + return Ok(HttpResponse::BadRequest().json("Invalid port number!")); + } + + let upstream_addr = format!("{}:{}", conf.upstream_server, conf.ports[query.id]); + + let resp = ws::start(RelayWS {}, &req, stream); + log::info!("Opening new WS connection for {:?} to {}", req.peer_addr(), upstream_addr); + resp +} \ No newline at end of file