diff --git a/src/audit.rs b/src/audit.rs index a978f16..4e03d9c 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -3,9 +3,8 @@ use crate::events::init_event_watcher; use crate::manager::Manager; use crate::policy::{load_policies_from_file, PolicyInfo, PolicyStore, PolicyStoreRef}; use crate::util::error::{kube_err, load_err, BridgekeeperError, Result}; -use crate::util::k8s_client::{list_with_retry, patch_status_with_retry}; +use crate::util::k8s::{list_with_retry, patch_status_with_retry, namespaces, find_k8s_resource_matches, gen_target_identifier}; use crate::util::defaults::api_group_or_default; -use crate::util::k8s_util::{namespaces, find_k8s_resource_matches, gen_target_identifier}; use argh::FromArgs; use k8s_openapi::chrono::{DateTime, Utc}; use kube::{ diff --git a/src/evaluator.rs b/src/evaluator.rs index a554280..d486f57 100644 --- a/src/evaluator.rs +++ b/src/evaluator.rs @@ -1,7 +1,7 @@ use crate::{ crd::{Policy, PolicySpec}, events::{EventSender, PolicyEvent, PolicyEventData}, - policy::{PolicyInfo, PolicyStoreRef}, util::k8s_util::find_k8s_resource_matches, + policy::{PolicyInfo, PolicyStoreRef}, util::k8s::find_k8s_resource_matches, }; use kube::{core::{ admission::{self, Operation}, diff --git a/src/util/k8s_util.rs b/src/util/k8s.rs similarity index 71% rename from src/util/k8s_util.rs rename to src/util/k8s.rs index e0f728e..81277a3 100644 --- a/src/util/k8s_util.rs +++ b/src/util/k8s.rs @@ -1,9 +1,68 @@ -use crate::util::error::{kube_err, Result}; -use k8s_openapi::{api::core::v1::Namespace, apimachinery::pkg::apis::meta::v1::{APIGroup, APIResource}}; +use exponential_backoff::Backoff; use kube::{ + api::{Api, ListParams, Patch, PatchParams}, + core::ObjectList, + Resource, Client, core::{ApiResource as KubeApiResource, DynamicObject, GroupVersionKind}, - Client, api::ListParams, Api, Resource }; +use lazy_static::lazy_static; +use serde::{de::DeserializeOwned, Serialize}; +use std::time::Duration; + +use crate::util::error::{kube_err, Result}; +use k8s_openapi::{api::core::v1::Namespace, apimachinery::pkg::apis::meta::v1::{APIGroup, APIResource}}; + +lazy_static! { + static ref BACKOFF: Backoff = + Backoff::new(4, Duration::from_millis(100), Duration::from_secs(2)); +} + +pub async fn list_with_retry(api: &Api, params: ListParams) -> kube::Result> +where + T: DeserializeOwned + Clone + std::fmt::Debug, +{ + for duration in BACKOFF.iter() { + match api.list(¶ms).await { + Ok(result) => return Ok(result), + Err(_err) => tokio::time::sleep(duration).await, + } + } + api.list(¶ms).await +} + +pub async fn patch_status_with_retry< + T: DeserializeOwned + Clone + std::fmt::Debug, + P: serde::Serialize + std::fmt::Debug, +>( + api: &Api, + name: &str, + pp: &PatchParams, + patch: &Patch

, +) -> kube::Result { + for duration in BACKOFF.iter() { + match api.patch_status(name, pp, patch).await { + Ok(result) => return Ok(result), + Err(_err) => tokio::time::sleep(duration).await, + } + } + api.patch_status(name, pp, patch).await +} + +pub async fn apply(api: &Api, name: &str, mut object: T) -> kube::Result +where + ::DynamicType: Default, + T: Clone, + T: Serialize, + T: DeserializeOwned, + T: std::fmt::Debug, +{ + if let Ok(res) = api.get(name).await { + object.meta_mut().resource_version = res.meta().resource_version.clone(); + api.replace(name, &Default::default(), &object).await + } else { + api.create(&Default::default(), &object).await + } +} pub async fn find_k8s_resource_matches( api_group: &str, @@ -130,4 +189,4 @@ pub async fn namespaces(k8s_client: Client) -> Result> { } } Ok(namespaces) -} \ No newline at end of file +} diff --git a/src/util/k8s_client.rs b/src/util/k8s_client.rs deleted file mode 100644 index 34c8720..0000000 --- a/src/util/k8s_client.rs +++ /dev/null @@ -1,61 +0,0 @@ -use exponential_backoff::Backoff; -use kube::{ - api::{Api, ListParams, Patch, PatchParams}, - core::ObjectList, - Resource, -}; -use lazy_static::lazy_static; -use serde::{de::DeserializeOwned, Serialize}; -use std::time::Duration; - -lazy_static! { - static ref BACKOFF: Backoff = - Backoff::new(4, Duration::from_millis(100), Duration::from_secs(2)); -} - -pub async fn list_with_retry(api: &Api, params: ListParams) -> kube::Result> -where - T: DeserializeOwned + Clone + std::fmt::Debug, -{ - for duration in BACKOFF.iter() { - match api.list(¶ms).await { - Ok(result) => return Ok(result), - Err(_err) => tokio::time::sleep(duration).await, - } - } - api.list(¶ms).await -} - -pub async fn patch_status_with_retry< - T: DeserializeOwned + Clone + std::fmt::Debug, - P: serde::Serialize + std::fmt::Debug, ->( - api: &Api, - name: &str, - pp: &PatchParams, - patch: &Patch

, -) -> kube::Result { - for duration in BACKOFF.iter() { - match api.patch_status(name, pp, patch).await { - Ok(result) => return Ok(result), - Err(_err) => tokio::time::sleep(duration).await, - } - } - api.patch_status(name, pp, patch).await -} - -pub async fn apply(api: &Api, name: &str, mut object: T) -> kube::Result -where - ::DynamicType: Default, - T: Clone, - T: Serialize, - T: DeserializeOwned, - T: std::fmt::Debug, -{ - if let Ok(res) = api.get(name).await { - object.meta_mut().resource_version = res.meta().resource_version.clone(); - api.replace(name, &Default::default(), &object).await - } else { - api.create(&Default::default(), &object).await - } -} diff --git a/src/util/mod.rs b/src/util/mod.rs index 272cb56..0c052c3 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,6 +1,5 @@ pub mod cert; pub mod error; -pub mod k8s_client; -pub mod k8s_util; +pub mod k8s; pub mod webhook; pub mod defaults; \ No newline at end of file diff --git a/src/util/webhook.rs b/src/util/webhook.rs index d487183..380d333 100644 --- a/src/util/webhook.rs +++ b/src/util/webhook.rs @@ -1,5 +1,5 @@ use crate::util::error::{kube_err, Result}; -use crate::{constants::*, util::cert::CertKeyPair, util::k8s_client::apply}; +use crate::{constants::*, util::cert::CertKeyPair, util::k8s::apply}; use k8s_openapi::api::admissionregistration::v1::{ MutatingWebhookConfiguration, ValidatingWebhookConfiguration, };