Implement base operator #1
@ -18,7 +18,5 @@ k8s-openapi = { version = "0.18.0", features = ["v1_26"] } # TODO : switch to v1
|
||||
futures = "0.3.28"
|
||||
thiserror = "1.0.40"
|
||||
rand = "0.8.5"
|
||||
|
||||
[dev-dependencies]
|
||||
mktemp = "0.5.0"
|
||||
reqwest = "0.11.17"
|
@ -7,3 +7,5 @@ pub const SECRET_MINIO_BUCKET_SECRET_KEY: &str = "secretKey";
|
||||
|
||||
pub const SECRET_MINIO_BUCKET_ACCESS_LEN: usize = 20;
|
||||
pub const SECRET_MINIO_BUCKET_SECRET_LEN: usize = 35;
|
||||
|
||||
pub const MC_EXE: &str = "mc";
|
||||
|
@ -45,5 +45,7 @@ pub struct MinioBucketSpec {
|
||||
#[serde(default)]
|
||||
pub versioning: bool,
|
||||
pub quota: Option<usize>,
|
||||
#[serde(default)]
|
||||
pub lock: bool,
|
||||
pub retention: Option<BucketRetention>,
|
||||
}
|
||||
|
@ -2,6 +2,6 @@ pub mod constants;
|
||||
pub mod crd;
|
||||
pub mod minio;
|
||||
#[cfg(test)]
|
||||
pub mod minio_server;
|
||||
pub mod minio_test_server;
|
||||
pub mod secrets;
|
||||
pub mod utils;
|
||||
|
260
src/minio.rs
260
src/minio.rs
@ -1,5 +1,27 @@
|
||||
use crate::constants::{SECRET_MINIO_BUCKET_ACCESS_LEN, SECRET_MINIO_BUCKET_SECRET_LEN};
|
||||
use crate::constants::{MC_EXE, SECRET_MINIO_BUCKET_ACCESS_LEN, SECRET_MINIO_BUCKET_SECRET_LEN};
|
||||
use crate::crd::{MinioBucketSpec, RetentionType};
|
||||
use crate::utils::rand_str;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Deserialize;
|
||||
use std::process::Command;
|
||||
|
||||
const MC_ALIAS_NAME: &str = "managedminioinst";
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum MinioError {
|
||||
#[error("Failed to set 'mc' alias!")]
|
||||
SetMcAlias,
|
||||
#[error("Failed to execute 'mc' command!")]
|
||||
ExecMc,
|
||||
#[error("Failed to execute 'mc mb' command!")]
|
||||
MakeBucketFailed,
|
||||
#[error("Failed to set anonymous access!")]
|
||||
SetAnonymousAcccessFailed,
|
||||
#[error("Failed to set bucket quota!")]
|
||||
SetQuotaFailed,
|
||||
#[error("Failed to set bucket retention!")]
|
||||
SetRetentionFailed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MinioService {
|
||||
@ -23,6 +45,29 @@ impl MinioUser {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct BucketEntry {
|
||||
pub status: String,
|
||||
key: String,
|
||||
}
|
||||
|
||||
impl BucketEntry {
|
||||
pub fn bucket_name(&self) -> &str {
|
||||
&self.key[0..self.key.len() - 1]
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
struct BasicMinioResult {
|
||||
pub status: String,
|
||||
}
|
||||
|
||||
impl BasicMinioResult {
|
||||
pub fn success(&self) -> bool {
|
||||
self.status == "success"
|
||||
}
|
||||
}
|
||||
|
||||
impl MinioService {
|
||||
/// Check if the Minio Service is ready to respond to our requests
|
||||
pub async fn is_ready(&self) -> bool {
|
||||
@ -43,4 +88,217 @@ impl MinioService {
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
/// Execute a minio mc command
|
||||
async fn exec_mc_cmd<A>(&self, args: &[&str]) -> anyhow::Result<Vec<A>>
|
||||
where
|
||||
A: DeserializeOwned,
|
||||
{
|
||||
log::debug!("exec_mc_cmd with args {:?}", args);
|
||||
|
||||
let conf_dir = mktemp::Temp::new_dir()?;
|
||||
let global_flags = ["--config-dir", conf_dir.to_str().unwrap(), "--json"];
|
||||
|
||||
// First, set our alias to mc in a temporary directory
|
||||
let res = Command::new(MC_EXE)
|
||||
.args(global_flags)
|
||||
.args([
|
||||
"alias",
|
||||
"set",
|
||||
MC_ALIAS_NAME,
|
||||
self.hostname.as_str(),
|
||||
self.access_key.as_str(),
|
||||
self.secret_key.as_str(),
|
||||
])
|
||||
.output()?;
|
||||
if res.status.code() != Some(0) {
|
||||
log::error!(
|
||||
"Failed to configure mc alias! (status code {:?}, stderr={}, stdout={})",
|
||||
res.status,
|
||||
String::from_utf8_lossy(&res.stderr),
|
||||
String::from_utf8_lossy(&res.stdout)
|
||||
);
|
||||
return Err(MinioError::SetMcAlias.into());
|
||||
}
|
||||
|
||||
// Execute requested command
|
||||
let res = Command::new(MC_EXE)
|
||||
.args(global_flags)
|
||||
.args(args)
|
||||
.output()?;
|
||||
|
||||
if res.status.code() != Some(0) {
|
||||
log::error!(
|
||||
"Failed execute command! (status code {:?}, stderr={}, stdout={})",
|
||||
res.status,
|
||||
String::from_utf8_lossy(&res.stderr),
|
||||
String::from_utf8_lossy(&res.stdout)
|
||||
);
|
||||
return Err(MinioError::ExecMc.into());
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&res.stdout);
|
||||
log::debug!(
|
||||
"stdout='{}' stderr='{}'",
|
||||
stdout,
|
||||
String::from_utf8_lossy(&res.stderr)
|
||||
);
|
||||
|
||||
if stdout.is_empty() {
|
||||
log::info!("Command returned no result!");
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let mut out = vec![];
|
||||
for l in stdout.split('\n') {
|
||||
if !l.trim().is_empty() {
|
||||
out.push(serde_json::from_str(l)?);
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Get the list of buckets
|
||||
pub async fn buckets_list(&self) -> anyhow::Result<Vec<BucketEntry>> {
|
||||
self.exec_mc_cmd::<BucketEntry>(&["ls", MC_ALIAS_NAME])
|
||||
.await
|
||||
}
|
||||
|
||||
/// Check if a bucket exists or not
|
||||
pub async fn bucket_exists(&self, name: &str) -> anyhow::Result<bool> {
|
||||
Ok(self
|
||||
.buckets_list()
|
||||
.await?
|
||||
.iter()
|
||||
.any(|b| b.bucket_name().eq(name)))
|
||||
}
|
||||
|
||||
/// Create a bucket
|
||||
pub async fn create_bucket(&self, b: &MinioBucketSpec) -> anyhow::Result<()> {
|
||||
// Set base parameters
|
||||
let bucket_name = format!("{}/{}", MC_ALIAS_NAME, b.name);
|
||||
let mut args = ["mb", bucket_name.as_str()].to_vec();
|
||||
|
||||
if b.versioning {
|
||||
args.push("--with-versioning");
|
||||
}
|
||||
|
||||
if b.lock || b.retention.is_some() {
|
||||
args.push("--with-lock");
|
||||
}
|
||||
|
||||
let res = self.exec_mc_cmd::<BasicMinioResult>(&args).await?;
|
||||
if res.get(0).map(|r| r.success()) != Some(true) {
|
||||
return Err(MinioError::MakeBucketFailed.into());
|
||||
}
|
||||
|
||||
// Enable anonymous access, eg. public hosting
|
||||
if b.anonymous_read_access {
|
||||
let target = format!("{}/*", bucket_name);
|
||||
|
||||
let res = self
|
||||
.exec_mc_cmd::<BasicMinioResult>(&["anonymous", "set", "download", target.as_str()])
|
||||
.await?;
|
||||
|
||||
if res.get(0).map(|r| r.success()) != Some(true) {
|
||||
return Err(MinioError::SetAnonymousAcccessFailed.into());
|
||||
}
|
||||
}
|
||||
|
||||
// Set quota, if requested
|
||||
if let Some(quota) = &b.quota {
|
||||
let quota = format!("{}MB", quota);
|
||||
|
||||
let res = self
|
||||
.exec_mc_cmd::<BasicMinioResult>(&[
|
||||
"quota",
|
||||
"set",
|
||||
bucket_name.as_str(),
|
||||
"--size",
|
||||
quota.as_str(),
|
||||
])
|
||||
.await?;
|
||||
|
||||
if res.get(0).map(|r| r.success()) != Some(true) {
|
||||
return Err(MinioError::SetQuotaFailed.into());
|
||||
}
|
||||
}
|
||||
|
||||
// Set retention, if requested
|
||||
if let Some(retention) = &b.retention {
|
||||
let days = format!("{}d", retention.validity);
|
||||
|
||||
let res = self
|
||||
.exec_mc_cmd::<BasicMinioResult>(&[
|
||||
"retention",
|
||||
"set",
|
||||
"--default",
|
||||
match retention.r#type {
|
||||
RetentionType::Compliance => "compliance",
|
||||
RetentionType::Governance => "governance",
|
||||
},
|
||||
days.as_str(),
|
||||
bucket_name.as_str(),
|
||||
])
|
||||
.await?;
|
||||
|
||||
if res.get(0).map(|r| r.success()) != Some(true) {
|
||||
return Err(MinioError::SetRetentionFailed.into());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::crd::MinioBucketSpec;
|
||||
use crate::minio_test_server::MinioTestServer;
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_buckets_empty_instance() {
|
||||
let srv = MinioTestServer::start().await.unwrap();
|
||||
let buckets = srv.as_service().buckets_list().await.unwrap();
|
||||
assert!(buckets.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn bucket_exists_no_bucket() {
|
||||
let srv = MinioTestServer::start().await.unwrap();
|
||||
assert!(!srv.as_service().bucket_exists("mybucket").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn bucket_basic_creation() {
|
||||
let _ = env_logger::builder().is_test(true).try_init();
|
||||
|
||||
let srv = MinioTestServer::start().await.unwrap();
|
||||
let service = srv.as_service();
|
||||
service
|
||||
.create_bucket(&MinioBucketSpec {
|
||||
instance: "".to_string(),
|
||||
name: "mybucket".to_string(),
|
||||
secret: "".to_string(),
|
||||
anonymous_read_access: false,
|
||||
versioning: false,
|
||||
quota: None,
|
||||
lock: false,
|
||||
retention: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(service.bucket_exists("mybucket").await.unwrap());
|
||||
}
|
||||
|
||||
// TODO : with anonymous access
|
||||
// TODO : without anonymous access
|
||||
// TODO : with versioning
|
||||
// TODO : without versioning
|
||||
// TODO : with quota
|
||||
// TODO : without quota
|
||||
// TODO : with lock
|
||||
// TODO : without lock
|
||||
// TODO : with retention
|
||||
// TODO : without retention
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ use rand::RngCore;
|
||||
use std::io::ErrorKind;
|
||||
use std::process::{Child, Command};
|
||||
|
||||
pub struct MinioServer {
|
||||
pub struct MinioTestServer {
|
||||
#[allow(dead_code)]
|
||||
storage_base_dir: mktemp::Temp,
|
||||
child: Child,
|
||||
@ -17,7 +17,7 @@ pub struct MinioServer {
|
||||
pub root_password: String,
|
||||
}
|
||||
|
||||
impl MinioServer {
|
||||
impl MinioTestServer {
|
||||
pub async fn start() -> anyhow::Result<Self> {
|
||||
let storage_dir = mktemp::Temp::new_dir()?;
|
||||
|
||||
@ -84,7 +84,7 @@ impl MinioServer {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MinioServer {
|
||||
impl Drop for MinioTestServer {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.child.kill() {
|
||||
log::error!("Failed to kill child server! {}", e);
|
||||
@ -94,13 +94,13 @@ impl Drop for MinioServer {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::minio_server::MinioServer;
|
||||
use crate::minio_test_server::MinioTestServer;
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_minio() {
|
||||
let _ = env_logger::builder().is_test(true).try_init();
|
||||
|
||||
let server = MinioServer::start().await.unwrap();
|
||||
let server = MinioTestServer::start().await.unwrap();
|
||||
let service = server.as_service();
|
||||
println!("{:?}", service);
|
||||
|
@ -49,6 +49,10 @@ spec:
|
||||
type: integer
|
||||
description: Limits the amount of data in the bucket, in Megabytes. By default it is unlimited
|
||||
example: 100
|
||||
lock:
|
||||
description: Object locking prevent objects from being deleted. Will be considered as set to true when retention is defined.
|
||||
type: boolean
|
||||
default: false
|
||||
retention:
|
||||
type: object
|
||||
description: Impose rules to prevent object deletion for a period of time. It requires versioning to be enabled/disabled
|
||||
|
Loading…
Reference in New Issue
Block a user