436 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			436 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use crate::app_config::{AppConfig, ConsumptionHistoryType};
 | |
| use crate::constants;
 | |
| use crate::devices::device::{
 | |
|     Device, DeviceGeneralInfo, DeviceId, DeviceInfo, DeviceRelay, DeviceRelayID,
 | |
| };
 | |
| use crate::devices::devices_list::DevicesList;
 | |
| use crate::energy::consumption;
 | |
| use crate::energy::consumption::EnergyConsumption;
 | |
| use crate::energy::consumption_cache::ConsumptionCache;
 | |
| use crate::energy::consumption_history_file::ConsumptionHistoryFile;
 | |
| use crate::energy::engine::EnergyEngine;
 | |
| use crate::utils::time_utils::time_secs;
 | |
| use actix::prelude::*;
 | |
| use openssl::x509::X509Req;
 | |
| use std::time::Duration;
 | |
| 
 | |
| pub struct EnergyActor {
 | |
|     consumption_cache: ConsumptionCache,
 | |
|     devices: DevicesList,
 | |
|     engine: EnergyEngine,
 | |
|     last_engine_refresh: u64,
 | |
| }
 | |
| 
 | |
| impl EnergyActor {
 | |
|     pub async fn new() -> anyhow::Result<Self> {
 | |
|         let consumption_cache_size =
 | |
|             AppConfig::get().refresh_interval / AppConfig::get().energy_fetch_interval;
 | |
|         let curr_consumption = match consumption::get_curr_consumption().await {
 | |
|             Ok(v) => v,
 | |
|             Err(e) => {
 | |
|                 log::warn!("Failed to fetch consumption, using default value! {e}");
 | |
|                 constants::FALLBACK_PRODUCTION_VALUE
 | |
|             }
 | |
|         };
 | |
|         log::info!("Initial consumption value: {curr_consumption}");
 | |
|         let mut consumption_cache = ConsumptionCache::new(consumption_cache_size as usize);
 | |
|         consumption_cache.add_value(curr_consumption);
 | |
| 
 | |
|         if consumption_cache_size < 1 {
 | |
|             panic!("Energy fetch interval must be equal or smaller than refresh interval!");
 | |
|         }
 | |
| 
 | |
|         Ok(Self {
 | |
|             consumption_cache,
 | |
|             devices: DevicesList::load()?,
 | |
|             engine: EnergyEngine::default(),
 | |
|             last_engine_refresh: 0,
 | |
|         })
 | |
|     }
 | |
| 
 | |
|     async fn refresh(&mut self) -> anyhow::Result<()> {
 | |
|         // Refresh energy
 | |
|         let latest_consumption = consumption::get_curr_consumption()
 | |
|             .await
 | |
|             .unwrap_or_else(|e| {
 | |
|                 log::error!(
 | |
|                     "Failed to fetch latest consumption value, will use fallback value! {e}"
 | |
|                 );
 | |
|                 constants::FALLBACK_PRODUCTION_VALUE
 | |
|             });
 | |
|         self.consumption_cache.add_value(latest_consumption);
 | |
| 
 | |
|         let devices_list = self.devices.full_list_ref();
 | |
| 
 | |
|         let mut history =
 | |
|             ConsumptionHistoryFile::open(time_secs(), ConsumptionHistoryType::GridConsumption)?;
 | |
|         history.set_consumption(time_secs(), latest_consumption)?;
 | |
|         history.save()?;
 | |
| 
 | |
|         let mut relays_consumption =
 | |
|             ConsumptionHistoryFile::open(time_secs(), ConsumptionHistoryType::RelayConsumption)?;
 | |
|         relays_consumption.set_consumption(
 | |
|             time_secs(),
 | |
|             self.engine.sum_relays_consumption(&devices_list) as EnergyConsumption,
 | |
|         )?;
 | |
|         relays_consumption.save()?;
 | |
| 
 | |
|         if self.last_engine_refresh + AppConfig::get().refresh_interval > time_secs() {
 | |
|             return Ok(());
 | |
|         }
 | |
|         self.last_engine_refresh = time_secs();
 | |
| 
 | |
|         self.engine
 | |
|             .refresh(self.consumption_cache.median_value(), &devices_list);
 | |
| 
 | |
|         self.engine.persist_relays_state(&devices_list)?;
 | |
| 
 | |
|         Ok(())
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl Actor for EnergyActor {
 | |
|     type Context = Context<Self>;
 | |
| 
 | |
|     fn started(&mut self, ctx: &mut Self::Context) {
 | |
|         log::info!("Energy actor successfully started!");
 | |
| 
 | |
|         ctx.run_interval(
 | |
|             Duration::from_secs(AppConfig::get().energy_fetch_interval),
 | |
|             |act, _ctx| {
 | |
|                 log::info!("Performing energy refresh operation");
 | |
|                 if let Err(e) = futures::executor::block_on(act.refresh()) {
 | |
|                     log::error!("Energy refresh failed! {e}")
 | |
|                 }
 | |
|             },
 | |
|         );
 | |
|     }
 | |
| 
 | |
|     fn stopped(&mut self, _ctx: &mut Self::Context) {
 | |
|         log::info!("Energy actor successfully stopped!");
 | |
|     }
 | |
| }
 | |
| 
 | |
| pub type EnergyActorAddr = Addr<EnergyActor>;
 | |
| 
 | |
| /// Get current consumption
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "EnergyConsumption")]
 | |
| pub struct GetCurrConsumption;
 | |
| 
 | |
| impl Handler<GetCurrConsumption> for EnergyActor {
 | |
|     type Result = EnergyConsumption;
 | |
| 
 | |
|     fn handle(&mut self, _msg: GetCurrConsumption, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.consumption_cache.median_value()
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Get relays consumption
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "usize")]
 | |
| pub struct RelaysConsumption;
 | |
| 
 | |
| impl Handler<RelaysConsumption> for EnergyActor {
 | |
|     type Result = usize;
 | |
| 
 | |
|     fn handle(&mut self, _msg: RelaysConsumption, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.engine
 | |
|             .sum_relays_consumption(&self.devices.full_list_ref())
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Check if device exists
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "bool")]
 | |
| pub struct CheckDeviceExists(pub DeviceId);
 | |
| 
 | |
| impl Handler<CheckDeviceExists> for EnergyActor {
 | |
|     type Result = bool;
 | |
| 
 | |
|     fn handle(&mut self, msg: CheckDeviceExists, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.exists(&msg.0)
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Enroll device
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct EnrollDevice(pub DeviceId, pub DeviceInfo, pub X509Req);
 | |
| 
 | |
| impl Handler<EnrollDevice> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: EnrollDevice, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.enroll(&msg.0, &msg.1, &msg.2)
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Validate a device
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct ValidateDevice(pub DeviceId);
 | |
| 
 | |
| impl Handler<ValidateDevice> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: ValidateDevice, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         log::info!("Requested to validate device {:?}...", &msg.0);
 | |
|         self.devices.validate(&msg.0)?;
 | |
|         Ok(())
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Update a device general information
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct UpdateDeviceGeneralInfo(pub DeviceId, pub DeviceGeneralInfo);
 | |
| 
 | |
| impl Handler<UpdateDeviceGeneralInfo> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: UpdateDeviceGeneralInfo, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         log::info!(
 | |
|             "Requested to update device general info {:?}... {:#?}",
 | |
|             &msg.0,
 | |
|             &msg.1
 | |
|         );
 | |
| 
 | |
|         self.devices.update_general_info(&msg.0, msg.1)?;
 | |
| 
 | |
|         Ok(())
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Set device desired version
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct SetDesiredVersion(pub DeviceId, pub Option<semver::Version>);
 | |
| 
 | |
| impl Handler<SetDesiredVersion> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: SetDesiredVersion, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         log::info!(
 | |
|             "Requested to update device desired version {:?} => {:#?}",
 | |
|             &msg.0,
 | |
|             &msg.1
 | |
|         );
 | |
| 
 | |
|         self.devices.set_desired_version(&msg.0, msg.1)?;
 | |
| 
 | |
|         Ok(())
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Delete a device
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct DeleteDevice(pub DeviceId);
 | |
| 
 | |
| impl Handler<DeleteDevice> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: DeleteDevice, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         log::info!("Requested to delete device {:?}...", &msg.0);
 | |
| 
 | |
|         let Some(device) = self.devices.get_single(&msg.0) else {
 | |
|             log::warn!("Requested to delete non-existent device!");
 | |
|             return Ok(());
 | |
|         };
 | |
| 
 | |
|         // Delete device relays
 | |
|         for relay in device.relays {
 | |
|             self.devices.relay_delete(relay.id)?;
 | |
|         }
 | |
| 
 | |
|         self.devices.delete(&msg.0)?;
 | |
| 
 | |
|         Ok(())
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Get the list of devices
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "Vec<Device>")]
 | |
| pub struct GetDeviceLists;
 | |
| 
 | |
| impl Handler<GetDeviceLists> for EnergyActor {
 | |
|     type Result = Vec<Device>;
 | |
| 
 | |
|     fn handle(&mut self, _msg: GetDeviceLists, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.full_list()
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Get the information about a single device
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "Option<Device>")]
 | |
| pub struct GetSingleDevice(pub DeviceId);
 | |
| 
 | |
| impl Handler<GetSingleDevice> for EnergyActor {
 | |
|     type Result = Option<Device>;
 | |
| 
 | |
|     fn handle(&mut self, msg: GetSingleDevice, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.get_single(&msg.0)
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Get the full list of relays
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "Vec<DeviceRelay>")]
 | |
| pub struct GetRelaysList;
 | |
| 
 | |
| impl Handler<GetRelaysList> for EnergyActor {
 | |
|     type Result = Vec<DeviceRelay>;
 | |
| 
 | |
|     fn handle(&mut self, _msg: GetRelaysList, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.relays_list()
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Create a new device relay
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct CreateDeviceRelay(pub DeviceId, pub DeviceRelay);
 | |
| 
 | |
| impl Handler<CreateDeviceRelay> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: CreateDeviceRelay, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.relay_create(&msg.0, msg.1)
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Get the information about a single relay
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "Option<DeviceRelay>")]
 | |
| pub struct GetSingleRelay(pub DeviceRelayID);
 | |
| 
 | |
| impl Handler<GetSingleRelay> for EnergyActor {
 | |
|     type Result = Option<DeviceRelay>;
 | |
| 
 | |
|     fn handle(&mut self, msg: GetSingleRelay, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.relay_get_single(msg.0)
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Update a device relay
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct UpdateDeviceRelay(pub DeviceRelay);
 | |
| 
 | |
| impl Handler<UpdateDeviceRelay> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: UpdateDeviceRelay, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.relay_update(msg.0)
 | |
|     }
 | |
| }
 | |
| 
 | |
| /// Delete a device relay
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<()>")]
 | |
| pub struct DeleteDeviceRelay(pub DeviceRelayID);
 | |
| 
 | |
| impl Handler<DeleteDeviceRelay> for EnergyActor {
 | |
|     type Result = anyhow::Result<()>;
 | |
| 
 | |
|     fn handle(&mut self, msg: DeleteDeviceRelay, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.relay_delete(msg.0)
 | |
|     }
 | |
| }
 | |
| 
 | |
| #[derive(serde::Serialize)]
 | |
| pub struct RelaySyncStatus {
 | |
|     enabled: bool,
 | |
| }
 | |
| 
 | |
| /// Synchronize a device
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "anyhow::Result<Vec<RelaySyncStatus>>")]
 | |
| pub struct SynchronizeDevice(pub DeviceId, pub DeviceInfo);
 | |
| 
 | |
| impl Handler<SynchronizeDevice> for EnergyActor {
 | |
|     type Result = anyhow::Result<Vec<RelaySyncStatus>>;
 | |
| 
 | |
|     fn handle(&mut self, msg: SynchronizeDevice, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices.synchronise_dev_info(&msg.0, msg.1.clone())?;
 | |
|         self.engine.device_state(&msg.0).record_ping();
 | |
| 
 | |
|         let Some(device) = self.devices.get_single(&msg.0) else {
 | |
|             return Ok(vec![]);
 | |
|         };
 | |
| 
 | |
|         let mut v = vec![];
 | |
|         for d in &device.relays {
 | |
|             v.push(RelaySyncStatus {
 | |
|                 enabled: self.engine.relay_state(d.id).is_on(),
 | |
|             });
 | |
|         }
 | |
|         Ok(v)
 | |
|     }
 | |
| }
 | |
| 
 | |
| #[derive(serde::Serialize)]
 | |
| pub struct ResDevState {
 | |
|     pub id: DeviceId,
 | |
|     last_ping: u64,
 | |
|     online: bool,
 | |
| }
 | |
| 
 | |
| /// Get the state of devices
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "Vec<ResDevState>")]
 | |
| pub struct GetDevicesState;
 | |
| 
 | |
| impl Handler<GetDevicesState> for EnergyActor {
 | |
|     type Result = Vec<ResDevState>;
 | |
| 
 | |
|     fn handle(&mut self, _msg: GetDevicesState, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         self.devices
 | |
|             .full_list()
 | |
|             .into_iter()
 | |
|             .map(|d| {
 | |
|                 let s = self.engine.device_state(&d.id);
 | |
|                 ResDevState {
 | |
|                     id: d.id,
 | |
|                     last_ping: time_secs() - s.last_ping,
 | |
|                     online: s.is_online(),
 | |
|                 }
 | |
|             })
 | |
|             .collect()
 | |
|     }
 | |
| }
 | |
| 
 | |
| #[derive(serde::Serialize)]
 | |
| pub struct ResRelayState {
 | |
|     pub id: DeviceRelayID,
 | |
|     pub on: bool,
 | |
|     pub r#for: usize,
 | |
| }
 | |
| 
 | |
| /// Get the state of all relays
 | |
| #[derive(Message)]
 | |
| #[rtype(result = "Vec<ResRelayState>")]
 | |
| pub struct GetAllRelaysState;
 | |
| 
 | |
| impl Handler<GetAllRelaysState> for EnergyActor {
 | |
|     type Result = Vec<ResRelayState>;
 | |
| 
 | |
|     fn handle(&mut self, _msg: GetAllRelaysState, _ctx: &mut Context<Self>) -> Self::Result {
 | |
|         let mut list = vec![];
 | |
| 
 | |
|         for d in &self.devices.relays_list() {
 | |
|             let state = self.engine.relay_state(d.id);
 | |
|             list.push(ResRelayState {
 | |
|                 id: d.id,
 | |
|                 on: state.is_on(),
 | |
|                 r#for: state.state_for(),
 | |
|             })
 | |
|         }
 | |
| 
 | |
|         list
 | |
|     }
 | |
| }
 |