From d6637617d42ce3d9e410ba5e33d8be74045fd415 Mon Sep 17 00:00:00 2001 From: Brian Date: Thu, 2 Jan 2025 05:55:06 +0100 Subject: [PATCH] Begin refactoring on worker artifact (#114) Removed a lot of uneccesary clones and duplicate code. --- worker/src/artifact.rs | 1355 ++++++++++++++-------------------------- 1 file changed, 457 insertions(+), 898 deletions(-) diff --git a/worker/src/artifact.rs b/worker/src/artifact.rs index 13e3542b..616100b3 100644 --- a/worker/src/artifact.rs +++ b/worker/src/artifact.rs @@ -15,9 +15,11 @@ use tokio::{ }; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{wrappers::LinesStream, StreamExt}; -use tonic::{Code::NotFound, Request, Response, Status}; +use tonic::{Request, Response, Status}; use tracing::error; -use vorpal_schema::vorpal::artifact::v0::{ArtifactId, ArtifactStepEnvironment}; +use vorpal_schema::vorpal::artifact::v0::{ + Artifact, ArtifactId, ArtifactSourceId, ArtifactStepEnvironment, +}; use vorpal_schema::vorpal::{ artifact::v0::ArtifactSystem, artifact::v0::{ @@ -57,6 +59,12 @@ impl ArtifactServer { } } +fn expand_env(text: &str, envs: &[&ArtifactStepEnvironment]) -> String { + envs.iter().fold(text.to_string(), |acc, e| { + acc.replace(&format!("${}", e.key), &e.value) + }) +} + #[allow(clippy::too_many_arguments)] async fn run_step( artifact_artifacts: Vec, @@ -82,69 +90,67 @@ async fn run_step( return Err(Status::internal("artifact not found")); } + let path_str = path.display().to_string(); + environments.push(ArtifactStepEnvironment { key: format!( "VORPAL_ARTIFACT_{}", artifact.name.to_lowercase().replace('-', "_") ), - value: path.display().to_string(), + value: path_str.clone(), }); - paths.push(path.display().to_string()); + paths.push(path_str); } // Add default environment variables let name_envkey = artifact_name.to_lowercase().replace('-', "_"); - environments.push(ArtifactStepEnvironment { - key: format!("VORPAL_ARTIFACT_{}", name_envkey.clone()), - value: artifact_path.display().to_string(), - }); - - environments.push(ArtifactStepEnvironment { - key: "VORPAL_ARTIFACTS".to_string(), - value: paths.join(" ").to_string(), - }); - - environments.push(ArtifactStepEnvironment { - key: "VORPAL_OUTPUT".to_string(), - value: artifact_path.display().to_string(), - }); - - environments.push(ArtifactStepEnvironment { - key: "VORPAL_WORKSPACE".to_string(), - value: workspace_path.display().to_string(), - }); + environments.extend([ + ArtifactStepEnvironment { + key: format!("VORPAL_ARTIFACT_{}", name_envkey.clone()), + value: artifact_path.display().to_string(), + }, + ArtifactStepEnvironment { + key: "VORPAL_ARTIFACTS".to_string(), + value: paths.join(" ").to_string(), + }, + ArtifactStepEnvironment { + key: "VORPAL_OUTPUT".to_string(), + value: artifact_path.display().to_string(), + }, + ArtifactStepEnvironment { + key: "VORPAL_WORKSPACE".to_string(), + value: workspace_path.display().to_string(), + }, + ]); // Add all custom environment variables - for environment in step_environments.clone() { - environments.push(environment); - } + environments.extend(step_environments); // Sort environment variables by key length - let mut environments_sorted = environments.to_vec(); + let mut environments_sorted = environments; environments_sorted.sort_by(|a, b| b.key.len().cmp(&a.key.len())); + let vorpal_envs: Vec<_> = environments_sorted + .iter() + .filter(|e| e.key.starts_with("VORPAL_")) + .collect(); + // Setup script let mut script_path = None; - if let Some(script) = &step_script { - let mut script = script.clone(); - - for e in environments_sorted.clone() { - if e.key.starts_with("VORPAL_") { - script = script.replace(&format!("${}", e.key), &e.value); - } - } + if let Some(script) = step_script { + let script = expand_env(&script, &vorpal_envs); let path = workspace_path.join("script.sh"); - write(&path, script.clone()) + write(&path, script) .await .map_err(|err| Status::internal(format!("failed to write script: {:?}", err)))?; @@ -159,17 +165,13 @@ async fn run_step( // Setup entrypoint - let entrypoint = match step_entrypoint.clone() { - Some(entrypoint) => entrypoint, - None => match script_path { - Some(ref path) => path.display().to_string(), - None => return Err(Status::invalid_argument("entrypoint is missing")), - }, - }; + let entrypoint = step_entrypoint + .or_else(|| script_path.as_ref().map(|path| path.display().to_string())) + .ok_or_else(|| Status::invalid_argument("entrypoint is missing"))?; // Setup command - let mut command = Command::new(entrypoint.clone()); + let mut command = Command::new(&entrypoint); // Setup working directory @@ -177,30 +179,16 @@ async fn run_step( // Setup environment variables - for env in environments_sorted.clone() { - let mut env_value = env.value.clone(); - - for e in environments_sorted.clone() { - if e.key.starts_with("VORPAL_") { - env_value = env_value.replace(&format!("${}", e.key), &e.value); - } - } - - command.env(env.key, env_value); + for env in environments_sorted.iter() { + let env_value = expand_env(&env.value, &vorpal_envs); + command.env(&env.key, env_value); } // Setup arguments if !entrypoint.is_empty() { for arg in step_arguments.iter() { - let mut arg = arg.clone(); - - for e in environments_sorted.clone() { - if e.key.starts_with("VORPAL_") { - arg = arg.replace(&format!("${}", e.key), &e.value); - } - } - + let arg = expand_env(arg, &vorpal_envs); command.arg(arg); } @@ -217,8 +205,15 @@ async fn run_step( .spawn() .map_err(|err| Status::internal(format!("failed to spawn sandbox: {:?}", err)))?; - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); + let stdout = child + .stdout + .take() + .ok_or_else(|| Status::internal("Failed to capture stdout from the spawned sandbox"))?; + + let stderr = child + .stderr + .take() + .ok_or_else(|| Status::internal("Failed to capture stderr from the spawned sandbox"))?; let stdout = LinesStream::new(BufReader::new(stdout).lines()); let stderr = LinesStream::new(BufReader::new(stderr).lines()); @@ -246,6 +241,25 @@ async fn run_step( Ok(()) } +/// Sends a response to the client and logs errors if any. +async fn send_build_response( + tx: &Sender>, + response: Result, +) -> Result<(), Status> { + tx.send(response).await.map_err(|err| { + error!("Failed to send response: {:?}", err); + Status::internal("failed to send response") + }) +} + +/// Writes a message to the client stream and propagates errors. +async fn send_message( + tx: &Sender>, + message: String, +) -> Result<(), Status> { + send_build_response(tx, Ok(ArtifactBuildResponse { output: message })).await +} + #[tonic::async_trait] impl ArtifactService for ArtifactServer { type BuildStream = ReceiverStream>; @@ -259,905 +273,450 @@ impl ArtifactService for ArtifactServer { let registry = self.registry.clone(); tokio::spawn(async move { - let request = request.into_inner(); + if let Err(err) = handle_build(request.into_inner(), registry, tx.clone()).await { + if let Err(err) = send_build_response(&tx, Err(err)).await { + error!("Failed to send response: {:?}", err); + } + } + }); - let artifact = request.artifact.clone(); + Ok(Response::new(ReceiverStream::new(rx))) + } +} - if artifact.is_none() { - if let Err(err) = tx - .send(Err(Status::invalid_argument("artifact is missing"))) - .await - { - error!("failed to send error: {:?}", err); - } +async fn handle_build( + request: ArtifactBuildRequest, + registry: String, + tx: Sender>, +) -> Result<(), Status> { + let artifact = &request + .artifact + .as_ref() + .ok_or_else(|| Status::invalid_argument("artifact is missing"))?; - return; - } + if artifact.name.is_empty() { + return Err(Status::invalid_argument("name is missing")); + } - let artifact = artifact.unwrap(); + if artifact.steps.is_empty() { + return Err(Status::invalid_argument("steps are missing")); + } - if artifact.name.is_empty() { - if let Err(err) = tx - .send(Err(Status::invalid_argument("name is missing"))) - .await - { - error!("failed to send error: {:?}", err); - } + let manifest_json = serde_json::to_string(&request) + .map_err(|err| Status::internal(format!("failed to serialize manifest: {:?}", err)))?; - return; - } + let request_system = ArtifactSystem::try_from(request.system).unwrap_or(UnknownSystem); - if artifact.steps.is_empty() { - if let Err(err) = tx - .send(Err(Status::invalid_argument("steps are missing"))) - .await - { - error!("failed to send error: {:?}", err); - } + if request_system == UnknownSystem { + return Err(Status::invalid_argument("unknown target")); + } - return; - } + let worker_system = format!("{}-{}", ARCH, OS); - let manifest_json = match serde_json::to_string(&request) { - Ok(json) => json, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to serialize manifest: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - let request_system = match ArtifactSystem::try_from(request.system) { - Ok(target) => target, - Err(_) => UnknownSystem, - }; - - if request_system == UnknownSystem { - if let Err(err) = tx - .send(Err(Status::invalid_argument("unknown target"))) - .await - { - error!("failed to send error: {:?}", err); - } + let worker_target = get_artifact_system::(&worker_system); - return; - } + if request_system != worker_target { + return Err(Status::invalid_argument("target mismatch")); + } - let worker_system = format!("{}-{}", ARCH, OS); + let manifest_hash = digest(manifest_json.as_bytes()); - let worker_target = get_artifact_system::(worker_system.as_str()); + // Check if artifact is locked - if request_system != worker_target { - if let Err(err) = tx - .send(Err(Status::invalid_argument("target mismatch"))) - .await - { - error!("failed to send error: {:?}", err); - } + let lock_path = get_artifact_lock_path(&manifest_hash, &artifact.name); - return; - } + if lock_path.exists() { + return Err(Status::already_exists("artifact is locked")); + } - let manifest_hash = digest(manifest_json.as_bytes()); + // If artifact exists, return - // Check if artifact is locked + let artifact_path = get_artifact_path(&manifest_hash, &artifact.name); - let lock_path = get_artifact_lock_path(&manifest_hash, &artifact.name); + if artifact_path.exists() { + return Err(Status::already_exists("artifact exists")); + } - if lock_path.exists() { - if let Err(err) = tx - .send(Err(Status::already_exists("artifact is locked"))) - .await - { - error!("failed to send error: {:?}", err); - } + // Create lock file - return; - } + if let Err(err) = write(&lock_path, "").await { + return Err(Status::internal(format!( + "failed to create lock file: {:?}", + err + ))); + } - // If artifact exists, return + if let Err(err) = create_dir_all(&artifact_path).await { + return Err(Status::internal(format!( + "failed to create artifact path: {:?}", + err + ))); + } - let artifact_path = get_artifact_path(&manifest_hash, &artifact.name); + // Create workspace - if artifact_path.exists() { - if let Err(err) = tx - .send(Err(Status::already_exists("artifact exists"))) - .await - { - error!("failed to send error: {:?}", err); - } + let workspace_path = create_sandbox_dir() + .await + .map_err(|err| Status::internal(format!("failed to create workspace: {:?}", err)))?; - return; - } + // let workspace_path_canonical = workspace_path + // .canonicalize() + // .map_err(|err| Status::internal(format!("failed to canonicalize workspace: {:?}", err)))?; - // Create lock file - - if let Err(err) = write(&lock_path, "").await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create lock file: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + // Connect to registry - return; - } + let mut registry_client = RegistryServiceClient::connect(registry) + .await + .map_err(|err| Status::internal(format!("failed to connect to registry: {:?}", err)))?; + + // Pull any source archives + + pull_source_archives(artifact, &workspace_path, &mut registry_client, &tx).await?; + + // Run artifact steps + + for step in artifact.steps.iter() { + if let Err(err) = run_step( + artifact.artifacts.clone(), + artifact.name.clone(), + &artifact_path, + step.arguments.clone(), + step.entrypoint.clone(), + step.environments.clone(), + step.script.clone(), + &tx, + &workspace_path, + ) + .await + { + return Err(Status::internal(format!("failed to run step: {:?}", err))); + } + } - if let Err(err) = create_dir_all(&artifact_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create artifact path: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + let artifact_path_files = get_file_paths(&artifact_path, vec![], vec![]) + .map_err(|err| Status::internal(format!("failed to get output files: {:?}", err)))?; - return; - } + if artifact_path_files.is_empty() || artifact_path_files.len() == 1 { + return Err(Status::internal("no output files found")); + } - // Create workspace - - let workspace_path = match create_sandbox_dir().await { - Ok(path) => path, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create workspace: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - // let workspace_path_canonical = match workspace_path.canonicalize() { - // Ok(path) => path, - // Err(err) => { - // if let Err(err) = tx - // .send(Err(Status::internal(format!( - // "failed to canonicalize workspace: {:?}", - // err - // )))) - // .await - // { - // error!("failed to send error: {:?}", err); - // } - // - // return; - // } - // }; - - // Connect to registry - - let mut registry_client = match RegistryServiceClient::connect(registry).await { - Ok(client) => client, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to connect to registry: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; + // Create artifact tar from build output files - // Pull any source archives + send_message(&tx, format!("packing: {}", manifest_hash)).await?; - let workspace_source_dir_path = workspace_path.join("source"); + let artifact_archive_path = create_sandbox_file(Some("tar.zst")) + .await + .map_err(|err| Status::internal(format!("failed to create artifact archive: {:?}", err)))?; + + if let Err(err) = + compress_zstd(&artifact_path, &artifact_path_files, &artifact_archive_path).await + { + return Err(Status::internal(format!( + "failed to compress artifact: {:?}", + err + ))); + } - if let Err(err) = create_dir_all(&workspace_source_dir_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create source path: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + // upload artifact to registry - return; - } + send_message(&tx, format!("pushing: {}", manifest_hash)).await?; - for source in artifact.sources.iter() { - let workspace_source_path = workspace_source_dir_path.join(&source.name); - - if let Err(err) = create_dir_all(&workspace_source_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create source path: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } + let artifact_data = read(&artifact_archive_path) + .await + .map_err(|err| Status::internal(format!("failed to read artifact archive: {:?}", err)))?; - let source_cache_path = get_cache_path(&source.hash, &source.name); - - if source_cache_path.exists() { - let source_cache_files = - match get_file_paths(&source_cache_path, vec![], vec![]) { - Ok(files) => files, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to get source files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!("copying source: {}-{}", source.name, source.hash), - })) - .await - { - error!("failed to send error: {:?}", err); - - return; - } - - let workspace_source_files = match copy_files( - &source_cache_path, - source_cache_files.clone(), - &workspace_source_path, - ) - .await - { - Ok(files) => files, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to copy source files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - for path in workspace_source_files.iter() { - if let Err(err) = set_timestamps(path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to sanitize output files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - } - - continue; - } + let private_key_path = get_private_key_path(); - let source_archive_path = get_source_archive_path(&source.hash, &source.name); - - if source_archive_path.exists() { - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!("caching source: {}-{}", source.name, source.hash), - })) - .await - { - error!("failed to send error: {:?}", err); - - return; - } - - if let Err(err) = create_dir_all(&source_cache_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create source path: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - - if let Err(err) = unpack_zstd(&source_cache_path, &source_archive_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to unpack source archive: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - - let source_cache_files = - match get_file_paths(&source_cache_path, vec![], vec![]) { - Ok(files) => files, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to get source files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!("copying source: {}-{}", source.name, source.hash), - })) - .await - { - error!("failed to send error: {:?}", err); - - return; - } - - let workspace_source_files = match copy_files( - &source_cache_path, - source_cache_files.clone(), - &workspace_source_path, - ) - .await - { - Ok(files) => files, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to copy source files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - for path in workspace_source_files.iter() { - if let Err(err) = set_timestamps(path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to sanitize output files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - } - - continue; - } + if !private_key_path.exists() { + return Err(Status::internal("private key not found")); + } - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!("pulling source: {}-{}", source.name, source.hash), - })) - .await - { - error!("failed to send error: {:?}", err); + let source_signature = vorpal_notary::sign(private_key_path, &artifact_data) + .await + .map_err(|err| Status::internal(format!("failed to sign artifact: {:?}", err)))?; - return; - } + let mut request_stream = vec![]; - let pull_request = RegistryRequest { - hash: source.hash.clone(), - name: source.name.clone(), - kind: RegistryKind::ArtifactSource as i32, - }; - - match registry_client.pull(pull_request.clone()).await { - Ok(response) => { - let mut response = response.into_inner(); - let mut response_data = Vec::new(); - - while let Ok(message) = response.message().await { - if message.is_none() { - break; - } - - if let Some(res) = message { - if !res.data.is_empty() { - response_data.extend_from_slice(&res.data); - } - } - } - - if !response_data.is_empty() { - let mut source_archive = match File::create(&source_archive_path).await - { - Ok(file) => file, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create source archive: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - if let Err(err) = source_archive.write_all(&response_data).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to write source archive: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - } - - if let Err(err) = set_timestamps(&source_archive_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to set source archive timestamps: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!( - "caching source: {}-{}", - source.name, source.hash - ), - })) - .await - { - error!("failed to send error: {:?}", err); - - return; - } - - if let Err(err) = create_dir_all(&source_cache_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create source path: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - - if let Err(err) = - unpack_zstd(&source_cache_path, &source_archive_path).await - { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to unpack source archive: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - - let source_cache_files = - match get_file_paths(&source_cache_path, vec![], vec![]) { - Ok(files) => files, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to get source files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!( - "copying source: {}-{}", - source.name, source.hash - ), - })) - .await - { - error!("failed to send error: {:?}", err); - - return; - } - - let workspace_source_files = match copy_files( - &source_cache_path, - source_cache_files.clone(), - &workspace_source_path, - ) - .await - { - Ok(files) => files, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to copy source files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - for path in workspace_source_files.iter() { - if let Err(err) = set_timestamps(path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to sanitize output files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - } - } - } - - Err(status) => { - if status.code() != NotFound { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to pull source archive: {:?}", - status - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - } - } - } + for chunk in artifact_data.chunks(DEFAULT_CHUNKS_SIZE) { + request_stream.push(RegistryPushRequest { + data: chunk.to_vec(), + data_signature: source_signature.clone().to_vec(), + hash: manifest_hash.clone(), + kind: RegistryKind::Artifact as i32, + name: artifact.name.clone(), + }); + } - // Run artifact steps - - for step in artifact.steps.iter() { - if let Err(err) = run_step( - artifact.artifacts.clone(), - artifact.name.clone(), - &artifact_path, - step.arguments.clone(), - step.entrypoint.clone(), - step.environments.clone(), - step.script.clone(), - &tx, - &workspace_path, - ) - .await - { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to run step: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - } + if let Err(err) = registry_client + .push(tokio_stream::iter(request_stream)) + .await + { + return Err(Status::internal(format!( + "failed to push artifact: {:?}", + err + ))); + } - let artifact_path_files = match get_file_paths(&artifact_path, vec![], vec![]) { - Ok(files) => files, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to get output files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - if artifact_path_files.is_empty() || artifact_path_files.len() == 1 { - if let Err(err) = tx - .send(Err(Status::internal("no output files found"))) - .await - { - error!("failed to send error: {:?}", err); - } + // sanitize output files - return; - } + for path in artifact_path_files.iter() { + if let Err(err) = set_timestamps(path).await { + return Err(Status::internal(format!( + "failed to sanitize output files: {:?}", + err + ))); + } + } - // Create artifact tar from build output files + // Remove artifact archive - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!("packing: {}", manifest_hash), - })) - .await - { - error!("failed to send error: {:?}", err); + if let Err(err) = remove_file(&artifact_archive_path).await { + return Err(Status::internal(format!( + "failed to remove artifact archive: {:?}", + err + ))); + } - return; - } + // Remove workspace - let artifact_archive_path = match create_sandbox_file(Some("tar.zst")).await { - Ok(path) => path, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to create artifact archive: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - if let Err(err) = - compress_zstd(&artifact_path, &artifact_path_files, &artifact_archive_path).await - { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to compress artifact: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + if let Err(err) = remove_dir_all(workspace_path).await { + return Err(Status::internal(format!( + "failed to remove workspace: {:?}", + err + ))); + } - return; - } + // Remove lock file - // upload artifact to registry + if let Err(err) = remove_file(&lock_path).await { + return Err(Status::internal(format!( + "failed to remove lock file: {:?}", + err + ))); + } - if let Err(err) = tx - .send(Ok(ArtifactBuildResponse { - output: format!("pushing: {}", manifest_hash), - })) - .await - { - error!("failed to send error: {:?}", err); + Ok(()) +} - return; - } +async fn pull_source_archives( + artifact: &Artifact, + workspace_path: &Path, + registry_client: &mut RegistryServiceClient, + tx: &Sender>, +) -> Result<(), Status> { + let workspace_source_dir_path = workspace_path.join("source"); - let artifact_data = match read(&artifact_archive_path).await { - Ok(data) => data, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to read artifact archive: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; + if let Err(err) = create_dir_all(&workspace_source_dir_path).await { + return Err(Status::internal(format!( + "failed to create source path: {:?}", + err + ))); + } - let private_key_path = get_private_key_path(); + for source in artifact.sources.iter() { + handle_source(source, &workspace_source_dir_path, registry_client, tx).await?; + } - if !private_key_path.exists() { - if let Err(err) = tx - .send(Err(Status::internal("private key not found"))) - .await - { - error!("failed to send error: {:?}", err); - } + Ok(()) +} - return; - } +async fn handle_source( + source: &ArtifactSourceId, + workspace_source_dir_path: &Path, + registry_client: &mut RegistryServiceClient, + tx: &Sender>, +) -> Result<(), Status> { + let workspace_source_path = workspace_source_dir_path.join(&source.name); - let source_signature = match vorpal_notary::sign(private_key_path, &artifact_data).await - { - Ok(signature) => signature, - Err(err) => { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to sign artifact: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } - }; - - let mut request_stream = vec![]; - - for chunk in artifact_data.chunks(DEFAULT_CHUNKS_SIZE) { - request_stream.push(RegistryPushRequest { - data: chunk.to_vec(), - data_signature: source_signature.clone().to_vec(), - hash: manifest_hash.to_string(), - kind: RegistryKind::Artifact as i32, - name: artifact.name.clone(), - }); - } + if let Err(err) = create_dir_all(&workspace_source_path).await { + return Err(Status::internal(format!( + "failed to create source path: {:?}", + err + ))); + } - if let Err(err) = registry_client - .push(tokio_stream::iter(request_stream)) - .await - { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to push artifact: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + let source_cache_path = get_cache_path(&source.hash, &source.name); - return; - } + if source_cache_path.exists() { + let source_cache_files = get_file_paths(&source_cache_path, vec![], vec![]) + .map_err(|err| Status::internal(format!("failed to get source files: {:?}", err)))?; - // sanitize output files - - for path in artifact_path_files.iter() { - if let Err(err) = set_timestamps(path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to sanitize output files: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } - - return; - } + send_message( + tx, + format!("copying source: {}-{}", source.name, source.hash), + ) + .await?; + + let workspace_source_files = copy_files( + &source_cache_path, + source_cache_files.clone(), + &workspace_source_path, + ) + .await + .map_err(|err| Status::internal(format!("failed to copy source files: {:?}", err)))?; + + for path in workspace_source_files.iter() { + if let Err(err) = set_timestamps(path).await { + return Err(Status::internal(format!( + "failed to sanitize output files: {:?}", + err + ))); } + } - // Remove artifact archive - - if let Err(err) = remove_file(&artifact_archive_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to remove artifact archive: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + return Ok(()); + } - return; - } + let source_archive_path = get_source_archive_path(&source.hash, &source.name); - // Remove workspace - - if let Err(err) = remove_dir_all(workspace_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to remove workspace: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + if source_archive_path.exists() { + send_message( + tx, + format!("caching source: {}-{}", source.name, source.hash), + ) + .await?; + + if let Err(err) = create_dir_all(&source_cache_path).await { + return Err(Status::internal(format!( + "failed to create source path: {:?}", + err + ))); + } + + if let Err(err) = unpack_zstd(&source_cache_path, &source_archive_path).await { + return Err(Status::internal(format!( + "failed to unpack source archive: {:?}", + err + ))); + } + + let source_cache_files = get_file_paths(&source_cache_path, vec![], vec![]) + .map_err(|err| Status::internal(format!("failed to get source files: {:?}", err)))?; - return; + send_message( + tx, + format!("copying source: {}-{}", source.name, source.hash), + ) + .await?; + + let workspace_source_files = copy_files( + &source_cache_path, + source_cache_files, + &workspace_source_path, + ) + .await + .map_err(|err| Status::internal(format!("failed to copy source files: {:?}", err)))?; + + for path in workspace_source_files.iter() { + if let Err(err) = set_timestamps(path).await { + return Err(Status::internal(format!( + "failed to sanitize output files: {:?}", + err + ))); } + } - // Remove lock file - - if let Err(err) = remove_file(&lock_path).await { - if let Err(err) = tx - .send(Err(Status::internal(format!( - "failed to remove lock file: {:?}", - err - )))) - .await - { - error!("failed to send error: {:?}", err); - } + return Ok(()); + } + + send_message( + tx, + format!("pulling source: {}-{}", source.name, source.hash), + ) + .await?; + + let pull_request = RegistryRequest { + hash: source.hash.clone(), + name: source.name.clone(), + kind: RegistryKind::ArtifactSource as i32, + }; + + let response = registry_client.pull(pull_request).await.map_err(|status| { + Status::internal(format!("failed to pull source archive: {:?}", status)) + })?; + + let mut response = response.into_inner(); + let mut response_data = Vec::new(); + + while let Ok(message) = response.message().await { + if message.is_none() { + break; + } + + if let Some(res) = message { + if !res.data.is_empty() { + response_data.extend(res.data); } - }); + } + } - Ok(Response::new(ReceiverStream::new(rx))) + if response_data.is_empty() { + return Ok(()); } + + let mut source_archive = File::create(&source_archive_path) + .await + .map_err(|err| Status::internal(format!("failed to create source archive: {:?}", err)))?; + + if let Err(err) = source_archive.write_all(&response_data).await { + return Err(Status::internal(format!( + "failed to write source archive: {:?}", + err + ))); + } + + if let Err(err) = set_timestamps(&source_archive_path).await { + return Err(Status::internal(format!( + "failed to set source archive timestamps: {:?}", + err + ))); + } + + send_message( + tx, + format!("caching source: {}-{}", source.name, source.hash), + ) + .await?; + + if let Err(err) = create_dir_all(&source_cache_path).await { + return Err(Status::internal(format!( + "failed to create source path: {:?}", + err + ))); + } + + if let Err(err) = unpack_zstd(&source_cache_path, &source_archive_path).await { + return Err(Status::internal(format!( + "failed to unpack source archive: {:?}", + err + ))); + } + + let source_cache_files = get_file_paths(&source_cache_path, vec![], vec![]) + .map_err(|err| Status::internal(format!("failed to get source files: {:?}", err)))?; + + send_message( + tx, + format!("copying source: {}-{}", source.name, source.hash), + ) + .await?; + + let workspace_source_files = copy_files( + &source_cache_path, + source_cache_files.clone(), + &workspace_source_path, + ) + .await + .map_err(|err| Status::internal(format!("failed to copy source files: {:?}", err)))?; + + for path in workspace_source_files.iter() { + if let Err(err) = set_timestamps(path).await { + return Err(Status::internal(format!( + "failed to sanitize output files: {:?}", + err + ))); + } + } + + Ok(()) }