1
0
mirror of https://gitlab.com/comunic/comunicapiv3 synced 2025-01-09 20:22:27 +00:00
comunicapiv3/src/controllers/server.rs

328 lines
11 KiB
Rust
Raw Normal View History

2020-06-20 08:31:58 +00:00
use std::collections::HashMap;
use std::pin::Pin;
2020-05-21 13:28:07 +00:00
2020-06-20 08:31:58 +00:00
use actix_web::{App, FromRequest, http, HttpMessage, HttpRequest, HttpResponse, HttpServer, web};
use actix_web::dev::{Decompress, Payload, PayloadStream};
2020-06-20 09:37:29 +00:00
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, PayloadError};
2020-06-20 08:31:58 +00:00
use actix_web::web::{Bytes, BytesMut};
2020-06-20 09:37:29 +00:00
use bytes::{Buf, BufMut};
2020-05-23 07:37:21 +00:00
use encoding_rs::UTF_8;
2020-06-20 08:31:58 +00:00
use futures::{FutureExt, Stream, StreamExt};
use futures::future::LocalBoxFuture;
use futures::task::{Context, Poll};
use percent_encoding::percent_decode_str;
2020-05-23 07:37:21 +00:00
2020-06-20 08:31:58 +00:00
use crate::api_data::http_error::HttpError;
use crate::constants::MAX_REQUEST_SIZE;
2021-02-08 16:20:03 +00:00
use crate::controllers::{rtc_relay_controller, user_ws_controller};
2020-06-20 08:31:58 +00:00
use crate::controllers::routes::{get_routes, RequestResult, Route};
use crate::controllers::routes::Method::{GET, POST};
2021-02-05 08:11:30 +00:00
use crate::data::base_request_handler::{BaseRequestHandler, PostFile, RequestValue};
2020-06-20 08:31:58 +00:00
use crate::data::config::Config;
2021-02-05 08:11:30 +00:00
use crate::data::http_request_handler::HttpRequestHandler;
2021-01-22 18:39:14 +00:00
use crate::helpers::requests_limit_helper;
2020-05-21 13:28:07 +00:00
/// Main server functions
///
/// @author Pierre Hubert
2020-06-20 08:47:06 +00:00
/// Custom stream to give a limit to requests size
2020-06-20 08:31:58 +00:00
struct LimitedStream {
2020-06-20 08:47:06 +00:00
stream: Box<dyn Stream<Item=Result<Bytes, PayloadError>> + Unpin + 'static>,
already_read: usize,
2021-01-26 17:24:47 +00:00
max_size: usize,
2020-06-20 08:31:58 +00:00
}
impl<'a> Stream for LimitedStream
{
type Item = Result<Bytes, PayloadError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2021-01-26 17:24:47 +00:00
if self.already_read >= self.max_size {
2020-06-20 08:47:06 +00:00
return Poll::Ready(None);
}
let res = Pin::new(self.stream.as_mut()).poll_next(cx);
if let Poll::Ready(Some(Ok(d))) = &res {
self.already_read = self.already_read + d.len();
}
res
2020-06-20 08:31:58 +00:00
}
}
2020-05-23 07:37:21 +00:00
/// Custom request value
struct CustomRequest {
req: web::HttpRequest,
body: HashMap<String, RequestValue>,
}
/// Process in our way incoming requests
impl FromRequest for CustomRequest {
type Error = actix_web::Error;
type Future = LocalBoxFuture<'static, Result<CustomRequest, actix_web::Error>>;
type Config = ();
fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
let req = req.clone();
2020-06-20 08:31:58 +00:00
let payload = Box::new(payload.take());
2020-05-23 07:37:21 +00:00
async move {
2021-01-26 17:24:47 +00:00
let content_length_size;
2020-06-20 08:31:58 +00:00
// Check the size, if provided
if req.headers().contains_key("Content-Length") {
if let Some(v) = req.headers().get("Content-Length") {
2021-01-26 17:24:47 +00:00
content_length_size = String::from_utf8_lossy(v.as_bytes()).parse::<usize>().unwrap_or(0);
if content_length_size > MAX_REQUEST_SIZE {
2020-06-20 08:31:58 +00:00
return Err(actix_web::error::ErrorBadRequest("Request too big!"));
}
2021-01-26 17:24:47 +00:00
} else {
unreachable!();
2020-06-20 08:31:58 +00:00
}
2021-01-26 17:24:47 +00:00
} else {
return Err(actix_web::error::ErrorBadRequest("Content-Length header is required!"));
2020-06-20 08:31:58 +00:00
}
2021-01-26 17:24:47 +00:00
let payload = LimitedStream {
stream: payload,
already_read: 0,
max_size: content_length_size,
};
2020-06-20 08:31:58 +00:00
2020-05-23 07:37:21 +00:00
let mut body_args = HashMap::new();
// Process "application/x-www-form-urlencoded" requests
if req.content_type().eq("application/x-www-form-urlencoded") {
// Maximum size of the request
let limit = 16384;
// Ready body
let mut body = BytesMut::with_capacity(8192);
let mut stream = Decompress::from_headers(payload, req.headers());
while let Some(item) = stream.next().await {
let chunk = item?;
if body.len() + chunk.len() > limit {
return Err(actix_web::error::ErrorBadRequest("Overflow - too long"));
} else {
body.extend_from_slice(&chunk);
}
}
let body = body.freeze();
// Decode body as a string
let encoding = req.encoding()?;
let body_str = if encoding == UTF_8 {
String::from_utf8_lossy(body.as_ref()).to_string()
} else {
encoding
.decode_without_bom_handling_and_without_replacement(&body)
.map(|s| s.into_owned())
.ok_or_else(|| ErrorBadRequest("Can not decode body"))?
};
// Parse body arguments (following the pattern key1=value1&key2=value2)
if body_str.len() > 0 {
for v in body_str.split("&") {
if v.len() == 0 {
continue;
}
let args: Vec<&str> = v.split("=").collect();
if args.len() != 2 {
return Err(actix_web::error::ErrorBadRequest(format!("{} is invalid!", args[0])));
}
// Add the value to the body
body_args.insert(
2020-05-23 07:54:13 +00:00
percent_decode_str(args[0]).decode_utf8_lossy().to_string(),
2020-06-20 09:37:29 +00:00
RequestValue::String(percent_decode_str(args[1]).decode_utf8_lossy().to_string()),
2020-05-23 07:37:21 +00:00
);
}
}
}
2020-06-20 09:37:29 +00:00
2020-06-20 08:31:58 +00:00
// Process "multipart/form-data" request
else if req.content_type().starts_with("multipart/form-data") {
let mut req = actix_multipart::Multipart::new(req.headers(), payload);
2020-06-20 09:37:29 +00:00
// Process the list of arguments
2020-06-20 08:31:58 +00:00
while let Some(el) = req.next().await {
let mut field = el?;
2020-06-20 09:37:29 +00:00
let content_type = field.content_disposition().ok_or(
ErrorInternalServerError("Missing content type"))?;
2020-06-20 09:37:29 +00:00
let name = content_type.get_name().ok_or(
ErrorInternalServerError("Missing field name!"))?;
// Handle file upload
if content_type.get_filename().is_some() {
2020-06-20 09:37:29 +00:00
let filename = content_type.get_filename().unwrap();
let mut buf = BytesMut::new();
while let Some(chunk) = field.next().await {
let data = chunk.unwrap();
buf.put(data);
}
body_args.insert(name.to_string(),
2020-06-20 13:22:41 +00:00
RequestValue::File(PostFile {
name: filename.to_string(),
buff: buf.to_bytes(),
}));
2020-06-20 09:37:29 +00:00
}
// It is a simple field
else {
let mut content = String::new();
// Get content
while let Some(chunk) = field.next().await {
content = format!("{}{}", content, String::from_utf8_lossy(chunk?.bytes()));
}
body_args.insert(name.to_string(), RequestValue::String(content));
}
2020-06-20 08:31:58 +00:00
}
}
2020-05-23 07:37:21 +00:00
Ok(CustomRequest {
req: req.clone(),
body: body_args,
})
}.boxed_local()
}
}
/// Process a "simple request" aka not a WebSocket request
fn process_simple_route(route: &Route, req: &mut HttpRequestHandler) -> RequestResult {
2021-01-23 08:44:34 +00:00
if requests_limit_helper::trigger_before(req, route).is_err() {
req.too_many_requests("Too many request. Please try again later.")?;
}
2020-05-23 07:37:21 +00:00
// Validate client token
req.check_client_token()?;
2020-05-24 15:57:47 +00:00
// Check user token, if required
if route.need_login || req.has_post_parameter("userToken1") {
req.check_user_token()?;
}
2021-01-23 08:44:34 +00:00
let res: RequestResult = (route.func)(req);
requests_limit_helper::trigger_after(res.is_ok(), req, route)?;
res
2020-05-23 07:37:21 +00:00
}
2020-05-21 13:28:07 +00:00
/// Process an incoming request
2020-05-23 07:37:21 +00:00
async fn process_request(custom_req: CustomRequest) -> HttpResponse {
let req = &custom_req.req;
2020-05-21 13:28:07 +00:00
let routes = get_routes();
// We search the appropriate route for the request
let mut route: Option<&Route> = None;
for el in &routes {
// Check verb
if !(req.method() == http::Method::GET && el.method == GET) &&
!(req.method() == http::Method::POST && el.method == POST) {
continue;
}
// Check path
if !el.uri.eq(req.uri()) {
continue;
}
route = Some(el);
break;
}
// Check if a route was found
if let None = route {
return HttpResponse::NotFound().json(HttpError::not_found("Method not found!"));
}
let route = route.unwrap();
2021-01-22 18:39:14 +00:00
// Clean requests limit
requests_limit_helper::clean_cache().unwrap();
2020-05-21 13:28:07 +00:00
// Execute the request
2020-05-23 07:37:21 +00:00
let mut request = HttpRequestHandler::new(custom_req.req, custom_req.body);
2020-05-21 13:28:07 +00:00
2020-05-23 07:37:21 +00:00
match process_simple_route(route, &mut request) {
2020-05-21 13:28:07 +00:00
// Set default error response if required
Err(e) => {
2020-06-20 08:31:58 +00:00
let err_msg = e.to_string();
2020-05-23 07:54:13 +00:00
2020-05-21 13:28:07 +00:00
if !request.has_response() {
request.internal_error(e).unwrap_err();
}
2020-05-23 07:54:13 +00:00
2021-01-23 07:19:10 +00:00
println!("{} - {} - {} - {}",
request.remote_ip(),
2020-05-23 07:54:13 +00:00
request.response_status_code(),
request.request_path(),
err_msg
);
2020-05-21 13:28:07 +00:00
}
// Set default success response if required
Ok(_) => {
if !request.has_response() {
request.success("Success").unwrap()
}
2020-05-23 07:54:13 +00:00
2021-01-23 07:19:10 +00:00
println!("{} - {} - {}",
request.remote_ip(),
request.response_status_code(),
request.request_path()
);
2020-05-21 13:28:07 +00:00
}
}
2020-05-23 12:08:22 +00:00
// Send the response
match request.response() {
Ok(s) => s,
Err(e) => {
println!("Error while getting response: {}", e);
HttpResponse::InternalServerError().body("Response error")
}
}
2020-05-21 13:28:07 +00:00
}
/// Given the configuration, start the server
pub async fn start_server(conf: &Config) -> std::io::Result<()> {
2021-01-22 18:39:14 +00:00
// Initialize limit helper
requests_limit_helper::init();
2020-05-21 13:28:07 +00:00
let addr = conf.server_listen_address();
println!("Start to listen on http://{}/", addr);
HttpServer::new(|| {
App::new()
2021-02-05 12:21:10 +00:00
// User WebSocket route
.service(actix_web::web::resource("/ws").to(user_ws_controller::ws_route))
2021-02-08 16:20:03 +00:00
// RTC Relay WebSocket route
.service(actix_web::web::resource("/rtc_proxy/ws").to(rtc_relay_controller::open_ws))
2021-02-05 12:21:10 +00:00
// API routes
2020-05-21 13:28:07 +00:00
.route("**", web::get().to(process_request))
.route("**", web::post().to(process_request))
}).bind(&addr)?.run().await
}