From e703bff9a437dd8817ace64d1555ba4bfd8fae76 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Wed, 22 Jan 2025 11:43:09 +0530 Subject: [PATCH] refactor: job isolation done --- crates/orchestrator/src/constants.rs | 7 +++ crates/orchestrator/src/jobs/da_job/mod.rs | 16 +++-- .../orchestrator/src/jobs/proving_job/mod.rs | 28 ++++++--- crates/orchestrator/src/jobs/snos_job/mod.rs | 40 +++++++++--- .../src/jobs/state_update_job/mod.rs | 63 ++++++++++++++----- .../src/jobs/state_update_job/utils.rs | 23 +++++-- .../src/tests/jobs/proving_job/mod.rs | 2 +- crates/orchestrator/src/workers/proving.rs | 7 ++- crates/orchestrator/src/workers/snos.rs | 6 +- .../orchestrator/src/workers/update_state.rs | 42 +++++++++++++ .../atlantic-service/src/lib.rs | 19 ++++-- .../prover-client-interface/src/lib.rs | 7 ++- .../prover-clients/sharp-service/src/lib.rs | 7 ++- .../prover-clients/sharp-service/tests/lib.rs | 3 +- 14 files changed, 218 insertions(+), 52 deletions(-) diff --git a/crates/orchestrator/src/constants.rs b/crates/orchestrator/src/constants.rs index 049d3ee3..17a28f5c 100644 --- a/crates/orchestrator/src/constants.rs +++ b/crates/orchestrator/src/constants.rs @@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt"; pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json"; pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt"; pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip"; + +pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path"; +pub const JOB_METADATA_SNOS_OUTPUT_PATH: &str = "snos_output_path"; +pub const JOB_METADATA_PROGRAM_OUTPUT_PATH: &str = "program_output_path"; +pub const JOB_METADATA_CROSS_VERIFY: &str = "cross_verify"; +pub const JOB_METADATA_SNOS_FULL_OUTPUT: &str = "snos_full_output"; +pub const JOB_METADATA_BLOB_DATA_PATH: &str = "blob_data_path"; diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index ab71ab6d..7051dbb6 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -19,7 +19,7 @@ use uuid::Uuid; use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use super::{Job, JobError, OtherError}; use crate::config::Config; -use crate::constants::BLOB_DATA_FILE_NAME; +use crate::constants::{BLOB_DATA_FILE_NAME, JOB_METADATA_BLOB_DATA_PATH}; use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec; lazy_static! { @@ -133,7 +133,11 @@ impl Job for DaJob { // data transformation on the data tracing::trace!(job_id = ?job.id, "Applied FFT transformation"); - store_blob_data(transformed_data.clone(), block_no, config.clone()).await?; + let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME); + store_blob_data(transformed_data.clone(), &blob_data_path, config.clone()).await?; + + // Add the path to metadata + job.metadata.insert(JOB_METADATA_BLOB_DATA_PATH.to_string(), blob_data_path); tracing::debug!(job_id = ?job.id, "Stored blob data"); let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await; @@ -344,14 +348,16 @@ pub async fn state_update_to_blob_data( } /// To store the blob data using the storage client with path /blob_data.txt -async fn store_blob_data(blob_data: Vec, block_number: u64, config: Arc) -> Result<(), JobError> { +async fn store_blob_data(blob_data: Vec, blob_data_path: &str, config: Arc) -> Result<(), JobError> { let storage_client = config.storage(); - let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME; let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice()); if !blob_data_vec_u8.is_empty() { - storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?; + storage_client + .put_data(blob_data_vec_u8.into(), blob_data_path) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; } Ok(()) diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index a034bb4e..0fd04ba0 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -12,7 +12,7 @@ use uuid::Uuid; use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use super::{Job, JobError, OtherError}; use crate::config::Config; -use crate::constants::CAIRO_PIE_FILE_NAME; +use crate::constants::{JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_CROSS_VERIFY}; use crate::jobs::constants::JOB_METADATA_SNOS_FACT; #[derive(Error, Debug, PartialEq)] @@ -62,12 +62,15 @@ impl Job for ProvingJob { let internal_id = job.internal_id.clone(); tracing::info!(log_type = "starting", category = "proving", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "Proving job processing started."); - // Cairo Pie path in s3 storage client - let block_number: String = job.internal_id.to_string(); - let cairo_pie_path = block_number + "/" + CAIRO_PIE_FILE_NAME; + // Replace the manual path construction with metadata lookup + let cairo_pie_path = job.metadata.get(JOB_METADATA_CAIRO_PIE_PATH).ok_or_else(|| { + tracing::error!(job_id = %job.internal_id, "Cairo PIE path not found in job metadata"); + ProvingError::CairoPIEWrongPath { internal_id: job.internal_id.clone() } + })?; + tracing::debug!(job_id = %job.internal_id, %cairo_pie_path, "Fetching Cairo PIE file"); - let cairo_pie_file = config.storage().get_data(&cairo_pie_path).await.map_err(|e| { + let cairo_pie_file = config.storage().get_data(cairo_pie_path).await.map_err(|e| { tracing::error!(job_id = %job.internal_id, error = %e, "Failed to fetch Cairo PIE file"); ProvingError::CairoPIEFileFetchFailed(e.to_string()) })?; @@ -111,10 +114,21 @@ impl Job for ProvingJob { OtherError(eyre!("Fact not available in job")) })?; - tracing::debug!(job_id = %job.internal_id, %task_id, "Getting task status from prover client"); + let cross_verify = match job.metadata.get(JOB_METADATA_CROSS_VERIFY) { + Some(value) => value == "true", + None => { + tracing::warn!( + job_id = %job.internal_id, + "Cross verification flag not found in metadata, defaulting to true" + ); + true // Default to true for backward compatibility + } + }; + + tracing::debug!(job_id = %job.internal_id, %task_id, cross_verify, "Getting task status from prover client"); let task_status = config .prover_client() - .get_task_status(&task_id, fact) + .get_task_status(&task_id, fact, cross_verify) .await .wrap_err("Prover Client Error".to_string()) .map_err(|e| { diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs index c82d48c3..1b07d24b 100644 --- a/crates/orchestrator/src/jobs/snos_job/mod.rs +++ b/crates/orchestrator/src/jobs/snos_job/mod.rs @@ -18,7 +18,10 @@ use uuid::Uuid; use super::constants::{JOB_METADATA_SNOS_BLOCK, JOB_METADATA_SNOS_FACT}; use super::{JobError, OtherError}; use crate::config::Config; -use crate::constants::{CAIRO_PIE_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME}; +use crate::constants::{ + CAIRO_PIE_FILE_NAME, JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_FULL_OUTPUT, + JOB_METADATA_SNOS_OUTPUT_PATH, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME, +}; use crate::data_storage::DataStorage; use crate::jobs::snos_job::error::FactError; use crate::jobs::snos_job::fact_info::get_fact_info; @@ -106,11 +109,25 @@ impl Job for SnosJob { let snos_url = config.snos_config().rpc_for_snos.to_string(); let snos_url = snos_url.trim_end_matches('/'); tracing::debug!(job_id = %job.internal_id, "Calling prove_block function"); + + let full_output = match job.metadata.get(JOB_METADATA_SNOS_FULL_OUTPUT) { + Some(value) => value == "true", + None => { + tracing::warn!( + job_id = %job.internal_id, + "SNOS full output configuration not found in metadata, defaulting to false" + ); + false + } + }; + let (cairo_pie, snos_output) = - prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, false).await.map_err(|e| { - tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed"); - SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() } - })?; + prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, full_output).await.map_err( + |e| { + tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed"); + SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() } + }, + )?; tracing::debug!(job_id = %job.internal_id, "prove_block function completed successfully"); let fact_info = get_fact_info(&cairo_pie, None)?; @@ -118,8 +135,13 @@ impl Job for SnosJob { tracing::debug!(job_id = %job.internal_id, "Fact info calculated successfully"); tracing::debug!(job_id = %job.internal_id, "Storing SNOS outputs"); - self.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output).await?; + let (cairo_pie_path, snos_output_path, program_output_path) = self + .store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output) + .await?; + job.metadata.insert(JOB_METADATA_CAIRO_PIE_PATH.into(), cairo_pie_path); + job.metadata.insert(JOB_METADATA_SNOS_OUTPUT_PATH.into(), snos_output_path); + job.metadata.insert(JOB_METADATA_PROGRAM_OUTPUT_PATH.into(), program_output_path); job.metadata.insert(JOB_METADATA_SNOS_FACT.into(), fact_info.fact.to_string()); tracing::info!(log_type = "completed", category = "snos", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "SNOS job processed successfully."); @@ -178,7 +200,7 @@ impl SnosJob { cairo_pie: CairoPie, snos_output: StarknetOsOutput, program_output: Vec, - ) -> Result<(), SnosError> { + ) -> Result<(String, String, String), SnosError> { let cairo_pie_key = format!("{block_number}/{CAIRO_PIE_FILE_NAME}"); let cairo_pie_zip_bytes = self.cairo_pie_to_zip_bytes(cairo_pie).await.map_err(|e| { SnosError::CairoPieUnserializable { internal_id: internal_id.to_string(), message: e.to_string() } @@ -196,17 +218,17 @@ impl SnosJob { SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() } })?; + let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}"); let program_output: Vec<[u8; 32]> = program_output.iter().map(|f| f.to_bytes_be()).collect(); let encoded_data = bincode::serialize(&program_output).map_err(|e| SnosError::ProgramOutputUnserializable { internal_id: internal_id.to_string(), message: e.to_string(), })?; - let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}"); data_storage.put_data(encoded_data.into(), &program_output_key).await.map_err(|e| { SnosError::ProgramOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() } })?; - Ok(()) + Ok((cairo_pie_key, snos_output_key, program_output_key)) } /// Converts the [CairoPie] input as a zip file and returns it as [Bytes]. diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 0856670e..59359944 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -19,7 +19,7 @@ use super::constants::{ }; use super::{JobError, OtherError}; use crate::config::Config; -use crate::constants::{PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME}; +use crate::constants::{JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_OUTPUT_PATH}; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; @@ -128,12 +128,12 @@ impl Job for StateUpdateJob { let mut nonce = config.settlement_client().get_nonce().await.map_err(|e| JobError::Other(OtherError(e)))?; let mut sent_tx_hashes: Vec = Vec::with_capacity(block_numbers.len()); - for block_no in block_numbers.iter() { + for (i, block_no) in block_numbers.iter().enumerate() { tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block"); - let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?; + let snos = self.fetch_snos_for_block(i, config.clone(), job).await?; let txn_hash = self - .update_state_for_block(config.clone(), *block_no, snos, nonce) + .update_state_for_block(config.clone(), *block_no, i, snos, nonce, job) .await .map_err(|e| { tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block"); @@ -309,22 +309,21 @@ impl StateUpdateJob { &self, config: Arc, block_no: u64, + block_index: usize, snos: StarknetOsOutput, nonce: u64, + job: &JobItem, ) -> Result { let settlement_client = config.settlement_client(); let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO { unimplemented!("update_state_for_block not implemented as of now for calldata DA.") } else if snos.use_kzg_da == Felt252::ONE { - let blob_data = fetch_blob_data_for_block(block_no, config.clone()) + let blob_data = fetch_blob_data_for_block(block_index, config.clone(), job) .await .map_err(|e| JobError::Other(OtherError(e)))?; - let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?; + let program_output = self.fetch_program_output_for_block(block_index, config.clone(), job).await?; - // TODO : - // Fetching nonce before the transaction is run - // Sending update_state transaction from the settlement client settlement_client .update_state_with_blobs(program_output, blob_data, nonce) .await @@ -336,29 +335,59 @@ impl StateUpdateJob { } /// Retrieves the SNOS output for the corresponding block. - async fn fetch_snos_for_block(&self, block_no: u64, config: Arc) -> Result { + async fn fetch_snos_for_block( + &self, + block_index: usize, + config: Arc, + job: &JobItem, + ) -> Result { let storage_client = config.storage(); - let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME; - let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?; + // Get the array of SNOS paths from metadata + let snos_paths: Vec = serde_json::from_str( + job.metadata + .get(JOB_METADATA_SNOS_OUTPUT_PATH) + .ok_or_else(|| JobError::Other(OtherError(eyre!("SNOS output paths not found in metadata"))))?, + ) + .map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse SNOS paths from metadata: {}", e))))?; + + // Get the path for this block + let path = snos_paths.get(block_index).ok_or_else(|| { + JobError::Other(OtherError(eyre!("SNOS output path not found for index {}", block_index))) + })?; + + let snos_output_bytes = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?; serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| { - JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e))) + JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output from path {}: {}", path, e))) }) } async fn fetch_program_output_for_block( &self, - block_number: u64, + block_index: usize, config: Arc, + job: &JobItem, ) -> Result, JobError> { let storage_client = config.storage(); - let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME; - let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?; + // Get the array of program output paths from metadata + let program_paths: Vec = serde_json::from_str( + job.metadata + .get(JOB_METADATA_PROGRAM_OUTPUT_PATH) + .ok_or_else(|| JobError::Other(OtherError(eyre!("Program output paths not found in metadata"))))?, + ) + .map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse program paths from metadata: {}", e))))?; + + // Get the path for this block + let path = program_paths.get(block_index).ok_or_else(|| { + JobError::Other(OtherError(eyre!("Program output path not found for index {}", block_index))) + })?; + + let program_output = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?; bincode::deserialize(&program_output).map_err(|e| { - JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e))) + JobError::Other(OtherError(eyre!("Failed to deserialize program output from path {}: {}", path, e))) }) } diff --git a/crates/orchestrator/src/jobs/state_update_job/utils.rs b/crates/orchestrator/src/jobs/state_update_job/utils.rs index a6acfe4e..a22d9e8e 100644 --- a/crates/orchestrator/src/jobs/state_update_job/utils.rs +++ b/crates/orchestrator/src/jobs/state_update_job/utils.rs @@ -6,15 +6,30 @@ use std::sync::Arc; use alloy::primitives::U256; use color_eyre::eyre::eyre; use num_bigint::BigUint; +use serde_json; use crate::config::Config; -use crate::constants::{BLOB_DATA_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME}; +use crate::constants::{JOB_METADATA_BLOB_DATA_PATH, PROGRAM_OUTPUT_FILE_NAME}; +use crate::jobs::types::JobItem; /// Fetching the blob data (stored in remote storage during DA job) for a particular block -pub async fn fetch_blob_data_for_block(block_number: u64, config: Arc) -> color_eyre::Result>> { +pub async fn fetch_blob_data_for_block( + block_index: usize, + config: Arc, + job: &JobItem, +) -> color_eyre::Result>> { let storage_client = config.storage(); - let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME; - let blob_data = storage_client.get_data(&key).await?; + + // Get the array of blob paths from metadata + let blob_paths: Vec = serde_json::from_str( + job.metadata.get(JOB_METADATA_BLOB_DATA_PATH).ok_or_else(|| eyre!("Blob data paths not found in metadata"))?, + )?; + + // Get the path for this block + let path = + blob_paths.get(block_index).ok_or_else(|| eyre!("Blob data path not found for index {}", block_index))?; + + let blob_data = storage_client.get_data(path).await?; Ok(vec![blob_data.to_vec()]) } diff --git a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs index 162a0af3..e9b3f8e2 100644 --- a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs @@ -45,7 +45,7 @@ async fn test_create_job() { #[tokio::test] async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) { let mut prover_client = MockProverClient::new(); - prover_client.expect_get_task_status().times(1).returning(|_, _| Ok(TaskStatus::Succeeded)); + prover_client.expect_get_task_status().times(1).returning(|_, _, _| Ok(TaskStatus::Succeeded)); let services = TestConfigBuilder::new().configure_prover_client(prover_client.into()).build().await; diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 66c52dfe..a152abeb 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use opentelemetry::KeyValue; use crate::config::Config; +use crate::constants::JOB_METADATA_CROSS_VERIFY; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; use crate::metrics::ORCHESTRATOR_METRICS; @@ -26,8 +27,12 @@ impl Worker for ProvingWorker { tracing::debug!("Found {} successful SNOS jobs without proving jobs", successful_snos_jobs.len()); for job in successful_snos_jobs { + let mut metadata = job.metadata.clone(); + // Set cross-verification to true by default + metadata.insert(JOB_METADATA_CROSS_VERIFY.to_string(), "true".to_string()); + tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job"); - match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await { + match create_job(JobType::ProofCreation, job.internal_id.to_string(), metadata, config.clone()).await { Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"), Err(e) => { tracing::warn!(job_id = %job.internal_id, error = %e, "Failed to create new state transition job"); diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 0abaf2a1..a609eb1f 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -8,6 +8,7 @@ use opentelemetry::KeyValue; use starknet::providers::Provider; use crate::config::Config; +use crate::constants::JOB_METADATA_SNOS_FULL_OUTPUT; use crate::jobs::create_job; use crate::jobs::types::JobType; use crate::metrics::ORCHESTRATOR_METRICS; @@ -52,7 +53,10 @@ impl Worker for SnosWorker { }; for block_num in block_start..latest_block_number + 1 { - match create_job(JobType::SnosRun, block_num.to_string(), HashMap::new(), config.clone()).await { + let mut metadata = HashMap::new(); + metadata.insert(JOB_METADATA_SNOS_FULL_OUTPUT.to_string(), "false".to_string()); + + match create_job(JobType::SnosRun, block_num.to_string(), metadata, config.clone()).await { Ok(_) => tracing::info!(block_id = %block_num, "Successfully created new Snos job"), Err(e) => { tracing::warn!(block_id = %block_num, error = %e, "Failed to create new Snos job"); diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index a6b7da2d..bab09dc2 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -2,9 +2,12 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use color_eyre::eyre::eyre; use opentelemetry::KeyValue; +use serde_json; use crate::config::Config; +use crate::constants::{JOB_METADATA_BLOB_DATA_PATH, JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_OUTPUT_PATH}; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; @@ -115,6 +118,45 @@ impl Worker for UpdateStateWorker { blocks_to_process.iter().map(|ele| ele.to_string()).collect::>().join(","), ); + // Initialize vectors to store paths + let mut snos_paths = Vec::new(); + let mut program_paths = Vec::new(); + let mut blob_paths = Vec::new(); + + // For each block, get paths from both DA and SNOS jobs + for block_number in &blocks_to_process { + // Get SNOS job and copy its paths + let snos_job = config + .database() + .get_job_by_internal_id_and_type(&block_number.to_string(), &JobType::SnosRun) + .await? + .ok_or_else(|| eyre!("SNOS job not found for block {}", block_number))?; + + // Get paths from SNOS job + if let Some(snos_path) = snos_job.metadata.get(JOB_METADATA_SNOS_OUTPUT_PATH) { + snos_paths.push(snos_path.clone()); + } + if let Some(program_path) = snos_job.metadata.get(JOB_METADATA_PROGRAM_OUTPUT_PATH) { + program_paths.push(program_path.clone()); + } + + // Get DA job and copy blob data path + let da_job = config + .database() + .get_job_by_internal_id_and_type(&block_number.to_string(), &JobType::DataSubmission) + .await? + .ok_or_else(|| eyre!("DA job not found for block {}", block_number))?; + + if let Some(blob_path) = da_job.metadata.get(JOB_METADATA_BLOB_DATA_PATH) { + blob_paths.push(blob_path.clone()); + } + } + + // Store paths as JSON arrays + metadata.insert(JOB_METADATA_SNOS_OUTPUT_PATH.to_string(), serde_json::to_string(&snos_paths)?); + metadata.insert(JOB_METADATA_PROGRAM_OUTPUT_PATH.to_string(), serde_json::to_string(&program_paths)?); + metadata.insert(JOB_METADATA_BLOB_DATA_PATH.to_string(), serde_json::to_string(&blob_paths)?); + // Creating a single job for all the pending blocks. let new_job_id = blocks_to_process[0].to_string(); match create_job(JobType::StateTransition, new_job_id.clone(), metadata, config.clone()).await { diff --git a/crates/prover-clients/atlantic-service/src/lib.rs b/crates/prover-clients/atlantic-service/src/lib.rs index d10e11ba..ce24841e 100644 --- a/crates/prover-clients/atlantic-service/src/lib.rs +++ b/crates/prover-clients/atlantic-service/src/lib.rs @@ -67,16 +67,27 @@ impl ProverClient for AtlanticProverService { } #[tracing::instrument(skip(self))] - async fn get_task_status(&self, job_key: &str, fact: &str) -> Result { + async fn get_task_status( + &self, + job_key: &str, + fact: &str, + cross_verify: bool, + ) -> Result { let res = self.atlantic_client.get_job_status(job_key).await?; match res.atlantic_query.status { AtlanticQueryStatus::InProgress => Ok(TaskStatus::Processing), AtlanticQueryStatus::Done => { let fact = B256::from_str(fact).map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?; - if self.fact_checker.is_valid(&fact).await? { - Ok(TaskStatus::Succeeded) + if cross_verify { + tracing::debug!(fact = %hex::encode(fact), "Cross-verifying fact on chain"); + if self.fact_checker.is_valid(&fact).await? { + Ok(TaskStatus::Succeeded) + } else { + Ok(TaskStatus::Failed(format!("Fact {} is not valid or not registered", hex::encode(fact)))) + } } else { - Ok(TaskStatus::Failed(format!("Fact {} is not valid or not registered", hex::encode(fact)))) + tracing::debug!(fact = %hex::encode(fact), "Skipping cross-verification as it's disabled"); + Ok(TaskStatus::Succeeded) } } AtlanticQueryStatus::Failed => { diff --git a/crates/prover-clients/prover-client-interface/src/lib.rs b/crates/prover-clients/prover-client-interface/src/lib.rs index cd560c09..743cedb5 100644 --- a/crates/prover-clients/prover-client-interface/src/lib.rs +++ b/crates/prover-clients/prover-client-interface/src/lib.rs @@ -16,7 +16,12 @@ use mockall::automock; #[async_trait] pub trait ProverClient: Send + Sync { async fn submit_task(&self, task: Task, proof_layout: LayoutName) -> Result; - async fn get_task_status(&self, task_id: &str, fact: &str) -> Result; + async fn get_task_status( + &self, + task_id: &str, + fact: &str, + cross_verify: bool, + ) -> Result; } pub enum Task { diff --git a/crates/prover-clients/sharp-service/src/lib.rs b/crates/prover-clients/sharp-service/src/lib.rs index 6d0228c8..d991b9c5 100644 --- a/crates/prover-clients/sharp-service/src/lib.rs +++ b/crates/prover-clients/sharp-service/src/lib.rs @@ -63,7 +63,12 @@ impl ProverClient for SharpProverService { } #[tracing::instrument(skip(self), ret, err)] - async fn get_task_status(&self, job_key: &str, fact: &str) -> Result { + async fn get_task_status( + &self, + job_key: &str, + fact: &str, + _cross_verify: bool, + ) -> Result { tracing::info!( log_type = "starting", category = "get_task_status", diff --git a/crates/prover-clients/sharp-service/tests/lib.rs b/crates/prover-clients/sharp-service/tests/lib.rs index c59ff733..b180aa64 100644 --- a/crates/prover-clients/sharp-service/tests/lib.rs +++ b/crates/prover-clients/sharp-service/tests/lib.rs @@ -87,7 +87,8 @@ async fn prover_client_get_task_status_works(#[case] cairo_job_status: CairoJobS then.status(200).body(serde_json::to_vec(&get_task_status_sharp_response(&cairo_job_status)).unwrap()); }); - let task_status = sharp_service.get_task_status("c31381bf-4739-4667-b5b8-b08af1c6b1c7", TEST_FACT).await.unwrap(); + let task_status = + sharp_service.get_task_status("c31381bf-4739-4667-b5b8-b08af1c6b1c7", TEST_FACT, false).await.unwrap(); assert_eq!(task_status, get_task_status_expectation(&cairo_job_status), "Cairo Job Status assertion failed"); sharp_add_job_call.assert();