Use cached consumption value
This commit is contained in:
		@@ -81,9 +81,13 @@ pub struct AppConfig {
 | 
			
		||||
    pub production_margin: i32,
 | 
			
		||||
 | 
			
		||||
    /// Energy refresh operations interval, in seconds
 | 
			
		||||
    #[arg(short('i'), long, env, default_value_t = 20)]
 | 
			
		||||
    #[arg(short('i'), long, env, default_value_t = 25)]
 | 
			
		||||
    pub refresh_interval: u64,
 | 
			
		||||
 | 
			
		||||
    /// Energy refresh operations interval, in seconds
 | 
			
		||||
    #[arg(short('f'), long, env, default_value_t = 5)]
 | 
			
		||||
    pub energy_fetch_interval: u64,
 | 
			
		||||
 | 
			
		||||
    /// Consumption backend provider
 | 
			
		||||
    #[clap(subcommand)]
 | 
			
		||||
    pub consumption_backend: Option<ConsumptionBackend>,
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										85
									
								
								central_backend/src/energy/consumption_cache.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										85
									
								
								central_backend/src/energy/consumption_cache.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,85 @@
 | 
			
		||||
use crate::constants;
 | 
			
		||||
use crate::energy::consumption::EnergyConsumption;
 | 
			
		||||
use log::log;
 | 
			
		||||
 | 
			
		||||
pub struct ConsumptionCache {
 | 
			
		||||
    nb_vals: usize,
 | 
			
		||||
    values: Vec<EnergyConsumption>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ConsumptionCache {
 | 
			
		||||
    pub fn new(nb_vals: usize) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            nb_vals,
 | 
			
		||||
            values: vec![],
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn add_value(&mut self, value: EnergyConsumption) {
 | 
			
		||||
        if self.values.len() >= self.nb_vals {
 | 
			
		||||
            self.values.remove(0);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.values.push(value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn median_value(&self) -> EnergyConsumption {
 | 
			
		||||
        if self.values.is_empty() {
 | 
			
		||||
            return constants::FALLBACK_PRODUCTION_VALUE;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let mut clone = self.values.clone();
 | 
			
		||||
        clone.sort();
 | 
			
		||||
        let median = clone[clone.len() / 2];
 | 
			
		||||
 | 
			
		||||
        log::info!("Cached consumption: {:?} / Median: {}", self.values, median);
 | 
			
		||||
 | 
			
		||||
        median
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
pub mod test {
 | 
			
		||||
    use crate::constants;
 | 
			
		||||
    use crate::energy::consumption_cache::ConsumptionCache;
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn empty_vec() {
 | 
			
		||||
        let cache = ConsumptionCache::new(10);
 | 
			
		||||
        assert_eq!(cache.median_value(), constants::FALLBACK_PRODUCTION_VALUE);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn single_value() {
 | 
			
		||||
        let mut cache = ConsumptionCache::new(10);
 | 
			
		||||
        cache.add_value(-10);
 | 
			
		||||
        assert_eq!(cache.median_value(), -10);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn four_values() {
 | 
			
		||||
        let mut cache = ConsumptionCache::new(10);
 | 
			
		||||
        cache.add_value(50);
 | 
			
		||||
        cache.add_value(-10);
 | 
			
		||||
        cache.add_value(-10);
 | 
			
		||||
        cache.add_value(-10000);
 | 
			
		||||
        assert_eq!(cache.median_value(), -10);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn many_values() {
 | 
			
		||||
        let mut cache = ConsumptionCache::new(6);
 | 
			
		||||
 | 
			
		||||
        for i in 0..1000 {
 | 
			
		||||
            cache.add_value(-i);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        cache.add_value(10);
 | 
			
		||||
        cache.add_value(50);
 | 
			
		||||
        cache.add_value(-10);
 | 
			
		||||
        cache.add_value(-10);
 | 
			
		||||
        cache.add_value(-30);
 | 
			
		||||
        cache.add_value(-10000);
 | 
			
		||||
        assert_eq!(cache.median_value(), -10);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -6,6 +6,7 @@ use crate::devices::device::{
 | 
			
		||||
use crate::devices::devices_list::DevicesList;
 | 
			
		||||
use crate::energy::consumption;
 | 
			
		||||
use crate::energy::consumption::EnergyConsumption;
 | 
			
		||||
use crate::energy::consumption_cache::ConsumptionCache;
 | 
			
		||||
use crate::energy::engine::EnergyEngine;
 | 
			
		||||
use crate::utils::time_utils::time_secs;
 | 
			
		||||
use actix::prelude::*;
 | 
			
		||||
@@ -13,34 +14,55 @@ use openssl::x509::X509Req;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
 | 
			
		||||
pub struct EnergyActor {
 | 
			
		||||
    curr_consumption: EnergyConsumption,
 | 
			
		||||
    consumption_cache: ConsumptionCache,
 | 
			
		||||
    devices: DevicesList,
 | 
			
		||||
    engine: EnergyEngine,
 | 
			
		||||
    last_engine_refresh: u64,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl EnergyActor {
 | 
			
		||||
    pub async fn new() -> anyhow::Result<Self> {
 | 
			
		||||
        let consumption_cache_size =
 | 
			
		||||
            AppConfig::get().refresh_interval / AppConfig::get().energy_fetch_interval;
 | 
			
		||||
        let curr_consumption = consumption::get_curr_consumption().await?;
 | 
			
		||||
        let mut consumption_cache = ConsumptionCache::new(consumption_cache_size as usize);
 | 
			
		||||
        consumption_cache.add_value(curr_consumption);
 | 
			
		||||
 | 
			
		||||
        if consumption_cache_size < 1 {
 | 
			
		||||
            panic!("Energy fetch interval must be equal or smaller than refresh interval!");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            curr_consumption: consumption::get_curr_consumption().await?,
 | 
			
		||||
            consumption_cache,
 | 
			
		||||
            devices: DevicesList::load()?,
 | 
			
		||||
            engine: EnergyEngine::default(),
 | 
			
		||||
            last_engine_refresh: 0,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn refresh(&mut self) -> anyhow::Result<()> {
 | 
			
		||||
        // Refresh energy
 | 
			
		||||
        self.curr_consumption = consumption::get_curr_consumption()
 | 
			
		||||
            .await
 | 
			
		||||
            .unwrap_or_else(|e| {
 | 
			
		||||
                log::error!(
 | 
			
		||||
        self.consumption_cache
 | 
			
		||||
            .add_value(
 | 
			
		||||
                consumption::get_curr_consumption()
 | 
			
		||||
                    .await
 | 
			
		||||
                    .unwrap_or_else(|e| {
 | 
			
		||||
                        log::error!(
 | 
			
		||||
                    "Failed to fetch latest consumption value, will use fallback value! {e}"
 | 
			
		||||
                );
 | 
			
		||||
                constants::FALLBACK_PRODUCTION_VALUE
 | 
			
		||||
            });
 | 
			
		||||
                        constants::FALLBACK_PRODUCTION_VALUE
 | 
			
		||||
                    }),
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
        if self.last_engine_refresh + AppConfig::get().refresh_interval > time_secs() {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
        self.last_engine_refresh = time_secs();
 | 
			
		||||
 | 
			
		||||
        let devices_list = self.devices.full_list();
 | 
			
		||||
 | 
			
		||||
        self.engine.refresh(self.curr_consumption, &devices_list);
 | 
			
		||||
        self.engine
 | 
			
		||||
            .refresh(self.consumption_cache.median_value(), &devices_list);
 | 
			
		||||
 | 
			
		||||
        self.engine.persist_relays_state(&devices_list)?;
 | 
			
		||||
 | 
			
		||||
@@ -55,7 +77,7 @@ impl Actor for EnergyActor {
 | 
			
		||||
        log::info!("Energy actor successfully started!");
 | 
			
		||||
 | 
			
		||||
        ctx.run_interval(
 | 
			
		||||
            Duration::from_secs(AppConfig::get().refresh_interval),
 | 
			
		||||
            Duration::from_secs(AppConfig::get().energy_fetch_interval),
 | 
			
		||||
            |act, _ctx| {
 | 
			
		||||
                log::info!("Performing energy refresh operation");
 | 
			
		||||
                if let Err(e) = futures::executor::block_on(act.refresh()) {
 | 
			
		||||
@@ -81,7 +103,7 @@ impl Handler<GetCurrConsumption> for EnergyActor {
 | 
			
		||||
    type Result = EnergyConsumption;
 | 
			
		||||
 | 
			
		||||
    fn handle(&mut self, _msg: GetCurrConsumption, _ctx: &mut Context<Self>) -> Self::Result {
 | 
			
		||||
        self.curr_consumption
 | 
			
		||||
        self.consumption_cache.median_value()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,5 @@
 | 
			
		||||
pub mod consumption;
 | 
			
		||||
pub mod consumption_cache;
 | 
			
		||||
pub mod energy_actor;
 | 
			
		||||
pub mod engine;
 | 
			
		||||
pub mod relay_state_history;
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user