From ddce4062c77f1547bd4f72e5c72c0d6742c2836e Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sat, 23 Jan 2021 09:44:34 +0100 Subject: [PATCH] Make limit system lives --- src/controllers/routes.rs | 15 ++++ src/controllers/server.rs | 9 ++- src/data/http_request_handler.rs | 7 ++ src/helpers/requests_limit_helper.rs | 101 +++++++++++++++++++++++---- 4 files changed, 116 insertions(+), 16 deletions(-) diff --git a/src/controllers/routes.rs b/src/controllers/routes.rs index 1b2baaa..bf7e5c0 100644 --- a/src/controllers/routes.rs +++ b/src/controllers/routes.rs @@ -34,6 +34,21 @@ pub enum LimitPolicy { ANY(u64), } +impl LimitPolicy { + pub fn is_none(&self) -> bool { + matches!(self, LimitPolicy::NONE) + } + + pub fn get_count(&self) -> u64 { + match self { + LimitPolicy::NONE => 0, + LimitPolicy::SUCCESS(n) => n.clone(), + LimitPolicy::FAILURE(n) => n.clone(), + LimitPolicy::ANY(n) => n.clone(), + } + } +} + /// Define types pub type RequestResult = Result<(), Box>; pub type RequestProcess = Box RequestResult>; diff --git a/src/controllers/server.rs b/src/controllers/server.rs index 9739379..4e3a35b 100644 --- a/src/controllers/server.rs +++ b/src/controllers/server.rs @@ -196,6 +196,9 @@ impl FromRequest for CustomRequest { /// Process a "simple request" aka not a WebSocket request fn process_simple_route(route: &Route, req: &mut HttpRequestHandler) -> RequestResult { + if requests_limit_helper::trigger_before(req, route).is_err() { + req.too_many_requests("Too many request. Please try again later.")?; + } // Validate client token req.check_client_token()?; @@ -205,7 +208,11 @@ fn process_simple_route(route: &Route, req: &mut HttpRequestHandler) -> RequestR req.check_user_token()?; } - (route.func)(req) + let res: RequestResult = (route.func)(req); + + requests_limit_helper::trigger_after(res.is_ok(), req, route)?; + + res } /// Process an incoming request diff --git a/src/data/http_request_handler.rs b/src/data/http_request_handler.rs index 8dd120d..5468125 100644 --- a/src/data/http_request_handler.rs +++ b/src/data/http_request_handler.rs @@ -146,6 +146,13 @@ impl HttpRequestHandler { Err(Box::new(ExecError::new(&message))) } + /// Too many requests (429) + pub fn too_many_requests(&mut self, message: &str) -> RequestResult { + self.response = Some(HttpResponse::TooManyRequests().json( + HttpError::new(429, message))); + Err(Box::new(ExecError::new(message))) + } + /// If result is not OK, return a bad request pub fn ok_or_bad_request(&mut self, res: ResultBoxError, msg: &str) -> ResultBoxError { match res { diff --git a/src/helpers/requests_limit_helper.rs b/src/helpers/requests_limit_helper.rs index 1280724..eb1ff7d 100644 --- a/src/helpers/requests_limit_helper.rs +++ b/src/helpers/requests_limit_helper.rs @@ -2,26 +2,32 @@ //! //! Handle the limitation of requests, depending on threshold criterias + use std::sync::{Arc, Mutex}; use crate::constants::LIMIT_COUNTER_LIFETIME; -use crate::data::error::ResultBoxError; +use crate::controllers::routes::{LimitPolicy, Route}; +use crate::data::error::{ExecError, ResultBoxError}; +use crate::data::http_request_handler::HttpRequestHandler; use crate::utils::date_utils; +use crate::utils::date_utils::time; /// Information about a IP address limitation struct IpInfo { + ip: String, time_start: u64, + uri: String, count: u64, } /// Limits cache -type Cache = Arc>; +type Cache = Vec; static mut LIMITS_CACHE: Option>> = None; /// Initialize limit cache storage pub fn init() { - let limits_cache = Arc::new(dashmap::DashMap::new()); + let limits_cache = Vec::new(); let limits_cache = Some(Arc::new(Mutex::new(limits_cache))); unsafe { @@ -30,12 +36,11 @@ pub fn init() { } /// Get access to the cache. This resource must absolutely be released as quickly as possible -fn get_cache() -> ResultBoxError { - let cache: Cache; +fn get_cache() -> ResultBoxError>> { + let cache; unsafe { - let guard = LIMITS_CACHE.as_ref().unwrap().lock(); - cache = guard.unwrap().clone(); + cache = LIMITS_CACHE.as_ref().unwrap().clone(); } Ok(cache) @@ -45,16 +50,82 @@ fn get_cache() -> ResultBoxError { pub fn clean_cache() -> ResultBoxError { let time = date_utils::time(); - let cache = get_cache()?; - let obsolete_ips: Vec = cache - .iter() - .filter(|k| k.time_start + LIMIT_COUNTER_LIFETIME < time) - .map(|k| k.key().to_string()) - .collect(); + let mut i = 0; - for obsolete_ip in obsolete_ips { - cache.remove(&obsolete_ip); + let cache = get_cache()?; + let mut cache = cache.lock().unwrap(); + + while i < cache.len() { + if cache[i].time_start + LIMIT_COUNTER_LIFETIME < time { + cache.remove(i); + } else { + i = i + 1; + } } + Ok(()) +} + +/// Trigger limit helper at the beginning of requests +pub fn trigger_before(req: &HttpRequestHandler, route: &Route) -> ResultBoxError { + if route.limit_policy.is_none() { + return Ok(()); + } + + let max_count = route.limit_policy.get_count(); + let ip = req.remote_ip(); + + let cache = get_cache()?; + let found = cache.lock().unwrap() + .iter() + .find( + |k| k.uri.eq(route.uri) + && k.count >= max_count + && k.ip.eq(&ip)) + .is_some(); + + if found { + return Err(ExecError::boxed_new("Limit exceeded!")); + } + + Ok(()) +} + +/// Trigger limit at the end of the request +pub fn trigger_after(is_success: bool, req: &HttpRequestHandler, route: &Route) -> ResultBoxError { + let need_trigger = match (&route.limit_policy, is_success) + { + (LimitPolicy::NONE, _) => false, + (LimitPolicy::ANY(_), _) => true, + (LimitPolicy::SUCCESS(_), res) => res, + (LimitPolicy::FAILURE(_), res) => !res, + }; + + if !need_trigger { + return Ok(()); + } + + let ip = req.remote_ip(); + + let cache = get_cache()?; + let mut cache = cache.lock().unwrap(); + + // We search for existing entry + for i in 0..cache.len() { + if cache[i].ip.eq(&ip) && cache[i].uri.eq(route.uri) { + cache[i].count += 1; + + return Ok(()); + } + } + + // Otherwise we must add the entry to the table + cache.push(IpInfo { + ip, + time_start: time(), + uri: route.uri.to_string(), + count: 1, + }); + Ok(()) } \ No newline at end of file