use std::collections::HashMap; use std::pin::Pin; use actix_web::{App, FromRequest, http, HttpMessage, HttpRequest, HttpResponse, HttpServer, web}; use actix_web::dev::{Decompress, Payload}; use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, PayloadError}; use actix_web::web::{BufMut, Bytes, BytesMut}; use encoding_rs::UTF_8; use futures::{FutureExt, Stream, StreamExt}; use futures::future::LocalBoxFuture; use futures::task::{Context, Poll}; use percent_encoding::percent_decode_str; use crate::api_data::http_error::HttpError; use crate::constants::MAX_REQUEST_SIZE; use crate::controllers::{rtc_relay_controller, user_ws_controller}; use crate::data::base_request_handler::{BaseRequestHandler, PostFile, RequestValue}; use crate::data::config::Config; use crate::data::http_request_handler::HttpRequestHandler; use crate::helpers::{admin_access_token_helper, admin_key_authentication_challenges_helper, admin_key_registration_challenges_helper, api_helper, requests_limit_helper}; use crate::routes::{find_route, RequestResult, Route, RouteScope}; use crate::routes::Method::{GET, POST}; use crate::utils::user_data_utils::user_data_path; /// Main server functions /// /// @author Pierre Hubert /// Custom stream to give a limit to requests size struct LimitedStream { stream: Box> + Unpin + 'static>, already_read: usize, max_size: usize, } impl<'a> Stream for LimitedStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.already_read >= self.max_size { return Poll::Ready(None); } let res = Pin::new(self.stream.as_mut()).poll_next(cx); if let Poll::Ready(Some(Ok(d))) = &res { self.already_read = self.already_read + d.len(); } res } } /// Custom request value struct CustomRequest { req: HttpRequest, body: HashMap, } /// Process in our way incoming requests impl FromRequest for CustomRequest { type Error = actix_web::Error; type Future = LocalBoxFuture<'static, Result>; fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { let req = req.clone(); let payload = Box::new(payload.take()); async move { let content_length_size; // Check the size, if provided if req.headers().contains_key("Content-Length") { if let Some(v) = req.headers().get("Content-Length") { content_length_size = String::from_utf8_lossy(v.as_bytes()).parse::().unwrap_or(0); if content_length_size > MAX_REQUEST_SIZE { return Err(actix_web::error::ErrorBadRequest("Request too big!")); } } else { unreachable!(); } } else { return Err(actix_web::error::ErrorBadRequest("Content-Length header is required!")); } let payload = LimitedStream { stream: payload, already_read: 0, max_size: content_length_size, }; let mut body_args = HashMap::new(); // Process "application/x-www-form-urlencoded" requests if req.content_type().eq("application/x-www-form-urlencoded") { // Maximum size of the request let limit = 16384; // Ready body let mut body = BytesMut::with_capacity(8192); let mut stream = Decompress::from_headers(payload, req.headers()); while let Some(item) = stream.next().await { let chunk = item?; if body.len() + chunk.len() > limit { return Err(actix_web::error::ErrorBadRequest("Overflow - too long")); } else { body.extend_from_slice(&chunk); } } let body = body.freeze(); // Decode body as a string let encoding = req.encoding()?; let body_str = if encoding == UTF_8 { String::from_utf8_lossy(body.as_ref()).to_string() } else { encoding .decode_without_bom_handling_and_without_replacement(&body) .map(|s| s.into_owned()) .ok_or_else(|| ErrorBadRequest("Can not decode body"))? }; // Parse body arguments (following the pattern key1=value1&key2=value2) if body_str.len() > 0 { for v in body_str.split("&") { if v.len() == 0 { continue; } let args: Vec<&str> = v.split("=").collect(); if args.len() != 2 { return Err(actix_web::error::ErrorBadRequest(format!("{} is invalid!", args[0]))); } // Add the value to the body body_args.insert( percent_decode_str(args[0]).decode_utf8_lossy().to_string(), RequestValue::String(percent_decode_str(args[1]).decode_utf8_lossy().to_string()), ); } } } // Process "multipart/form-data" request else if req.content_type().starts_with("multipart/form-data") { let mut req = actix_multipart::Multipart::new(req.headers(), payload); // Process the list of arguments while let Some(el) = req.next().await { let mut field = el?; let content_type = field.content_disposition().clone(); let name = content_type.get_name().ok_or( ErrorInternalServerError("Missing field name!"))?; // Handle file upload if content_type.get_filename().is_some() { let filename = content_type.get_filename().unwrap(); let mut buf = BytesMut::new(); while let Some(chunk) = field.next().await { let data = chunk.unwrap(); buf.put(data); } body_args.insert(name.to_string(), RequestValue::File(PostFile { name: filename.to_string(), buff: buf.freeze(), })); } // It is a simple field else { let mut content = String::new(); // Get content while let Some(chunk) = field.next().await { content = format!("{}{}", content, String::from_utf8_lossy(chunk?.as_ref())); } body_args.insert(name.to_string(), RequestValue::String(content)); } } } Ok(CustomRequest { req: req.clone(), body: body_args, }) }.boxed_local() } } /// Process a "simple request" aka not a WebSocket request async fn process_simple_route(route: &Route, req: &mut HttpRequestHandler) -> RequestResult { if requests_limit_helper::trigger_before(req, route).is_err() { req.too_many_requests("Too many request. Please try again later.")?; } // Check if the user is allowed to access the route match route.scope { // "Normal" user route RouteScope::USER => { // Validate client token req.check_client_token()?; // Check user token, if required if route.need_login || req.has_post_parameter("token") { req.check_user_token()?; } } // "Admin" user scope RouteScope::ADMIN => { req.check_admin_origin()?; if route.need_login { req.check_admin_access_token()?; } if let Some(role) = route.admin_role { req.check_admin_has_role(role)?; } } } let (_, res) = find_route(route.uri, Some(req)).await; let res = res.unwrap(); requests_limit_helper::trigger_after(res.is_ok(), req, route)?; res } /// Process an incoming request async fn process_request(custom_req: CustomRequest) -> HttpResponse { let req = &custom_req.req; let (route, _) = find_route(&req.uri().to_string(), None).await; // Check if a route was found with the right verb if !route.as_ref().map(|r| (req.method() == http::Method::GET && r.method == GET) || (req.method() == http::Method::POST && r.method == POST) ).unwrap_or(false) { return HttpResponse::NotFound().json(HttpError::not_found("Route not found!")); } let route = route.unwrap(); // Clean requests limit requests_limit_helper::clean_cache().unwrap(); // Execute the request let mut request = HttpRequestHandler::new(custom_req.req, custom_req.body); match process_simple_route(&route, &mut request).await { // Set default error response if required Err(e) => { let err_msg = e.to_string(); if !request.has_response() { request.internal_error(e).unwrap_err(); } println!("{} - {} - {} - {}", request.remote_ip(), request.response_status_code(), request.request_path(), err_msg ); } // Set default success response if required Ok(_) => { if !request.has_response() { request.success("Success").unwrap() } println!("{} - {} - {}", request.remote_ip(), request.response_status_code(), request.request_path() ); } } // Send the response match request.response() { Ok(s) => s, Err(e) => { println!("Error while getting response: {}", e); HttpResponse::InternalServerError().body("Response error") } } } /// Handle pre-flight requests async fn handle_options_request(r: HttpRequest) -> HttpResponse { // Extract origin let origin = r.headers().get("Origin"); if origin.is_none() { return HttpResponse::BadRequest().body("Missing origin header!"); } let origin = origin.unwrap().to_str().unwrap_or("").to_string(); // Find client let client = api_helper::get_by_origin(&origin); if client.is_err() { eprintln!("Failed to handle OPTIONS request: {:#?}", client); return HttpResponse::Unauthorized().body("Unkown origin!"); } // Accept request HttpResponse::NoContent() .append_header(("Access-Control-Allow-Origin", origin)) .append_header(("Access-Control-Allow-Methods", "POST, GET, OPTIONS")) .finish() } /// Given the configuration, start the server pub fn start_server(conf: &Config) -> std::io::Result<()> { let sys = actix::System::new(); // Initialize limit helper requests_limit_helper::init(); admin_access_token_helper::init(); admin_key_registration_challenges_helper::init(); admin_key_authentication_challenges_helper::init(); let addr = conf.server_listen_address(); println!("Start to listen on http://{}/", addr); let serve_storage_file = conf.serve_storage_file; let server = HttpServer::new(move || { let mut app = App::new(); if serve_storage_file { app = app.service(actix_files::Files::new("/user_data", user_data_path("".as_ref()))); } // User WebSocket route app.service(actix_web::web::resource("/ws").to(user_ws_controller::ws_route)) // RTC Relay WebSocket route .service(actix_web::web::resource("/rtc_proxy/ws").to(rtc_relay_controller::open_ws)) // Option .route("{tail:.*}", web::method(http::Method::OPTIONS).to(handle_options_request)) // API routes .route("{tail:.*}", web::get().to(process_request)) .route("{tail:.*}", web::post().to(process_request)) }).bind(&addr)?.run(); sys.block_on(server)?; Ok(()) }