Skip to content

Commit

Permalink
Refactor logic to be cleaner and remove unnecessary config data
Browse files Browse the repository at this point in the history
  • Loading branch information
EricGhildyal committed Nov 26, 2024
1 parent 4766c25 commit f909c8e
Showing 1 changed file with 74 additions and 117 deletions.
191 changes: 74 additions & 117 deletions src/adapters/ingresses/apig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,42 @@ impl AwsApiGateway {
})
}

pub async fn upload_lambda(&self, lambda_name: &str) -> Result<String> {
/// Helper function to convert an API Gateway's name to its auto-generated AWS ID
pub async fn get_api_id_by_name(&self, api_name: &str) -> Result<RestApi> {
let all_apis = self
.apig_client
.get_rest_apis()
.send()
.await
.into_diagnostic()?;

let api = all_apis
.items()
.iter()
.find(|api| api.name.clone().unwrap() == api_name)
.ok_or(miette!(
"Could not find an API Gateway with the name: {}",
api_name
))?;

Ok(api.clone())
}
}

/// Given a path to a file, load it as an array of bytes.
async fn read_file(artifact_path: PathBuf) -> Result<Vec<u8>> {
let mut bytes = Vec::new();
// Load the lambda from file.
let mut artifact = File::open(artifact_path).await.into_diagnostic()?;
artifact.read_to_end(&mut bytes).await.into_diagnostic()?;
Ok(bytes)
}

#[async_trait]
impl Ingress for AwsApiGateway {
async fn deploy(&mut self) -> Result<()> {
// First, we need to deploy the new version of the lambda

// Parse the bytes into the format AWS wants
let code = Blob::from(self.lambda_artifact.clone());

Expand All @@ -67,47 +102,43 @@ impl AwsApiGateway {
let res = self
.lambda_client
.update_function_code()
.function_name(lambda_name)
.function_name(&self.lambda_name)
.zip_file(zip_file.clone())
.send()
.await
.into_diagnostic()?;

let lambda_arn = res
.function_arn()
.ok_or(miette!("Couldn't get ARN of deployed lambda"))?;

let version = res
.version()
.ok_or(miette!("Couldn't get version of deployed lambda"))?;

Ok(version.to_string())
}
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?;

pub async fn create_apig_deployment(
&self,
api_id: &str,
stage_name: &str,
lambda_name: &str,
lambda_version: &str,
traffic_percentage: WholePercent,
) -> Result<()> {
// Update the APIG with the new lambda version
// Next, we need to create a new deployment, pointing at our
// new lambda version with canary settings
self.apig_client
.put_integration()
.rest_api_id(api_id)
.uri(format!(
"arn:aws:lambda:us-east-2:471112630982:function:{}:{}",
lambda_name, lambda_version
))
.rest_api_id(&api_id)
.uri(format!("{}:{}", lambda_arn, version))
.send()
.await
.into_diagnostic()?;

// Create a deployment with canary settings to deploy our new lambda
self.apig_client
.create_deployment()
.rest_api_id(api_id)
.stage_name(stage_name)
.rest_api_id(&api_id)
.stage_name(&self.stage_name)
.canary_settings(
DeploymentCanarySettings::builder()
.percent_traffic(traffic_percentage.into_inner() as f64)
// This is set to 0 explicitly here since the first step of the pipeline
// is to increase traffic
.percent_traffic(0.0)
.build(),
)
.send()
Expand All @@ -117,45 +148,20 @@ impl AwsApiGateway {
Ok(())
}

pub async fn get_api_id_by_name(&self, api_name: &str) -> Result<RestApi> {
// Given an API Gateway's name, return its auto-generated AWS ID
let all_apis = self
.apig_client
.get_rest_apis()
.send()
.await
.into_diagnostic()?;

let api = all_apis
.items()
.iter()
.find(|api| api.name.clone().unwrap() == api_name)
.ok_or(miette!(
"Could not find an API Gateway with the name: {}",
api_name
))?;

Ok(api.clone())
}

pub async fn update_canary_traffic(
&self,
api_name: &str,
stage_name: &str,
traffic_percentage: WholePercent,
) -> Result<()> {
let api = self.get_api_id_by_name(api_name).await?;
async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> {
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?;

let patch_op = PatchOperation::builder()
.op(Op::Replace)
.path("/canarySettings/percentTraffic")
.value(traffic_percentage.to_string())
.value(percent.to_string())
.build();

self.apig_client
.update_stage()
.rest_api_id(api.id.unwrap_or_default())
.stage_name(stage_name)
.rest_api_id(api_id)
.stage_name(&self.stage_name)
.patch_operations(patch_op)
.send()
.await
Expand All @@ -164,8 +170,9 @@ impl AwsApiGateway {
Ok(())
}

async fn delete_canary(&self, api_name: &str, stage_name: &str) -> Result<()> {
let api = self.get_api_id_by_name(api_name).await?;
async fn rollback_canary(&mut self) -> Result<()> {
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?;

// Updates the stage to delete any canary settings from the API Gateway
let patch_op = PatchOperation::builder()
Expand All @@ -175,8 +182,8 @@ impl AwsApiGateway {

self.apig_client
.update_stage()
.rest_api_id(api.id.unwrap_or_default())
.stage_name(stage_name)
.rest_api_id(api_id)
.stage_name(&self.stage_name)
.patch_operations(patch_op)
.send()
.await
Expand All @@ -185,8 +192,9 @@ impl AwsApiGateway {
Ok(())
}

pub async fn promote_apig_canary(&self, api_name: &str, stage_name: &str) -> Result<()> {
let api = self.get_api_id_by_name(api_name).await?;
async fn promote_canary(&mut self) -> Result<()> {
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?;

// Overwrite the main deployment's ID with the canary's
let replace_deployment_op = PatchOperation::builder()
Expand All @@ -195,75 +203,24 @@ impl AwsApiGateway {
.path("/deploymentId")
.build();

// Reset canary traffic to 0% so we're ready for another release
let reset_traffic_op = PatchOperation::builder()
.op(Op::Replace)
.path("/canarySettings/percentTraffic")
// Note: this must be a string to pass into Value, but it's actually an f64 in AWS
.value("0.0")
// Deletes all canary settings from the API Gateway so we're ready for the next
// canary deployment
let delete_canary_op = PatchOperation::builder()
.op(Op::Remove)
.path("/canarySettings")
.build();

// Send request to update stage
self.apig_client
.update_stage()
.rest_api_id(api.id.unwrap_or_default())
.stage_name(stage_name)
.rest_api_id(api_id)
.stage_name(&self.stage_name)
.patch_operations(replace_deployment_op)
.patch_operations(reset_traffic_op)
.patch_operations(delete_canary_op)
.send()
.await
.into_diagnostic()?;

Ok(())
}
}

/// given a path to a file, load it as an array of bytes.
async fn read_file(artifact_path: PathBuf) -> Result<Vec<u8>> {
let mut bytes = Vec::new();
// Load the lambda from file.
let mut artifact = File::open(artifact_path).await.into_diagnostic()?;
artifact.read_to_end(&mut bytes).await.into_diagnostic()?;
Ok(bytes)
}

#[async_trait]
impl Ingress for AwsApiGateway {
async fn deploy(&mut self) -> Result<()> {
// First, we need to deploy the new version of the lambda
let lambda_version = self.upload_lambda(&self.lambda_name).await?;

// Next, we need to create a new deployment, pointing at our
// new lambda version with canary settings
self.create_apig_deployment(
&self.gateway_name,
&self.stage_name,
&self.lambda_name,
&lambda_version,
WholePercent::try_new(0).into_diagnostic()?,
)
.await?;

Ok(())
}

async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> {
self.update_canary_traffic(&self.gateway_name, &self.stage_name, percent)
.await?;

Ok(())
}

async fn rollback_canary(&mut self) -> Result<()> {
self.delete_canary(&self.gateway_name, &self.stage_name)
.await?;

Ok(())
}

async fn promote_canary(&mut self) -> Result<()> {
self.promote_apig_canary("Releases", "prod").await?;

Ok(())
}
}

0 comments on commit f909c8e

Please sign in to comment.