Implement base operator (#1)
Some checks reported errors
continuous-integration/drone/push Build was killed

Add base operator logic

Reviewed-on: #1
This commit is contained in:
2023-05-08 16:20:15 +00:00
parent 87899f57e4
commit e2598d7509
26 changed files with 3955 additions and 89 deletions

11
src/constants.rs Normal file
View File

@ -0,0 +1,11 @@
//! # Application constants
pub const SECRET_MINIO_INSTANCE_ACCESS_KEY: &str = "accessKey";
pub const SECRET_MINIO_INSTANCE_SECRET_KEY: &str = "secretKey";
pub const SECRET_MINIO_BUCKET_ACCESS_KEY: &str = "accessKey";
pub const SECRET_MINIO_BUCKET_SECRET_KEY: &str = "secretKey";
pub const SECRET_MINIO_BUCKET_ACCESS_LEN: usize = 20;
pub const SECRET_MINIO_BUCKET_SECRET_LEN: usize = 35;
pub const MC_EXE: &str = "mc";

51
src/crd.rs Normal file
View File

@ -0,0 +1,51 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)]
#[kube(
group = "communiquons.org",
version = "v1",
kind = "MinioInstance",
namespaced
)]
pub struct MinioInstanceSpec {
pub endpoint: String,
pub credentials: String,
}
#[derive(Debug, Serialize, Deserialize, Default, Copy, Clone, JsonSchema, PartialEq, Eq)]
pub enum RetentionType {
#[default]
#[serde(rename_all = "lowercase")]
Compliance,
#[serde(rename_all = "lowercase")]
Governance,
}
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, JsonSchema, PartialEq, Eq)]
pub struct BucketRetention {
pub validity: usize,
pub r#type: RetentionType,
}
#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)]
#[kube(
group = "communiquons.org",
version = "v1",
kind = "MinioBucket",
namespaced
)]
pub struct MinioBucketSpec {
pub instance: String,
pub name: String,
pub secret: String,
#[serde(default)]
pub anonymous_read_access: bool,
#[serde(default)]
pub versioning: bool,
pub quota: Option<usize>,
#[serde(default)]
pub lock: bool,
pub retention: Option<BucketRetention>,
}

7
src/lib.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod constants;
pub mod crd;
pub mod minio;
#[cfg(test)]
pub mod minio_test_server;
pub mod secrets;
pub mod utils;

109
src/main.rs Normal file
View File

@ -0,0 +1,109 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Secret;
use kube::runtime::{watcher, WatchStreamExt};
use kube::{Api, Client};
use minio_operator::constants::{
SECRET_MINIO_BUCKET_ACCESS_KEY, SECRET_MINIO_BUCKET_SECRET_KEY,
SECRET_MINIO_INSTANCE_ACCESS_KEY, SECRET_MINIO_INSTANCE_SECRET_KEY,
};
use minio_operator::crd::{MinioBucket, MinioInstance};
use minio_operator::minio::{MinioService, MinioUser};
use minio_operator::secrets::{create_secret, read_secret_str};
use std::collections::BTreeMap;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let client = Client::try_default().await?;
let buckets: Api<MinioBucket> = Api::default_namespaced(client.clone());
// Listen for events / buckets creation or update (deletion is not supported)
let wc = watcher::Config::default();
let bw = watcher(buckets, wc).applied_objects();
futures::pin_mut!(bw);
while let Some(b) = bw.try_next().await? {
if let Err(e) = apply_bucket(&b, &client).await {
log::error!(
"Failed to apply desired configuration for applied bucket {} : {}",
b.spec.name,
e
)
}
}
Ok(())
}
/// Make sure a bucket is compliant with a desired configuration
async fn apply_bucket(b: &MinioBucket, client: &Client) -> anyhow::Result<()> {
log::info!("Apply configuration for bucket {}", b.spec.name);
// Get instance information
let instances: Api<MinioInstance> = Api::default_namespaced(client.clone());
let instance = instances.get(&b.spec.instance).await?;
// Get instance configuration
let secrets: Api<Secret> = Api::default_namespaced(client.clone());
let instance_secret = secrets.get(&instance.spec.credentials).await?;
let service = MinioService {
hostname: instance.spec.endpoint,
access_key: read_secret_str(&instance_secret, SECRET_MINIO_INSTANCE_ACCESS_KEY)?,
secret_key: read_secret_str(&instance_secret, SECRET_MINIO_INSTANCE_SECRET_KEY)?,
};
// Get user key & password
let user_secret = match secrets.get_opt(&b.spec.secret).await? {
Some(s) => s,
None => {
log::info!(
"Needs to create the secret {} for the bucket {}",
b.spec.secret,
b.spec.name
);
// The secret needs to be created
let new_user = MinioUser::gen_random();
create_secret(
&secrets,
&b.spec.secret,
BTreeMap::from([
(
SECRET_MINIO_BUCKET_ACCESS_KEY.to_string(),
new_user.username,
),
(
SECRET_MINIO_BUCKET_SECRET_KEY.to_string(),
new_user.password,
),
]),
)
.await?
}
};
let user = MinioUser {
username: read_secret_str(&user_secret, SECRET_MINIO_BUCKET_ACCESS_KEY)?,
password: read_secret_str(&user_secret, SECRET_MINIO_BUCKET_SECRET_KEY)?,
};
log::debug!("Create or update bucket...");
service.bucket_apply(&b.spec).await?;
let policy_name = format!("bucket-{}", b.spec.name);
log::debug!("Create or update policy '{policy_name}'...");
let policy_content =
include_str!("policy_template.json").replace("{{ bucket }}", b.spec.name.as_str());
service.policy_apply(&policy_name, &policy_content).await?;
log::debug!("Create or update user '{}'...", user.username);
service.user_apply(&user).await?;
log::debug!("Attach policy '{policy_name}' to user...");
service.policy_attach_user(&user, &policy_name).await?;
log::debug!("Successfully applied desired configuration!");
Ok(())
}

1138
src/minio.rs Normal file

File diff suppressed because it is too large Load Diff

117
src/minio_test_server.rs Normal file
View File

@ -0,0 +1,117 @@
//! # Minio server controller
//!
//! Used for testing only
use crate::minio::MinioService;
use crate::utils::rand_str;
use rand::RngCore;
use std::io::ErrorKind;
use std::process::{Child, Command};
use std::time::Duration;
pub struct MinioTestServer {
#[allow(dead_code)]
storage_base_dir: mktemp::Temp,
child: Child,
pub api_port: u16,
pub root_user: String,
pub root_password: String,
}
impl MinioTestServer {
pub async fn start() -> anyhow::Result<Self> {
let storage_dir = mktemp::Temp::new_dir()?;
let root_user = rand_str(30);
let root_password = rand_str(30);
let api_port = (2000 + rand::thread_rng().next_u64() % 5000) as u16;
log::info!(
"Spwan a new Minio server on port {} with root credentials {}:{}",
api_port,
root_user,
root_password
);
let child = Command::new("minio")
.current_dir(storage_dir.clone())
.arg("server")
.arg("--address")
.arg(format!(":{api_port}"))
.arg(storage_dir.to_str().unwrap())
.env("MINIO_ROOT_USER", &root_user)
.env("MINIO_ROOT_PASSWORD", &root_password)
.spawn()?;
let instance = Self {
storage_base_dir: storage_dir,
child,
api_port,
root_user,
root_password,
};
// Wait for Minio to become ready
std::thread::sleep(Duration::from_millis(500));
let mut check_count = 0;
loop {
if check_count >= 100 {
log::error!("Minio failed to respond properly in time!");
return Err(std::io::Error::new(
ErrorKind::Other,
"Minio failed to respond in time!",
)
.into());
}
check_count += 1;
std::thread::sleep(Duration::from_millis(100));
if instance.as_service().is_ready().await {
break;
}
}
Ok(instance)
}
pub fn base_url(&self) -> String {
format!("http://127.0.0.1:{}", self.api_port)
}
/// Get a MinioService instance of this temporary server
pub fn as_service(&self) -> MinioService {
MinioService {
hostname: self.base_url(),
access_key: self.root_user.clone(),
secret_key: self.root_password.clone(),
}
}
}
impl Drop for MinioTestServer {
fn drop(&mut self) {
if let Err(e) = self.child.kill() {
log::error!("Failed to kill child server! {}", e);
}
}
}
#[cfg(test)]
mod test {
use crate::minio_test_server::MinioTestServer;
#[tokio::test]
async fn start_minio() {
let _ = env_logger::builder().is_test(true).try_init();
let server = MinioTestServer::start().await.unwrap();
let service = server.as_service();
println!("{:?}", service);
assert!(service.is_ready().await);
// Check if minio properly exit
drop(server);
assert!(!service.is_ready().await);
}
}

17
src/policy_template.json Normal file
View File

@ -0,0 +1,17 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ListObjectsInBucket",
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": ["arn:aws:s3:::{{ bucket }}"]
},
{
"Sid": "AllObjectActions",
"Effect": "Allow",
"Action": ["s3:DeleteObject", "s3:Get*", "s3:PutObject", "s3:*Object"],
"Resource": ["arn:aws:s3:::{{ bucket }}/*"]
}
]
}

64
src/secrets.rs Normal file
View File

@ -0,0 +1,64 @@
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::PostParams;
use kube::Api;
use std::collections::BTreeMap;
#[derive(thiserror::Error, Debug)]
enum SecretError {
#[error("Secret has no data!")]
MissingData,
#[error("The key '{0}' is not present in the secret!")]
MissingKey(String),
}
/// Attempt to read a value contained in a secret. Returns an error in case
/// of failure
pub fn read_secret_str(s: &Secret, key: &str) -> anyhow::Result<String> {
let data = s.data.as_ref().ok_or(SecretError::MissingData)?;
let value = data
.get(key)
.ok_or(SecretError::MissingKey(key.to_string()))?;
Ok(String::from_utf8(value.0.clone())?)
}
/// Create a secret consisting only of string key / value pairs
pub async fn create_secret(
secrets: &Api<Secret>,
name: &str,
values: BTreeMap<String, String>,
) -> anyhow::Result<Secret> {
Ok(secrets
.create(
&PostParams::default(),
&Secret {
data: None,
immutable: None,
metadata: ObjectMeta {
annotations: None,
creation_timestamp: None,
deletion_grace_period_seconds: None,
deletion_timestamp: None,
finalizers: None,
generate_name: None,
generation: None,
labels: Some(BTreeMap::from([(
"created-by".to_string(),
"miniok8sbuckets".to_string(),
)])),
managed_fields: None,
name: Some(name.to_string()),
namespace: None,
owner_references: None,
resource_version: None,
self_link: None,
uid: None,
},
string_data: Some(values),
type_: None,
},
)
.await?)
}

11
src/utils.rs Normal file
View File

@ -0,0 +1,11 @@
use rand::distributions::Alphanumeric;
use rand::Rng;
/// Generate a random string of a given size
pub fn rand_str(len: usize) -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(len)
.map(char::from)
.collect()
}