use crate::app_config::{AppConfig, ConsumptionHistoryType}; use crate::constants; use crate::devices::device::{ Device, DeviceGeneralInfo, DeviceId, DeviceInfo, DeviceRelay, DeviceRelayID, }; use crate::devices::devices_list::DevicesList; use crate::energy::consumption; use crate::energy::consumption::EnergyConsumption; use crate::energy::consumption_cache::ConsumptionCache; use crate::energy::consumption_history_file::ConsumptionHistoryFile; use crate::energy::engine::EnergyEngine; use crate::utils::time_utils::time_secs; use actix::prelude::*; use openssl::x509::X509Req; use std::time::Duration; pub struct EnergyActor { consumption_cache: ConsumptionCache, devices: DevicesList, engine: EnergyEngine, last_engine_refresh: u64, } impl EnergyActor { pub async fn new() -> anyhow::Result { let consumption_cache_size = AppConfig::get().refresh_interval / AppConfig::get().energy_fetch_interval; let curr_consumption = consumption::get_curr_consumption().await?; let mut consumption_cache = ConsumptionCache::new(consumption_cache_size as usize); consumption_cache.add_value(curr_consumption); if consumption_cache_size < 1 { panic!("Energy fetch interval must be equal or smaller than refresh interval!"); } Ok(Self { consumption_cache, devices: DevicesList::load()?, engine: EnergyEngine::default(), last_engine_refresh: 0, }) } async fn refresh(&mut self) -> anyhow::Result<()> { // Refresh energy let latest_consumption = consumption::get_curr_consumption() .await .unwrap_or_else(|e| { log::error!( "Failed to fetch latest consumption value, will use fallback value! {e}" ); constants::FALLBACK_PRODUCTION_VALUE }); self.consumption_cache.add_value(latest_consumption); let devices_list = self.devices.full_list_ref(); let mut history = ConsumptionHistoryFile::open(time_secs(), ConsumptionHistoryType::GridConsumption)?; history.set_consumption(time_secs(), latest_consumption)?; history.save()?; let mut relays_consumption = ConsumptionHistoryFile::open(time_secs(), ConsumptionHistoryType::RelayConsumption)?; relays_consumption.set_consumption( time_secs(), self.engine.sum_relays_consumption(&devices_list) as EnergyConsumption, )?; relays_consumption.save()?; if self.last_engine_refresh + AppConfig::get().refresh_interval > time_secs() { return Ok(()); } self.last_engine_refresh = time_secs(); self.engine .refresh(self.consumption_cache.median_value(), &devices_list); self.engine.persist_relays_state(&devices_list)?; Ok(()) } } impl Actor for EnergyActor { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { log::info!("Energy actor successfully started!"); ctx.run_interval( Duration::from_secs(AppConfig::get().energy_fetch_interval), |act, _ctx| { log::info!("Performing energy refresh operation"); if let Err(e) = futures::executor::block_on(act.refresh()) { log::error!("Energy refresh failed! {e}") } }, ); } fn stopped(&mut self, _ctx: &mut Self::Context) { log::info!("Energy actor successfully stopped!"); } } pub type EnergyActorAddr = Addr; /// Get current consumption #[derive(Message)] #[rtype(result = "EnergyConsumption")] pub struct GetCurrConsumption; impl Handler for EnergyActor { type Result = EnergyConsumption; fn handle(&mut self, _msg: GetCurrConsumption, _ctx: &mut Context) -> Self::Result { self.consumption_cache.median_value() } } /// Get relays consumption #[derive(Message)] #[rtype(result = "usize")] pub struct RelaysConsumption; impl Handler for EnergyActor { type Result = usize; fn handle(&mut self, _msg: RelaysConsumption, _ctx: &mut Context) -> Self::Result { self.engine .sum_relays_consumption(&self.devices.full_list_ref()) } } /// Check if device exists #[derive(Message)] #[rtype(result = "bool")] pub struct CheckDeviceExists(pub DeviceId); impl Handler for EnergyActor { type Result = bool; fn handle(&mut self, msg: CheckDeviceExists, _ctx: &mut Context) -> Self::Result { self.devices.exists(&msg.0) } } /// Enroll device #[derive(Message)] #[rtype(result = "anyhow::Result<()>")] pub struct EnrollDevice(pub DeviceId, pub DeviceInfo, pub X509Req); impl Handler for EnergyActor { type Result = anyhow::Result<()>; fn handle(&mut self, msg: EnrollDevice, _ctx: &mut Context) -> Self::Result { self.devices.enroll(&msg.0, &msg.1, &msg.2) } } /// Validate a device #[derive(Message)] #[rtype(result = "anyhow::Result<()>")] pub struct ValidateDevice(pub DeviceId); impl Handler for EnergyActor { type Result = anyhow::Result<()>; fn handle(&mut self, msg: ValidateDevice, _ctx: &mut Context) -> Self::Result { log::info!("Requested to validate device {:?}...", &msg.0); self.devices.validate(&msg.0)?; Ok(()) } } /// Update a device general information #[derive(Message)] #[rtype(result = "anyhow::Result<()>")] pub struct UpdateDeviceGeneralInfo(pub DeviceId, pub DeviceGeneralInfo); impl Handler for EnergyActor { type Result = anyhow::Result<()>; fn handle(&mut self, msg: UpdateDeviceGeneralInfo, _ctx: &mut Context) -> Self::Result { log::info!( "Requested to update device general info {:?}... {:#?}", &msg.0, &msg.1 ); self.devices.update_general_info(&msg.0, msg.1)?; Ok(()) } } /// Delete a device #[derive(Message)] #[rtype(result = "anyhow::Result<()>")] pub struct DeleteDevice(pub DeviceId); impl Handler for EnergyActor { type Result = anyhow::Result<()>; fn handle(&mut self, msg: DeleteDevice, _ctx: &mut Context) -> Self::Result { log::info!("Requested to delete device {:?}...", &msg.0); let Some(device) = self.devices.get_single(&msg.0) else { log::warn!("Requested to delete non-existent device!"); return Ok(()); }; // Delete device relays for relay in device.relays { self.devices.relay_delete(relay.id)?; } self.devices.delete(&msg.0)?; Ok(()) } } /// Get the list of devices #[derive(Message)] #[rtype(result = "Vec")] pub struct GetDeviceLists; impl Handler for EnergyActor { type Result = Vec; fn handle(&mut self, _msg: GetDeviceLists, _ctx: &mut Context) -> Self::Result { self.devices.full_list() } } /// Get the information about a single device #[derive(Message)] #[rtype(result = "Option")] pub struct GetSingleDevice(pub DeviceId); impl Handler for EnergyActor { type Result = Option; fn handle(&mut self, msg: GetSingleDevice, _ctx: &mut Context) -> Self::Result { self.devices.get_single(&msg.0) } } /// Get the full list of relays #[derive(Message)] #[rtype(result = "Vec")] pub struct GetRelaysList; impl Handler for EnergyActor { type Result = Vec; fn handle(&mut self, _msg: GetRelaysList, _ctx: &mut Context) -> Self::Result { self.devices.relays_list() } } /// Create a new device relay #[derive(Message)] #[rtype(result = "anyhow::Result<()>")] pub struct CreateDeviceRelay(pub DeviceId, pub DeviceRelay); impl Handler for EnergyActor { type Result = anyhow::Result<()>; fn handle(&mut self, msg: CreateDeviceRelay, _ctx: &mut Context) -> Self::Result { self.devices.relay_create(&msg.0, msg.1) } } /// Get the information about a single relay #[derive(Message)] #[rtype(result = "Option")] pub struct GetSingleRelay(pub DeviceRelayID); impl Handler for EnergyActor { type Result = Option; fn handle(&mut self, msg: GetSingleRelay, _ctx: &mut Context) -> Self::Result { self.devices.relay_get_single(msg.0) } } /// Update a device relay #[derive(Message)] #[rtype(result = "anyhow::Result<()>")] pub struct UpdateDeviceRelay(pub DeviceRelay); impl Handler for EnergyActor { type Result = anyhow::Result<()>; fn handle(&mut self, msg: UpdateDeviceRelay, _ctx: &mut Context) -> Self::Result { self.devices.relay_update(msg.0) } } /// Delete a device relay #[derive(Message)] #[rtype(result = "anyhow::Result<()>")] pub struct DeleteDeviceRelay(pub DeviceRelayID); impl Handler for EnergyActor { type Result = anyhow::Result<()>; fn handle(&mut self, msg: DeleteDeviceRelay, _ctx: &mut Context) -> Self::Result { self.devices.relay_delete(msg.0) } } #[derive(serde::Serialize)] pub struct RelaySyncStatus { enabled: bool, } /// Synchronize a device #[derive(Message)] #[rtype(result = "anyhow::Result>")] pub struct SynchronizeDevice(pub DeviceId, pub DeviceInfo); impl Handler for EnergyActor { type Result = anyhow::Result>; fn handle(&mut self, msg: SynchronizeDevice, _ctx: &mut Context) -> Self::Result { self.devices.synchronise_dev_info(&msg.0, msg.1.clone())?; self.engine.device_state(&msg.0).record_ping(); let Some(device) = self.devices.get_single(&msg.0) else { return Ok(vec![]); }; let mut v = vec![]; for d in &device.relays { v.push(RelaySyncStatus { enabled: self.engine.relay_state(d.id).is_on(), }); } Ok(v) } } #[derive(serde::Serialize)] pub struct ResDevState { pub id: DeviceId, last_ping: u64, online: bool, } /// Get the state of devices #[derive(Message)] #[rtype(result = "Vec")] pub struct GetDevicesState; impl Handler for EnergyActor { type Result = Vec; fn handle(&mut self, _msg: GetDevicesState, _ctx: &mut Context) -> Self::Result { self.devices .full_list() .into_iter() .map(|d| { let s = self.engine.device_state(&d.id); ResDevState { id: d.id, last_ping: time_secs() - s.last_ping, online: s.is_online(), } }) .collect() } } #[derive(serde::Serialize)] pub struct ResRelayState { pub id: DeviceRelayID, on: bool, r#for: usize, } /// Get the state of all relays #[derive(Message)] #[rtype(result = "Vec")] pub struct GetAllRelaysState; impl Handler for EnergyActor { type Result = Vec; fn handle(&mut self, _msg: GetAllRelaysState, _ctx: &mut Context) -> Self::Result { let mut list = vec![]; for d in &self.devices.relays_list() { let state = self.engine.relay_state(d.id); list.push(ResRelayState { id: d.id, on: state.is_on(), r#for: state.state_for(), }) } list } }