From d4a81f5fdfa1737867f84fd69d0eec5e99679443 Mon Sep 17 00:00:00 2001 From: Pierre HUBERT Date: Sat, 29 Jun 2024 11:45:39 +0200 Subject: [PATCH] Create energy actor --- central_backend/Cargo.lock | 52 +++++++++++++++ central_backend/Cargo.toml | 3 +- central_backend/src/constants.rs | 7 ++ central_backend/src/energy/energy_actor.rs | 64 +++++++++++++++++++ central_backend/src/energy/mod.rs | 1 + central_backend/src/lib.rs | 1 + central_backend/src/main.rs | 10 ++- central_backend/src/server/custom_error.rs | 6 ++ .../src/server/energy_controller.rs | 14 +++- central_backend/src/server/mod.rs | 12 +++- 10 files changed, 163 insertions(+), 7 deletions(-) create mode 100644 central_backend/src/constants.rs create mode 100644 central_backend/src/energy/energy_actor.rs diff --git a/central_backend/Cargo.lock b/central_backend/Cargo.lock index 29293f0..88a741e 100644 --- a/central_backend/Cargo.lock +++ b/central_backend/Cargo.lock @@ -2,6 +2,31 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "actix" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de7fa236829ba0841304542f7614c42b80fca007455315c45c785ccfa873a85b" +dependencies = [ + "actix-macros", + "actix-rt", + "actix_derive", + "bitflags 2.6.0", + "bytes", + "crossbeam-channel", + "futures-core", + "futures-sink", + "futures-task", + "futures-util", + "log", + "once_cell", + "parking_lot", + "pin-project-lite", + "smallvec", + "tokio", + "tokio-util", +] + [[package]] name = "actix-codec" version = "0.5.2" @@ -205,6 +230,17 @@ dependencies = [ "syn", ] +[[package]] +name = "actix_derive" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "addr2line" version = "0.22.0" @@ -443,6 +479,7 @@ dependencies = [ name = "central_backend" version = "0.1.0" dependencies = [ + "actix", "actix-web", "anyhow", "asn1", @@ -565,6 +602,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" diff --git a/central_backend/Cargo.toml b/central_backend/Cargo.toml index 4e6d07b..1d17568 100644 --- a/central_backend/Cargo.toml +++ b/central_backend/Cargo.toml @@ -20,4 +20,5 @@ futures = "0.3.30" serde = { version = "1.0.203", features = ["derive"] } reqwest = "0.12.5" serde_json = "1.0.118" -rand = "0.8.5" \ No newline at end of file +rand = "0.8.5" +actix = "0.13.5" \ No newline at end of file diff --git a/central_backend/src/constants.rs b/central_backend/src/constants.rs new file mode 100644 index 0000000..a50e705 --- /dev/null +++ b/central_backend/src/constants.rs @@ -0,0 +1,7 @@ +use std::time::Duration; + +/// Energy refresh operations interval +pub const ENERGY_REFRESH_INTERVAL: Duration = Duration::from_secs(30); + +/// Fallback value to use if production cannot be fetched +pub const FALLBACK_PRODUCTION_VALUE: i32 = 5000; diff --git a/central_backend/src/energy/energy_actor.rs b/central_backend/src/energy/energy_actor.rs new file mode 100644 index 0000000..593ddf1 --- /dev/null +++ b/central_backend/src/energy/energy_actor.rs @@ -0,0 +1,64 @@ +use crate::constants; +use crate::energy::consumption; +use crate::energy::consumption::EnergyConsumption; +use actix::prelude::*; + +pub struct EnergyActor { + curr_consumption: EnergyConsumption, +} + +impl EnergyActor { + pub async fn new() -> anyhow::Result { + Ok(Self { + curr_consumption: consumption::get_curr_consumption().await?, + }) + } + + async fn refresh(&mut self) -> anyhow::Result<()> { + // Refresh energy + self.curr_consumption = consumption::get_curr_consumption() + .await + .unwrap_or_else(|e| { + log::error!( + "Failed to fetch latest consumption value, will use fallback value! {e}" + ); + constants::FALLBACK_PRODUCTION_VALUE + }); + + Ok(()) + } +} + +impl Actor for EnergyActor { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + log::info!("Energy actor successfully started!"); + + ctx.run_interval(constants::ENERGY_REFRESH_INTERVAL, |act, _ctx| { + log::info!("Performing energy refresh operation"); + if let Err(e) = futures::executor::block_on(act.refresh()) { + log::error!("Energy refresh failed! {e}") + } + }); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + log::info!("Energy actor successfully stopped!"); + } +} + +pub type EnergyActorAddr = Addr; + +/// Get current consumption +#[derive(Message)] +#[rtype(result = "EnergyConsumption")] +pub struct GetCurrConsumption; + +impl Handler for EnergyActor { + type Result = EnergyConsumption; + + fn handle(&mut self, _msg: GetCurrConsumption, _ctx: &mut Context) -> Self::Result { + self.curr_consumption + } +} diff --git a/central_backend/src/energy/mod.rs b/central_backend/src/energy/mod.rs index f1682c6..747d93a 100644 --- a/central_backend/src/energy/mod.rs +++ b/central_backend/src/energy/mod.rs @@ -1 +1,2 @@ pub mod consumption; +pub mod energy_actor; diff --git a/central_backend/src/lib.rs b/central_backend/src/lib.rs index dc205ba..c75a6e9 100644 --- a/central_backend/src/lib.rs +++ b/central_backend/src/lib.rs @@ -1,4 +1,5 @@ pub mod app_config; +pub mod constants; pub mod crypto; pub mod energy; pub mod server; diff --git a/central_backend/src/main.rs b/central_backend/src/main.rs index b78e3ea..aa10392 100644 --- a/central_backend/src/main.rs +++ b/central_backend/src/main.rs @@ -1,5 +1,7 @@ +use actix::Actor; use central_backend::app_config::AppConfig; use central_backend::crypto::pki; +use central_backend::energy::energy_actor::EnergyActor; use central_backend::server::{secure_server, unsecure_server}; use central_backend::utils::files_utils::create_directory_if_missing; use futures::future; @@ -22,7 +24,13 @@ async fn main() -> std::io::Result<()> { pki::refresh_crls().expect("Failed to initialize Root CA!"); - let s1 = secure_server(); + // Initialize energy actor + let actor = EnergyActor::new() + .await + .expect("Failed to initialize energy actor!") + .start(); + + let s1 = secure_server(actor); let s2 = unsecure_server(); future::try_join(s1, s2) .await diff --git a/central_backend/src/server/custom_error.rs b/central_backend/src/server/custom_error.rs index 1a97fec..d94ac14 100644 --- a/central_backend/src/server/custom_error.rs +++ b/central_backend/src/server/custom_error.rs @@ -85,6 +85,12 @@ impl From for HttpErr { } } +impl From for HttpErr { + fn from(value: actix::MailboxError) -> Self { + HttpErr::Err(std::io::Error::new(ErrorKind::Other, value.to_string()).into()) + } +} + impl From for HttpErr { fn from(value: HttpResponse) -> Self { HttpErr::HTTPResponse(value) diff --git a/central_backend/src/server/energy_controller.rs b/central_backend/src/server/energy_controller.rs index c7c4252..2ab729e 100644 --- a/central_backend/src/server/energy_controller.rs +++ b/central_backend/src/server/energy_controller.rs @@ -1,9 +1,10 @@ -use crate::energy::consumption; +use crate::energy::{consumption, energy_actor}; use crate::server::custom_error::HttpResult; +use crate::server::WebEnergyActor; use actix_web::HttpResponse; #[derive(serde::Serialize)] -struct CurrConsumption { +struct Consumption { consumption: i32, } @@ -11,5 +12,12 @@ struct CurrConsumption { pub async fn curr_consumption() -> HttpResult { let consumption = consumption::get_curr_consumption().await?; - Ok(HttpResponse::Ok().json(CurrConsumption { consumption })) + Ok(HttpResponse::Ok().json(Consumption { consumption })) +} + +/// Get cached energy consumption +pub async fn cached_consumption(energy_actor: WebEnergyActor) -> HttpResult { + let consumption = energy_actor.send(energy_actor::GetCurrConsumption).await?; + + Ok(HttpResponse::Ok().json(Consumption { consumption })) } diff --git a/central_backend/src/server/mod.rs b/central_backend/src/server/mod.rs index 8cfbadc..022699d 100644 --- a/central_backend/src/server/mod.rs +++ b/central_backend/src/server/mod.rs @@ -4,12 +4,15 @@ use openssl::ssl::{SslAcceptor, SslMethod}; use crate::app_config::AppConfig; use crate::crypto::pki; +use crate::energy::energy_actor::EnergyActorAddr; pub mod custom_error; pub mod energy_controller; pub mod pki_controller; pub mod server_controller; +pub type WebEnergyActor = web::Data; + /// Start unsecure (HTTP) server pub async fn unsecure_server() -> anyhow::Result<()> { log::info!( @@ -31,7 +34,7 @@ pub async fn unsecure_server() -> anyhow::Result<()> { } /// Start secure (HTTPS) server -pub async fn secure_server() -> anyhow::Result<()> { +pub async fn secure_server(energy_actor: EnergyActorAddr) -> anyhow::Result<()> { let web_ca = pki::CertData::load_web_ca()?; let server_cert = pki::CertData::load_server()?; @@ -45,14 +48,19 @@ pub async fn secure_server() -> anyhow::Result<()> { AppConfig::get().listen_address, AppConfig::get().secure_origin() ); - HttpServer::new(|| { + HttpServer::new(move || { App::new() + .app_data(web::Data::new(energy_actor.clone())) .wrap(Logger::default()) .route("/", web::get().to(server_controller::secure_home)) .route( "/api/energy/curr_consumption", web::get().to(energy_controller::curr_consumption), ) + .route( + "/api/energy/cached_consumption", + web::get().to(energy_controller::cached_consumption), + ) }) .bind_openssl(&AppConfig::get().listen_address, builder)? .run()