Start to listen to websocket incoming connections

This commit is contained in:
2022-08-30 11:46:21 +02:00
parent 4cf5100c30
commit 9c8cff9357
5 changed files with 141 additions and 32 deletions

View File

@ -8,5 +8,7 @@ base = { path = "../base" }
clap = { version = "3.2.18", features = ["derive", "env"] }
log = "0.4.17"
env_logger = "0.9.0"
actix = "0.13.0"
actix-web = "4"
actix-web-httpauth = "0.8.0"
actix-web-actors = "4.1.0"
serde = { version = "1.0.144", features = ["derive"] }

View File

@ -1 +1,2 @@
pub mod args;
pub mod relay_ws;

View File

@ -1,33 +1,28 @@
use std::sync::Arc;
use actix_web::{App, Error, HttpResponse, HttpServer, middleware, Responder, web};
use actix_web::dev::ServiceRequest;
use actix_web::error::ErrorUnauthorized;
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, middleware, Responder, web};
use actix_web::web::Data;
use actix_web_httpauth::extractors::bearer::BearerAuth;
use actix_web_httpauth::middleware::HttpAuthentication;
use clap::Parser;
use base::RelayedPort;
use tcp_relay_server::args::Args;
async fn auth_validator(
req: ServiceRequest,
creds: BearerAuth,
) -> Result<ServiceRequest, (Error, ServiceRequest)> {
let args: &Data<Arc<Args>> = req.app_data().unwrap();
if args.tokens.iter().any(|t| t == creds.token()) {
Ok(req)
} else {
Err((ErrorUnauthorized("invalid token"), req))
}
}
use tcp_relay_server::relay_ws::relay_ws;
pub async fn hello_route() -> &'static str {
"TCP relay. Hello world!"
"Hello world!"
}
pub async fn config_route(data: Data<Arc<Args>>) -> impl Responder {
pub async fn config_route(req: HttpRequest, data: Data<Arc<Args>>) -> impl Responder {
let token = req.headers().get("Authorization")
.map(|t| t.to_str().unwrap_or_default())
.unwrap_or_default()
.strip_prefix("Bearer ")
.unwrap_or_default();
if !data.tokens.iter().any(|t| t.eq(token)) {
return HttpResponse::Unauthorized().json("Missing / invalid token");
}
HttpResponse::Ok().json(
data.ports.iter()
.enumerate()
@ -59,10 +54,10 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
.wrap(HttpAuthentication::bearer(auth_validator))
.app_data(Data::new(args_clone.clone()))
.route("/", web::get().to(hello_route))
.route("/config", web::get().to(config_route))
.route("/ws", web::get().to(relay_ws))
})
.bind(&args.listen_address)?
.run()

View File

@ -0,0 +1,52 @@
use std::sync::Arc;
use actix::{Actor, StreamHandler};
use actix_web::{Error, HttpRequest, HttpResponse, web};
use actix_web_actors::ws;
use crate::args::Args;
/// Define HTTP actor
struct RelayWS;
impl Actor for RelayWS {
type Context = ws::WebsocketContext<Self>;
}
/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for RelayWS {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
_ => (),
}
}
}
#[derive(serde::Deserialize)]
pub struct WebSocketQuery {
id: usize,
token: String,
}
pub async fn relay_ws(req: HttpRequest, stream: web::Payload,
query: web::Query<WebSocketQuery>,
conf: web::Data<Arc<Args>>) -> Result<HttpResponse, Error> {
if !conf.tokens.contains(&query.token) {
log::error!("Rejected WS request from {:?} due to invalid token!", req.peer_addr());
return Ok(HttpResponse::Unauthorized().json("Invalid / missing token!"));
}
if conf.ports.len() <= query.id {
log::error!("Rejected WS request from {:?} due to invalid port number!", req.peer_addr());
return Ok(HttpResponse::BadRequest().json("Invalid port number!"));
}
let upstream_addr = format!("{}:{}", conf.upstream_server, conf.ports[query.id]);
let resp = ws::start(RelayWS {}, &req, stream);
log::info!("Opening new WS connection for {:?} to {}", req.peer_addr(), upstream_addr);
resp
}