mirror of
				https://gitlab.com/comunic/comunicapiv3
				synced 2025-10-31 07:34:45 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			327 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			327 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use std::collections::HashMap;
 | |
| use std::pin::Pin;
 | |
| 
 | |
| use actix_web::{App, FromRequest, http, HttpMessage, HttpRequest, HttpResponse, HttpServer, web};
 | |
| use actix_web::dev::{Decompress, Payload, PayloadStream};
 | |
| use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, PayloadError};
 | |
| use actix_web::web::{Bytes, BytesMut, BufMut, Buf};
 | |
| use encoding_rs::UTF_8;
 | |
| use futures::{FutureExt, Stream, StreamExt};
 | |
| use futures::future::LocalBoxFuture;
 | |
| use futures::task::{Context, Poll};
 | |
| use percent_encoding::percent_decode_str;
 | |
| 
 | |
| use crate::api_data::http_error::HttpError;
 | |
| use crate::constants::MAX_REQUEST_SIZE;
 | |
| use crate::controllers::{rtc_relay_controller, user_ws_controller};
 | |
| use crate::routes::{get_routes, RequestResult, Route};
 | |
| use crate::routes::Method::{GET, POST};
 | |
| use crate::data::base_request_handler::{BaseRequestHandler, PostFile, RequestValue};
 | |
| use crate::data::config::Config;
 | |
| use crate::data::http_request_handler::HttpRequestHandler;
 | |
| use crate::helpers::requests_limit_helper;
 | |
| 
 | |
| /// Main server functions
 | |
| ///
 | |
| /// @author Pierre Hubert
 | |
| 
 | |
| /// Custom stream to give a limit to requests size
 | |
| struct LimitedStream {
 | |
|     stream: Box<dyn Stream<Item=Result<Bytes, PayloadError>> + Unpin + 'static>,
 | |
|     already_read: usize,
 | |
|     max_size: usize,
 | |
| }
 | |
| 
 | |
| impl<'a> Stream for LimitedStream
 | |
| {
 | |
|     type Item = Result<Bytes, PayloadError>;
 | |
| 
 | |
|     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
 | |
|         if self.already_read >= self.max_size {
 | |
|             return Poll::Ready(None);
 | |
|         }
 | |
| 
 | |
|         let res = Pin::new(self.stream.as_mut()).poll_next(cx);
 | |
| 
 | |
|         if let Poll::Ready(Some(Ok(d))) = &res {
 | |
|             self.already_read = self.already_read + d.len();
 | |
|         }
 | |
| 
 | |
|         res
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Custom request value
 | |
| struct CustomRequest {
 | |
|     req: web::HttpRequest,
 | |
|     body: HashMap<String, RequestValue>,
 | |
| }
 | |
| 
 | |
| /// Process in our way incoming requests
 | |
| impl FromRequest for CustomRequest {
 | |
|     type Error = actix_web::Error;
 | |
|     type Future = LocalBoxFuture<'static, Result<CustomRequest, actix_web::Error>>;
 | |
|     type Config = ();
 | |
| 
 | |
|     fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
 | |
|         let req = req.clone();
 | |
|         let payload = Box::new(payload.take());
 | |
| 
 | |
|         async move {
 | |
|             let content_length_size;
 | |
| 
 | |
|             // Check the size, if provided
 | |
|             if req.headers().contains_key("Content-Length") {
 | |
|                 if let Some(v) = req.headers().get("Content-Length") {
 | |
|                     content_length_size = String::from_utf8_lossy(v.as_bytes()).parse::<usize>().unwrap_or(0);
 | |
|                     if content_length_size > MAX_REQUEST_SIZE {
 | |
|                         return Err(actix_web::error::ErrorBadRequest("Request too big!"));
 | |
|                     }
 | |
|                 } else {
 | |
|                     unreachable!();
 | |
|                 }
 | |
|             } else {
 | |
|                 return Err(actix_web::error::ErrorBadRequest("Content-Length header is required!"));
 | |
|             }
 | |
| 
 | |
|             let payload = LimitedStream {
 | |
|                 stream: payload,
 | |
|                 already_read: 0,
 | |
|                 max_size: content_length_size,
 | |
|             };
 | |
| 
 | |
| 
 | |
|             let mut body_args = HashMap::new();
 | |
| 
 | |
|             // Process "application/x-www-form-urlencoded" requests
 | |
|             if req.content_type().eq("application/x-www-form-urlencoded") {
 | |
| 
 | |
|                 // Maximum size of the request
 | |
|                 let limit = 16384;
 | |
| 
 | |
|                 // Ready body
 | |
|                 let mut body = BytesMut::with_capacity(8192);
 | |
|                 let mut stream = Decompress::from_headers(payload, req.headers());
 | |
|                 while let Some(item) = stream.next().await {
 | |
|                     let chunk = item?;
 | |
| 
 | |
| 
 | |
|                     if body.len() + chunk.len() > limit {
 | |
|                         return Err(actix_web::error::ErrorBadRequest("Overflow - too long"));
 | |
|                     } else {
 | |
|                         body.extend_from_slice(&chunk);
 | |
|                     }
 | |
|                 }
 | |
|                 let body = body.freeze();
 | |
| 
 | |
| 
 | |
|                 // Decode body as a string
 | |
|                 let encoding = req.encoding()?;
 | |
| 
 | |
|                 let body_str = if encoding == UTF_8 {
 | |
|                     String::from_utf8_lossy(body.as_ref()).to_string()
 | |
|                 } else {
 | |
|                     encoding
 | |
|                         .decode_without_bom_handling_and_without_replacement(&body)
 | |
|                         .map(|s| s.into_owned())
 | |
|                         .ok_or_else(|| ErrorBadRequest("Can not decode body"))?
 | |
|                 };
 | |
| 
 | |
|                 // Parse body arguments (following the pattern key1=value1&key2=value2)
 | |
|                 if body_str.len() > 0 {
 | |
|                     for v in body_str.split("&") {
 | |
|                         if v.len() == 0 {
 | |
|                             continue;
 | |
|                         }
 | |
| 
 | |
|                         let args: Vec<&str> = v.split("=").collect();
 | |
| 
 | |
|                         if args.len() != 2 {
 | |
|                             return Err(actix_web::error::ErrorBadRequest(format!("{} is invalid!", args[0])));
 | |
|                         }
 | |
| 
 | |
|                         // Add the value to the body
 | |
|                         body_args.insert(
 | |
|                             percent_decode_str(args[0]).decode_utf8_lossy().to_string(),
 | |
|                             RequestValue::String(percent_decode_str(args[1]).decode_utf8_lossy().to_string()),
 | |
|                         );
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
| 
 | |
|             // Process "multipart/form-data" request
 | |
|             else if req.content_type().starts_with("multipart/form-data") {
 | |
|                 let mut req = actix_multipart::Multipart::new(req.headers(), payload);
 | |
| 
 | |
|                 // Process the list of arguments
 | |
|                 while let Some(el) = req.next().await {
 | |
|                     let mut field = el?;
 | |
| 
 | |
|                     let content_type = field.content_disposition().ok_or(
 | |
|                         ErrorInternalServerError("Missing content type"))?;
 | |
|                     let name = content_type.get_name().ok_or(
 | |
|                         ErrorInternalServerError("Missing field name!"))?;
 | |
| 
 | |
|                     // Handle file upload
 | |
|                     if content_type.get_filename().is_some() {
 | |
|                         let filename = content_type.get_filename().unwrap();
 | |
|                         let mut buf = BytesMut::new();
 | |
| 
 | |
|                         while let Some(chunk) = field.next().await {
 | |
|                             let data = chunk.unwrap();
 | |
|                             buf.put(data);
 | |
|                         }
 | |
| 
 | |
|                         body_args.insert(name.to_string(),
 | |
|                                          RequestValue::File(PostFile {
 | |
|                                              name: filename.to_string(),
 | |
|                                              buff: buf.to_bytes(),
 | |
|                                          }));
 | |
|                     }
 | |
| 
 | |
|                     // It is a simple field
 | |
|                     else {
 | |
|                         let mut content = String::new();
 | |
| 
 | |
|                         // Get content
 | |
|                         while let Some(chunk) = field.next().await {
 | |
|                             content = format!("{}{}", content, String::from_utf8_lossy(chunk?.bytes()));
 | |
|                         }
 | |
| 
 | |
|                         body_args.insert(name.to_string(), RequestValue::String(content));
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             Ok(CustomRequest {
 | |
|                 req: req.clone(),
 | |
|                 body: body_args,
 | |
|             })
 | |
|         }.boxed_local()
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Process a "simple request" aka not a WebSocket request
 | |
| fn process_simple_route(route: &Route, req: &mut HttpRequestHandler) -> RequestResult {
 | |
|     if requests_limit_helper::trigger_before(req, route).is_err() {
 | |
|         req.too_many_requests("Too many request. Please try again later.")?;
 | |
|     }
 | |
| 
 | |
|     // Validate client token
 | |
|     req.check_client_token()?;
 | |
| 
 | |
|     // Check user token, if required
 | |
|     if route.need_login || req.has_post_parameter("userToken1") {
 | |
|         req.check_user_token()?;
 | |
|     }
 | |
| 
 | |
|     let res: RequestResult = (route.func)(req);
 | |
| 
 | |
|     requests_limit_helper::trigger_after(res.is_ok(), req, route)?;
 | |
| 
 | |
|     res
 | |
| }
 | |
| 
 | |
| /// Process an incoming request
 | |
| async fn process_request(custom_req: CustomRequest) -> HttpResponse {
 | |
|     let req = &custom_req.req;
 | |
|     let routes = get_routes();
 | |
| 
 | |
|     // We search the appropriate route for the request
 | |
|     let mut route: Option<&Route> = None;
 | |
|     for el in &routes {
 | |
| 
 | |
|         // Check verb
 | |
|         if !(req.method() == http::Method::GET && el.method == GET) &&
 | |
|             !(req.method() == http::Method::POST && el.method == POST) {
 | |
|             continue;
 | |
|         }
 | |
| 
 | |
|         // Check path
 | |
|         if !el.uri.eq(req.uri()) {
 | |
|             continue;
 | |
|         }
 | |
| 
 | |
|         route = Some(el);
 | |
|         break;
 | |
|     }
 | |
| 
 | |
|     // Check if a route was found
 | |
|     if let None = route {
 | |
|         return HttpResponse::NotFound().json(HttpError::not_found("Method not found!"));
 | |
|     }
 | |
|     let route = route.unwrap();
 | |
| 
 | |
|     // Clean requests limit
 | |
|     requests_limit_helper::clean_cache().unwrap();
 | |
| 
 | |
|     // Execute the request
 | |
|     let mut request = HttpRequestHandler::new(custom_req.req, custom_req.body);
 | |
| 
 | |
|     match process_simple_route(route, &mut request) {
 | |
| 
 | |
|         // Set default error response if required
 | |
|         Err(e) => {
 | |
|             let err_msg = e.to_string();
 | |
| 
 | |
|             if !request.has_response() {
 | |
|                 request.internal_error(e).unwrap_err();
 | |
|             }
 | |
| 
 | |
|             println!("{} - {} - {} - {}",
 | |
|                      request.remote_ip(),
 | |
|                      request.response_status_code(),
 | |
|                      request.request_path(),
 | |
|                      err_msg
 | |
|             );
 | |
|         }
 | |
| 
 | |
|         // Set default success response if required
 | |
|         Ok(_) => {
 | |
|             if !request.has_response() {
 | |
|                 request.success("Success").unwrap()
 | |
|             }
 | |
| 
 | |
|             println!("{} - {} - {}",
 | |
|                      request.remote_ip(),
 | |
|                      request.response_status_code(),
 | |
|                      request.request_path()
 | |
|             );
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     // Send the response
 | |
|     match request.response() {
 | |
|         Ok(s) => s,
 | |
|         Err(e) => {
 | |
|             println!("Error while getting response: {}", e);
 | |
|             HttpResponse::InternalServerError().body("Response error")
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| 
 | |
| /// Given the configuration, start the server
 | |
| pub async fn start_server(conf: &Config) -> std::io::Result<()> {
 | |
| 
 | |
|     // Initialize limit helper
 | |
|     requests_limit_helper::init();
 | |
| 
 | |
|     let addr = conf.server_listen_address();
 | |
|     println!("Start to listen on http://{}/", addr);
 | |
| 
 | |
|     HttpServer::new(|| {
 | |
|         App::new()
 | |
| 
 | |
|             // User WebSocket route
 | |
|             .service(actix_web::web::resource("/ws").to(user_ws_controller::ws_route))
 | |
| 
 | |
|             // RTC Relay WebSocket route
 | |
|             .service(actix_web::web::resource("/rtc_proxy/ws").to(rtc_relay_controller::open_ws))
 | |
| 
 | |
|             // API routes
 | |
|             .route("**", web::get().to(process_request))
 | |
|             .route("**", web::post().to(process_request))
 | |
|     }).bind(&addr)?.run().await
 | |
| } |