use std::collections::HashMap; use std::pin::Pin; use actix_web::{App, FromRequest, http, HttpMessage, HttpRequest, HttpResponse, HttpServer, web}; use actix_web::dev::{Decompress, Payload, PayloadStream}; use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, PayloadError}; use actix_web::web::{Bytes, BytesMut}; use bytes::{Buf, BufMut}; use encoding_rs::UTF_8; use futures::{FutureExt, Stream, StreamExt}; use futures::future::LocalBoxFuture; use futures::task::{Context, Poll}; use percent_encoding::percent_decode_str; use crate::api_data::http_error::HttpError; use crate::constants::MAX_REQUEST_SIZE; use crate::controllers::{rtc_relay_controller, user_ws_controller}; use crate::controllers::routes::{get_routes, RequestResult, Route}; use crate::controllers::routes::Method::{GET, POST}; use crate::data::base_request_handler::{BaseRequestHandler, PostFile, RequestValue}; use crate::data::config::Config; use crate::data::http_request_handler::HttpRequestHandler; use crate::helpers::requests_limit_helper; /// Main server functions /// /// @author Pierre Hubert /// Custom stream to give a limit to requests size struct LimitedStream { stream: Box> + Unpin + 'static>, already_read: usize, max_size: usize, } impl<'a> Stream for LimitedStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.already_read >= self.max_size { return Poll::Ready(None); } let res = Pin::new(self.stream.as_mut()).poll_next(cx); if let Poll::Ready(Some(Ok(d))) = &res { self.already_read = self.already_read + d.len(); } res } } /// Custom request value struct CustomRequest { req: web::HttpRequest, body: HashMap, } /// Process in our way incoming requests impl FromRequest for CustomRequest { type Error = actix_web::Error; type Future = LocalBoxFuture<'static, Result>; type Config = (); fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { let req = req.clone(); let payload = Box::new(payload.take()); async move { let content_length_size; // Check the size, if provided if req.headers().contains_key("Content-Length") { if let Some(v) = req.headers().get("Content-Length") { content_length_size = String::from_utf8_lossy(v.as_bytes()).parse::().unwrap_or(0); if content_length_size > MAX_REQUEST_SIZE { return Err(actix_web::error::ErrorBadRequest("Request too big!")); } } else { unreachable!(); } } else { return Err(actix_web::error::ErrorBadRequest("Content-Length header is required!")); } let payload = LimitedStream { stream: payload, already_read: 0, max_size: content_length_size, }; let mut body_args = HashMap::new(); // Process "application/x-www-form-urlencoded" requests if req.content_type().eq("application/x-www-form-urlencoded") { // Maximum size of the request let limit = 16384; // Ready body let mut body = BytesMut::with_capacity(8192); let mut stream = Decompress::from_headers(payload, req.headers()); while let Some(item) = stream.next().await { let chunk = item?; if body.len() + chunk.len() > limit { return Err(actix_web::error::ErrorBadRequest("Overflow - too long")); } else { body.extend_from_slice(&chunk); } } let body = body.freeze(); // Decode body as a string let encoding = req.encoding()?; let body_str = if encoding == UTF_8 { String::from_utf8_lossy(body.as_ref()).to_string() } else { encoding .decode_without_bom_handling_and_without_replacement(&body) .map(|s| s.into_owned()) .ok_or_else(|| ErrorBadRequest("Can not decode body"))? }; // Parse body arguments (following the pattern key1=value1&key2=value2) if body_str.len() > 0 { for v in body_str.split("&") { if v.len() == 0 { continue; } let args: Vec<&str> = v.split("=").collect(); if args.len() != 2 { return Err(actix_web::error::ErrorBadRequest(format!("{} is invalid!", args[0]))); } // Add the value to the body body_args.insert( percent_decode_str(args[0]).decode_utf8_lossy().to_string(), RequestValue::String(percent_decode_str(args[1]).decode_utf8_lossy().to_string()), ); } } } // Process "multipart/form-data" request else if req.content_type().starts_with("multipart/form-data") { let mut req = actix_multipart::Multipart::new(req.headers(), payload); // Process the list of arguments while let Some(el) = req.next().await { let mut field = el?; let content_type = field.content_disposition().ok_or( ErrorInternalServerError("Missing content type"))?; let name = content_type.get_name().ok_or( ErrorInternalServerError("Missing field name!"))?; // Handle file upload if content_type.get_filename().is_some() { let filename = content_type.get_filename().unwrap(); let mut buf = BytesMut::new(); while let Some(chunk) = field.next().await { let data = chunk.unwrap(); buf.put(data); } body_args.insert(name.to_string(), RequestValue::File(PostFile { name: filename.to_string(), buff: buf.to_bytes(), })); } // It is a simple field else { let mut content = String::new(); // Get content while let Some(chunk) = field.next().await { content = format!("{}{}", content, String::from_utf8_lossy(chunk?.bytes())); } body_args.insert(name.to_string(), RequestValue::String(content)); } } } Ok(CustomRequest { req: req.clone(), body: body_args, }) }.boxed_local() } } /// Process a "simple request" aka not a WebSocket request fn process_simple_route(route: &Route, req: &mut HttpRequestHandler) -> RequestResult { if requests_limit_helper::trigger_before(req, route).is_err() { req.too_many_requests("Too many request. Please try again later.")?; } // Validate client token req.check_client_token()?; // Check user token, if required if route.need_login || req.has_post_parameter("userToken1") { req.check_user_token()?; } let res: RequestResult = (route.func)(req); requests_limit_helper::trigger_after(res.is_ok(), req, route)?; res } /// Process an incoming request async fn process_request(custom_req: CustomRequest) -> HttpResponse { let req = &custom_req.req; let routes = get_routes(); // We search the appropriate route for the request let mut route: Option<&Route> = None; for el in &routes { // Check verb if !(req.method() == http::Method::GET && el.method == GET) && !(req.method() == http::Method::POST && el.method == POST) { continue; } // Check path if !el.uri.eq(req.uri()) { continue; } route = Some(el); break; } // Check if a route was found if let None = route { return HttpResponse::NotFound().json(HttpError::not_found("Method not found!")); } let route = route.unwrap(); // Clean requests limit requests_limit_helper::clean_cache().unwrap(); // Execute the request let mut request = HttpRequestHandler::new(custom_req.req, custom_req.body); match process_simple_route(route, &mut request) { // Set default error response if required Err(e) => { let err_msg = e.to_string(); if !request.has_response() { request.internal_error(e).unwrap_err(); } println!("{} - {} - {} - {}", request.remote_ip(), request.response_status_code(), request.request_path(), err_msg ); } // Set default success response if required Ok(_) => { if !request.has_response() { request.success("Success").unwrap() } println!("{} - {} - {}", request.remote_ip(), request.response_status_code(), request.request_path() ); } } // Send the response match request.response() { Ok(s) => s, Err(e) => { println!("Error while getting response: {}", e); HttpResponse::InternalServerError().body("Response error") } } } /// Given the configuration, start the server pub async fn start_server(conf: &Config) -> std::io::Result<()> { // Initialize limit helper requests_limit_helper::init(); let addr = conf.server_listen_address(); println!("Start to listen on http://{}/", addr); HttpServer::new(|| { App::new() // User WebSocket route .service(actix_web::web::resource("/ws").to(user_ws_controller::ws_route)) // RTC Relay WebSocket route .service(actix_web::web::resource("/rtc_proxy/ws").to(rtc_relay_controller::open_ws)) // API routes .route("**", web::get().to(process_request)) .route("**", web::post().to(process_request)) }).bind(&addr)?.run().await }