diff --git a/central_backend/src/app_config.rs b/central_backend/src/app_config.rs index 8f159c8..73fced1 100644 --- a/central_backend/src/app_config.rs +++ b/central_backend/src/app_config.rs @@ -81,9 +81,13 @@ pub struct AppConfig { pub production_margin: i32, /// Energy refresh operations interval, in seconds - #[arg(short('i'), long, env, default_value_t = 20)] + #[arg(short('i'), long, env, default_value_t = 25)] pub refresh_interval: u64, + /// Energy refresh operations interval, in seconds + #[arg(short('f'), long, env, default_value_t = 5)] + pub energy_fetch_interval: u64, + /// Consumption backend provider #[clap(subcommand)] pub consumption_backend: Option, diff --git a/central_backend/src/energy/consumption_cache.rs b/central_backend/src/energy/consumption_cache.rs new file mode 100644 index 0000000..fb4aa8e --- /dev/null +++ b/central_backend/src/energy/consumption_cache.rs @@ -0,0 +1,85 @@ +use crate::constants; +use crate::energy::consumption::EnergyConsumption; +use log::log; + +pub struct ConsumptionCache { + nb_vals: usize, + values: Vec, +} + +impl ConsumptionCache { + pub fn new(nb_vals: usize) -> Self { + Self { + nb_vals, + values: vec![], + } + } + + pub fn add_value(&mut self, value: EnergyConsumption) { + if self.values.len() >= self.nb_vals { + self.values.remove(0); + } + + self.values.push(value); + } + + pub fn median_value(&self) -> EnergyConsumption { + if self.values.is_empty() { + return constants::FALLBACK_PRODUCTION_VALUE; + } + + let mut clone = self.values.clone(); + clone.sort(); + let median = clone[clone.len() / 2]; + + log::info!("Cached consumption: {:?} / Median: {}", self.values, median); + + median + } +} + +#[cfg(test)] +pub mod test { + use crate::constants; + use crate::energy::consumption_cache::ConsumptionCache; + + #[test] + fn empty_vec() { + let cache = ConsumptionCache::new(10); + assert_eq!(cache.median_value(), constants::FALLBACK_PRODUCTION_VALUE); + } + + #[test] + fn single_value() { + let mut cache = ConsumptionCache::new(10); + cache.add_value(-10); + assert_eq!(cache.median_value(), -10); + } + + #[test] + fn four_values() { + let mut cache = ConsumptionCache::new(10); + cache.add_value(50); + cache.add_value(-10); + cache.add_value(-10); + cache.add_value(-10000); + assert_eq!(cache.median_value(), -10); + } + + #[test] + fn many_values() { + let mut cache = ConsumptionCache::new(6); + + for i in 0..1000 { + cache.add_value(-i); + } + + cache.add_value(10); + cache.add_value(50); + cache.add_value(-10); + cache.add_value(-10); + cache.add_value(-30); + cache.add_value(-10000); + assert_eq!(cache.median_value(), -10); + } +} diff --git a/central_backend/src/energy/energy_actor.rs b/central_backend/src/energy/energy_actor.rs index 766eca7..57f9594 100644 --- a/central_backend/src/energy/energy_actor.rs +++ b/central_backend/src/energy/energy_actor.rs @@ -6,6 +6,7 @@ use crate::devices::device::{ use crate::devices::devices_list::DevicesList; use crate::energy::consumption; use crate::energy::consumption::EnergyConsumption; +use crate::energy::consumption_cache::ConsumptionCache; use crate::energy::engine::EnergyEngine; use crate::utils::time_utils::time_secs; use actix::prelude::*; @@ -13,34 +14,55 @@ use openssl::x509::X509Req; use std::time::Duration; pub struct EnergyActor { - curr_consumption: EnergyConsumption, + consumption_cache: ConsumptionCache, devices: DevicesList, engine: EnergyEngine, + last_engine_refresh: u64, } impl EnergyActor { pub async fn new() -> anyhow::Result { + let consumption_cache_size = + AppConfig::get().refresh_interval / AppConfig::get().energy_fetch_interval; + let curr_consumption = consumption::get_curr_consumption().await?; + let mut consumption_cache = ConsumptionCache::new(consumption_cache_size as usize); + consumption_cache.add_value(curr_consumption); + + if consumption_cache_size < 1 { + panic!("Energy fetch interval must be equal or smaller than refresh interval!"); + } + Ok(Self { - curr_consumption: consumption::get_curr_consumption().await?, + consumption_cache, devices: DevicesList::load()?, engine: EnergyEngine::default(), + last_engine_refresh: 0, }) } async fn refresh(&mut self) -> anyhow::Result<()> { // Refresh energy - self.curr_consumption = consumption::get_curr_consumption() - .await - .unwrap_or_else(|e| { - log::error!( + self.consumption_cache + .add_value( + consumption::get_curr_consumption() + .await + .unwrap_or_else(|e| { + log::error!( "Failed to fetch latest consumption value, will use fallback value! {e}" ); - constants::FALLBACK_PRODUCTION_VALUE - }); + constants::FALLBACK_PRODUCTION_VALUE + }), + ); + + if self.last_engine_refresh + AppConfig::get().refresh_interval > time_secs() { + return Ok(()); + } + self.last_engine_refresh = time_secs(); let devices_list = self.devices.full_list(); - self.engine.refresh(self.curr_consumption, &devices_list); + self.engine + .refresh(self.consumption_cache.median_value(), &devices_list); self.engine.persist_relays_state(&devices_list)?; @@ -55,7 +77,7 @@ impl Actor for EnergyActor { log::info!("Energy actor successfully started!"); ctx.run_interval( - Duration::from_secs(AppConfig::get().refresh_interval), + Duration::from_secs(AppConfig::get().energy_fetch_interval), |act, _ctx| { log::info!("Performing energy refresh operation"); if let Err(e) = futures::executor::block_on(act.refresh()) { @@ -81,7 +103,7 @@ impl Handler for EnergyActor { type Result = EnergyConsumption; fn handle(&mut self, _msg: GetCurrConsumption, _ctx: &mut Context) -> Self::Result { - self.curr_consumption + self.consumption_cache.median_value() } } diff --git a/central_backend/src/energy/mod.rs b/central_backend/src/energy/mod.rs index 2922113..5ef5270 100644 --- a/central_backend/src/energy/mod.rs +++ b/central_backend/src/energy/mod.rs @@ -1,4 +1,5 @@ pub mod consumption; +pub mod consumption_cache; pub mod energy_actor; pub mod engine; pub mod relay_state_history;