Skip to content

Commit

Permalink
refactor: job isolation done
Browse files Browse the repository at this point in the history
  • Loading branch information
Mohiiit committed Jan 22, 2025
1 parent 9000464 commit e703bff
Show file tree
Hide file tree
Showing 14 changed files with 218 additions and 52 deletions.
7 changes: 7 additions & 0 deletions crates/orchestrator/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
pub const JOB_METADATA_SNOS_OUTPUT_PATH: &str = "snos_output_path";
pub const JOB_METADATA_PROGRAM_OUTPUT_PATH: &str = "program_output_path";
pub const JOB_METADATA_CROSS_VERIFY: &str = "cross_verify";
pub const JOB_METADATA_SNOS_FULL_OUTPUT: &str = "snos_full_output";
pub const JOB_METADATA_BLOB_DATA_PATH: &str = "blob_data_path";
16 changes: 11 additions & 5 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use uuid::Uuid;
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;
use crate::constants::{BLOB_DATA_FILE_NAME, JOB_METADATA_BLOB_DATA_PATH};
use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;

lazy_static! {
Expand Down Expand Up @@ -133,7 +133,11 @@ impl Job for DaJob {
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME);
store_blob_data(transformed_data.clone(), &blob_data_path, config.clone()).await?;

// Add the path to metadata
job.metadata.insert(JOB_METADATA_BLOB_DATA_PATH.to_string(), blob_data_path);
tracing::debug!(job_id = ?job.id, "Stored blob data");

let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
Expand Down Expand Up @@ -344,14 +348,16 @@ pub async fn state_update_to_blob_data(
}

/// Stores the blob data using the storage client at the given `blob_data_path`
/// (typically `<block_number>/blob_data.txt`, as recorded in job metadata)
async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
async fn store_blob_data(blob_data: Vec<BigUint>, blob_data_path: &str, config: Arc<Config>) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;

let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice());

if !blob_data_vec_u8.is_empty() {
storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
storage_client
.put_data(blob_data_vec_u8.into(), blob_data_path)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
}

Ok(())
Expand Down
28 changes: 21 additions & 7 deletions crates/orchestrator/src/jobs/proving_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use uuid::Uuid;
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::CAIRO_PIE_FILE_NAME;
use crate::constants::{JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_CROSS_VERIFY};
use crate::jobs::constants::JOB_METADATA_SNOS_FACT;

#[derive(Error, Debug, PartialEq)]
Expand Down Expand Up @@ -62,12 +62,15 @@ impl Job for ProvingJob {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "proving", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "Proving job processing started.");

// Cairo Pie path in s3 storage client
let block_number: String = job.internal_id.to_string();
let cairo_pie_path = block_number + "/" + CAIRO_PIE_FILE_NAME;
// Fetch the Cairo PIE path from job metadata (set by the SNOS job) instead of
// reconstructing it from the block number
let cairo_pie_path = job.metadata.get(JOB_METADATA_CAIRO_PIE_PATH).ok_or_else(|| {
tracing::error!(job_id = %job.internal_id, "Cairo PIE path not found in job metadata");
ProvingError::CairoPIEWrongPath { internal_id: job.internal_id.clone() }
})?;

tracing::debug!(job_id = %job.internal_id, %cairo_pie_path, "Fetching Cairo PIE file");

let cairo_pie_file = config.storage().get_data(&cairo_pie_path).await.map_err(|e| {
let cairo_pie_file = config.storage().get_data(cairo_pie_path).await.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to fetch Cairo PIE file");
ProvingError::CairoPIEFileFetchFailed(e.to_string())
})?;
Expand Down Expand Up @@ -111,10 +114,21 @@ impl Job for ProvingJob {
OtherError(eyre!("Fact not available in job"))
})?;

tracing::debug!(job_id = %job.internal_id, %task_id, "Getting task status from prover client");
let cross_verify = match job.metadata.get(JOB_METADATA_CROSS_VERIFY) {
Some(value) => value == "true",
None => {
tracing::warn!(
job_id = %job.internal_id,
"Cross verification flag not found in metadata, defaulting to true"
);
true // Default to true for backward compatibility
}
};

tracing::debug!(job_id = %job.internal_id, %task_id, cross_verify, "Getting task status from prover client");
let task_status = config
.prover_client()
.get_task_status(&task_id, fact)
.get_task_status(&task_id, fact, cross_verify)
.await
.wrap_err("Prover Client Error".to_string())
.map_err(|e| {
Expand Down
40 changes: 31 additions & 9 deletions crates/orchestrator/src/jobs/snos_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use uuid::Uuid;
use super::constants::{JOB_METADATA_SNOS_BLOCK, JOB_METADATA_SNOS_FACT};
use super::{JobError, OtherError};
use crate::config::Config;
use crate::constants::{CAIRO_PIE_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME};
use crate::constants::{
CAIRO_PIE_FILE_NAME, JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_FULL_OUTPUT,
JOB_METADATA_SNOS_OUTPUT_PATH, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME,
};
use crate::data_storage::DataStorage;
use crate::jobs::snos_job::error::FactError;
use crate::jobs::snos_job::fact_info::get_fact_info;
Expand Down Expand Up @@ -106,20 +109,39 @@ impl Job for SnosJob {
let snos_url = config.snos_config().rpc_for_snos.to_string();
let snos_url = snos_url.trim_end_matches('/');
tracing::debug!(job_id = %job.internal_id, "Calling prove_block function");

let full_output = match job.metadata.get(JOB_METADATA_SNOS_FULL_OUTPUT) {
Some(value) => value == "true",
None => {
tracing::warn!(
job_id = %job.internal_id,
"SNOS full output configuration not found in metadata, defaulting to false"
);
false
}
};

let (cairo_pie, snos_output) =
prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, false).await.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed");
SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() }
})?;
prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, full_output).await.map_err(
|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed");
SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() }
},
)?;
tracing::debug!(job_id = %job.internal_id, "prove_block function completed successfully");

let fact_info = get_fact_info(&cairo_pie, None)?;
let program_output = fact_info.program_output;
tracing::debug!(job_id = %job.internal_id, "Fact info calculated successfully");

tracing::debug!(job_id = %job.internal_id, "Storing SNOS outputs");
self.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output).await?;
let (cairo_pie_path, snos_output_path, program_output_path) = self
.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output)
.await?;

job.metadata.insert(JOB_METADATA_CAIRO_PIE_PATH.into(), cairo_pie_path);
job.metadata.insert(JOB_METADATA_SNOS_OUTPUT_PATH.into(), snos_output_path);
job.metadata.insert(JOB_METADATA_PROGRAM_OUTPUT_PATH.into(), program_output_path);
job.metadata.insert(JOB_METADATA_SNOS_FACT.into(), fact_info.fact.to_string());
tracing::info!(log_type = "completed", category = "snos", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "SNOS job processed successfully.");

Expand Down Expand Up @@ -178,7 +200,7 @@ impl SnosJob {
cairo_pie: CairoPie,
snos_output: StarknetOsOutput,
program_output: Vec<Felt252>,
) -> Result<(), SnosError> {
) -> Result<(String, String, String), SnosError> {
let cairo_pie_key = format!("{block_number}/{CAIRO_PIE_FILE_NAME}");
let cairo_pie_zip_bytes = self.cairo_pie_to_zip_bytes(cairo_pie).await.map_err(|e| {
SnosError::CairoPieUnserializable { internal_id: internal_id.to_string(), message: e.to_string() }
Expand All @@ -196,17 +218,17 @@ impl SnosJob {
SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
})?;

let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
let program_output: Vec<[u8; 32]> = program_output.iter().map(|f| f.to_bytes_be()).collect();
let encoded_data = bincode::serialize(&program_output).map_err(|e| SnosError::ProgramOutputUnserializable {
internal_id: internal_id.to_string(),
message: e.to_string(),
})?;
let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
data_storage.put_data(encoded_data.into(), &program_output_key).await.map_err(|e| {
SnosError::ProgramOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
})?;

Ok(())
Ok((cairo_pie_key, snos_output_key, program_output_key))
}

/// Converts the [CairoPie] input as a zip file and returns it as [Bytes].
Expand Down
63 changes: 46 additions & 17 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::constants::{
};
use super::{JobError, OtherError};
use crate::config::Config;
use crate::constants::{PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME};
use crate::constants::{JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_OUTPUT_PATH};
use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY;
use crate::jobs::state_update_job::utils::fetch_blob_data_for_block;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
Expand Down Expand Up @@ -128,12 +128,12 @@ impl Job for StateUpdateJob {

let mut nonce = config.settlement_client().get_nonce().await.map_err(|e| JobError::Other(OtherError(e)))?;
let mut sent_tx_hashes: Vec<String> = Vec::with_capacity(block_numbers.len());
for block_no in block_numbers.iter() {
for (i, block_no) in block_numbers.iter().enumerate() {
tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block");

let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?;
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
let txn_hash = self
.update_state_for_block(config.clone(), *block_no, snos, nonce)
.update_state_for_block(config.clone(), *block_no, i, snos, nonce, job)
.await
.map_err(|e| {
tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block");
Expand Down Expand Up @@ -309,22 +309,21 @@ impl StateUpdateJob {
&self,
config: Arc<Config>,
block_no: u64,
block_index: usize,
snos: StarknetOsOutput,
nonce: u64,
job: &JobItem,
) -> Result<String, JobError> {
let settlement_client = config.settlement_client();
let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO {
unimplemented!("update_state_for_block not implemented as of now for calldata DA.")
} else if snos.use_kzg_da == Felt252::ONE {
let blob_data = fetch_blob_data_for_block(block_no, config.clone())
let blob_data = fetch_blob_data_for_block(block_index, config.clone(), job)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?;
let program_output = self.fetch_program_output_for_block(block_index, config.clone(), job).await?;

// TODO :
// Fetching nonce before the transaction is run
// Sending update_state transaction from the settlement client
settlement_client
.update_state_with_blobs(program_output, blob_data, nonce)
.await
Expand All @@ -336,29 +335,59 @@ impl StateUpdateJob {
}

/// Retrieves the SNOS output for the corresponding block.
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
async fn fetch_snos_for_block(
&self,
block_index: usize,
config: Arc<Config>,
job: &JobItem,
) -> Result<StarknetOsOutput, JobError> {
let storage_client = config.storage();
let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME;

let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
// Get the JSON-encoded array of SNOS output paths from job metadata
let snos_paths: Vec<String> = serde_json::from_str(
job.metadata
.get(JOB_METADATA_SNOS_OUTPUT_PATH)
.ok_or_else(|| JobError::Other(OtherError(eyre!("SNOS output paths not found in metadata"))))?,
)
.map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse SNOS paths from metadata: {}", e))))?;

// Get the path for this block
let path = snos_paths.get(block_index).ok_or_else(|| {
JobError::Other(OtherError(eyre!("SNOS output path not found for index {}", block_index)))
})?;

let snos_output_bytes = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?;

serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e)))
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output from path {}: {}", path, e)))
})
}

async fn fetch_program_output_for_block(
&self,
block_number: u64,
block_index: usize,
config: Arc<Config>,
job: &JobItem,
) -> Result<Vec<[u8; 32]>, JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;

let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
// Get the array of program output paths from metadata
let program_paths: Vec<String> = serde_json::from_str(
job.metadata
.get(JOB_METADATA_PROGRAM_OUTPUT_PATH)
.ok_or_else(|| JobError::Other(OtherError(eyre!("Program output paths not found in metadata"))))?,
)
.map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse program paths from metadata: {}", e))))?;

// Get the path for this block
let path = program_paths.get(block_index).ok_or_else(|| {
JobError::Other(OtherError(eyre!("Program output path not found for index {}", block_index)))
})?;

let program_output = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?;

bincode::deserialize(&program_output).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e)))
JobError::Other(OtherError(eyre!("Failed to deserialize program output from path {}: {}", path, e)))
})
}

Expand Down
23 changes: 19 additions & 4 deletions crates/orchestrator/src/jobs/state_update_job/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,30 @@ use std::sync::Arc;
use alloy::primitives::U256;
use color_eyre::eyre::eyre;
use num_bigint::BigUint;
use serde_json;

use crate::config::Config;
use crate::constants::{BLOB_DATA_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME};
use crate::constants::{JOB_METADATA_BLOB_DATA_PATH, PROGRAM_OUTPUT_FILE_NAME};
use crate::jobs::types::JobItem;

/// Fetching the blob data (stored in remote storage during DA job) for a particular block
pub async fn fetch_blob_data_for_block(block_number: u64, config: Arc<Config>) -> color_eyre::Result<Vec<Vec<u8>>> {
pub async fn fetch_blob_data_for_block(
block_index: usize,
config: Arc<Config>,
job: &JobItem,
) -> color_eyre::Result<Vec<Vec<u8>>> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let blob_data = storage_client.get_data(&key).await?;

// Get the array of blob paths from metadata
let blob_paths: Vec<String> = serde_json::from_str(
job.metadata.get(JOB_METADATA_BLOB_DATA_PATH).ok_or_else(|| eyre!("Blob data paths not found in metadata"))?,
)?;

// Get the path for this block
let path =
blob_paths.get(block_index).ok_or_else(|| eyre!("Blob data path not found for index {}", block_index))?;

let blob_data = storage_client.get_data(path).await?;
Ok(vec![blob_data.to_vec()])
}

Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/jobs/proving_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn test_create_job() {
#[tokio::test]
async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) {
let mut prover_client = MockProverClient::new();
prover_client.expect_get_task_status().times(1).returning(|_, _| Ok(TaskStatus::Succeeded));
prover_client.expect_get_task_status().times(1).returning(|_, _, _| Ok(TaskStatus::Succeeded));

let services = TestConfigBuilder::new().configure_prover_client(prover_client.into()).build().await;

Expand Down
7 changes: 6 additions & 1 deletion crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_trait::async_trait;
use opentelemetry::KeyValue;

use crate::config::Config;
use crate::constants::JOB_METADATA_CROSS_VERIFY;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::metrics::ORCHESTRATOR_METRICS;
Expand All @@ -26,8 +27,12 @@ impl Worker for ProvingWorker {
tracing::debug!("Found {} successful SNOS jobs without proving jobs", successful_snos_jobs.len());

for job in successful_snos_jobs {
let mut metadata = job.metadata.clone();
// Set cross-verification to true by default
metadata.insert(JOB_METADATA_CROSS_VERIFY.to_string(), "true".to_string());

tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job");
match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await {
match create_job(JobType::ProofCreation, job.internal_id.to_string(), metadata, config.clone()).await {
Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"),
Err(e) => {
tracing::warn!(job_id = %job.internal_id, error = %e, "Failed to create new state transition job");
Expand Down
Loading

0 comments on commit e703bff

Please sign in to comment.