Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: job isolation done #204

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/orchestrator/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a lot of metadata constants here already — should we reuse one of those instead?

pub const JOB_METADATA_SNOS_OUTPUT_PATH: &str = "snos_output_path";
pub const JOB_METADATA_PROGRAM_OUTPUT_PATH: &str = "program_output_path";
pub const JOB_METADATA_CROSS_VERIFY: &str = "cross_verify";
pub const JOB_METADATA_SNOS_FULL_OUTPUT: &str = "snos_full_output";
pub const JOB_METADATA_BLOB_DATA_PATH: &str = "blob_data_path";
16 changes: 11 additions & 5 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use uuid::Uuid;
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;
use crate::constants::{BLOB_DATA_FILE_NAME, JOB_METADATA_BLOB_DATA_PATH};
use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;

lazy_static! {
Expand Down Expand Up @@ -133,7 +133,11 @@ impl Job for DaJob {
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's change this so the worker specifies the full path, as discussed.

store_blob_data(transformed_data.clone(), &blob_data_path, config.clone()).await?;

// Add the path to metadata
job.metadata.insert(JOB_METADATA_BLOB_DATA_PATH.to_string(), blob_data_path);
tracing::debug!(job_id = ?job.id, "Stored blob data");

let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
Expand Down Expand Up @@ -344,14 +348,16 @@ pub async fn state_update_to_blob_data(
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
async fn store_blob_data(blob_data: Vec<BigUint>, blob_data_path: &str, config: Arc<Config>) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;

let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice());

if !blob_data_vec_u8.is_empty() {
storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
storage_client
.put_data(blob_data_vec_u8.into(), blob_data_path)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
}

Ok(())
Expand Down
28 changes: 21 additions & 7 deletions crates/orchestrator/src/jobs/proving_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use uuid::Uuid;
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::CAIRO_PIE_FILE_NAME;
use crate::constants::{JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_CROSS_VERIFY};
use crate::jobs::constants::JOB_METADATA_SNOS_FACT;

#[derive(Error, Debug, PartialEq)]
Expand Down Expand Up @@ -62,12 +62,15 @@ impl Job for ProvingJob {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "proving", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "Proving job processing started.");

// Cairo Pie path in s3 storage client
let block_number: String = job.internal_id.to_string();
let cairo_pie_path = block_number + "/" + CAIRO_PIE_FILE_NAME;
// Replace the manual path construction with metadata lookup
let cairo_pie_path = job.metadata.get(JOB_METADATA_CAIRO_PIE_PATH).ok_or_else(|| {
tracing::error!(job_id = %job.internal_id, "Cairo PIE path not found in job metadata");
ProvingError::CairoPIEWrongPath { internal_id: job.internal_id.clone() }
})?;

tracing::debug!(job_id = %job.internal_id, %cairo_pie_path, "Fetching Cairo PIE file");

let cairo_pie_file = config.storage().get_data(&cairo_pie_path).await.map_err(|e| {
let cairo_pie_file = config.storage().get_data(cairo_pie_path).await.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to fetch Cairo PIE file");
ProvingError::CairoPIEFileFetchFailed(e.to_string())
})?;
Expand Down Expand Up @@ -111,10 +114,21 @@ impl Job for ProvingJob {
OtherError(eyre!("Fact not available in job"))
})?;

tracing::debug!(job_id = %job.internal_id, %task_id, "Getting task status from prover client");
let cross_verify = match job.metadata.get(JOB_METADATA_CROSS_VERIFY) {
Some(value) => value == "true",
None => {
tracing::warn!(
job_id = %job.internal_id,
"Cross verification flag not found in metadata, defaulting to true"
);
true // Default to true for backward compatibility
}
};

tracing::debug!(job_id = %job.internal_id, %task_id, cross_verify, "Getting task status from prover client");
let task_status = config
.prover_client()
.get_task_status(&task_id, fact)
.get_task_status(&task_id, fact, cross_verify)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cross_verify handling should ideally live outside this call.

.await
.wrap_err("Prover Client Error".to_string())
.map_err(|e| {
Expand Down
40 changes: 31 additions & 9 deletions crates/orchestrator/src/jobs/snos_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use uuid::Uuid;
use super::constants::{JOB_METADATA_SNOS_BLOCK, JOB_METADATA_SNOS_FACT};
use super::{JobError, OtherError};
use crate::config::Config;
use crate::constants::{CAIRO_PIE_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME};
use crate::constants::{
CAIRO_PIE_FILE_NAME, JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_FULL_OUTPUT,
JOB_METADATA_SNOS_OUTPUT_PATH, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME,
};
use crate::data_storage::DataStorage;
use crate::jobs::snos_job::error::FactError;
use crate::jobs::snos_job::fact_info::get_fact_info;
Expand Down Expand Up @@ -106,20 +109,39 @@ impl Job for SnosJob {
let snos_url = config.snos_config().rpc_for_snos.to_string();
let snos_url = snos_url.trim_end_matches('/');
tracing::debug!(job_id = %job.internal_id, "Calling prove_block function");

let full_output = match job.metadata.get(JOB_METADATA_SNOS_FULL_OUTPUT) {
Some(value) => value == "true",
None => {
tracing::warn!(
job_id = %job.internal_id,
"SNOS full output configuration not found in metadata, defaulting to false"
);
false
}
};

let (cairo_pie, snos_output) =
prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, false).await.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed");
SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() }
})?;
prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, full_output).await.map_err(
|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed");
SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() }
},
)?;
tracing::debug!(job_id = %job.internal_id, "prove_block function completed successfully");

let fact_info = get_fact_info(&cairo_pie, None)?;
let program_output = fact_info.program_output;
tracing::debug!(job_id = %job.internal_id, "Fact info calculated successfully");

tracing::debug!(job_id = %job.internal_id, "Storing SNOS outputs");
self.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output).await?;
let (cairo_pie_path, snos_output_path, program_output_path) = self
.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output)
.await?;

job.metadata.insert(JOB_METADATA_CAIRO_PIE_PATH.into(), cairo_pie_path);
job.metadata.insert(JOB_METADATA_SNOS_OUTPUT_PATH.into(), snos_output_path);
job.metadata.insert(JOB_METADATA_PROGRAM_OUTPUT_PATH.into(), program_output_path);
job.metadata.insert(JOB_METADATA_SNOS_FACT.into(), fact_info.fact.to_string());
tracing::info!(log_type = "completed", category = "snos", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "SNOS job processed successfully.");

Expand Down Expand Up @@ -178,7 +200,7 @@ impl SnosJob {
cairo_pie: CairoPie,
snos_output: StarknetOsOutput,
program_output: Vec<Felt252>,
) -> Result<(), SnosError> {
) -> Result<(String, String, String), SnosError> {
let cairo_pie_key = format!("{block_number}/{CAIRO_PIE_FILE_NAME}");
let cairo_pie_zip_bytes = self.cairo_pie_to_zip_bytes(cairo_pie).await.map_err(|e| {
SnosError::CairoPieUnserializable { internal_id: internal_id.to_string(), message: e.to_string() }
Expand All @@ -196,17 +218,17 @@ impl SnosJob {
SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
})?;

let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this come from metadata now?

let program_output: Vec<[u8; 32]> = program_output.iter().map(|f| f.to_bytes_be()).collect();
let encoded_data = bincode::serialize(&program_output).map_err(|e| SnosError::ProgramOutputUnserializable {
internal_id: internal_id.to_string(),
message: e.to_string(),
})?;
let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
data_storage.put_data(encoded_data.into(), &program_output_key).await.map_err(|e| {
SnosError::ProgramOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
})?;

Ok(())
Ok((cairo_pie_key, snos_output_key, program_output_key))
}

/// Converts the [CairoPie] input as a zip file and returns it as [Bytes].
Expand Down
63 changes: 46 additions & 17 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::constants::{
};
use super::{JobError, OtherError};
use crate::config::Config;
use crate::constants::{PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME};
use crate::constants::{JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_OUTPUT_PATH};
use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY;
use crate::jobs::state_update_job::utils::fetch_blob_data_for_block;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
Expand Down Expand Up @@ -128,12 +128,12 @@ impl Job for StateUpdateJob {

let mut nonce = config.settlement_client().get_nonce().await.map_err(|e| JobError::Other(OtherError(e)))?;
let mut sent_tx_hashes: Vec<String> = Vec::with_capacity(block_numbers.len());
for block_no in block_numbers.iter() {
for (i, block_no) in block_numbers.iter().enumerate() {
tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block");

let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?;
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get the SNOS path here itself.

let txn_hash = self
.update_state_for_block(config.clone(), *block_no, snos, nonce)
.update_state_for_block(config.clone(), *block_no, i, snos, nonce, job)
.await
.map_err(|e| {
tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block");
Expand Down Expand Up @@ -309,22 +309,21 @@ impl StateUpdateJob {
&self,
config: Arc<Config>,
block_no: u64,
block_index: usize,
snos: StarknetOsOutput,
nonce: u64,
job: &JobItem,
) -> Result<String, JobError> {
let settlement_client = config.settlement_client();
let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO {
unimplemented!("update_state_for_block not implemented as of now for calldata DA.")
} else if snos.use_kzg_da == Felt252::ONE {
let blob_data = fetch_blob_data_for_block(block_no, config.clone())
let blob_data = fetch_blob_data_for_block(block_index, config.clone(), job)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?;
let program_output = self.fetch_program_output_for_block(block_index, config.clone(), job).await?;

// TODO :
// Fetching nonce before the transaction is run
// Sending update_state transaction from the settlement client
settlement_client
.update_state_with_blobs(program_output, blob_data, nonce)
.await
Expand All @@ -336,29 +335,59 @@ impl StateUpdateJob {
}

/// Retrieves the SNOS output for the corresponding block.
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
async fn fetch_snos_for_block(
&self,
block_index: usize,
config: Arc<Config>,
job: &JobItem,
) -> Result<StarknetOsOutput, JobError> {
let storage_client = config.storage();
let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME;

let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
// Get the array of SNOS paths from metadata
let snos_paths: Vec<String> = serde_json::from_str(
job.metadata
.get(JOB_METADATA_SNOS_OUTPUT_PATH)
.ok_or_else(|| JobError::Other(OtherError(eyre!("SNOS output paths not found in metadata"))))?,
)
.map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse SNOS paths from metadata: {}", e))))?;

// Get the path for this block
let path = snos_paths.get(block_index).ok_or_else(|| {
JobError::Other(OtherError(eyre!("SNOS output path not found for index {}", block_index)))
})?;

let snos_output_bytes = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?;

serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e)))
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output from path {}: {}", path, e)))
})
}

async fn fetch_program_output_for_block(
&self,
block_number: u64,
block_index: usize,
config: Arc<Config>,
job: &JobItem,
) -> Result<Vec<[u8; 32]>, JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;

let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
// Get the array of program output paths from metadata
let program_paths: Vec<String> = serde_json::from_str(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Pass the path directly.
  2. As discussed, should we use config structs for the metadata fields?

job.metadata
.get(JOB_METADATA_PROGRAM_OUTPUT_PATH)
.ok_or_else(|| JobError::Other(OtherError(eyre!("Program output paths not found in metadata"))))?,
)
.map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse program paths from metadata: {}", e))))?;

// Get the path for this block
let path = program_paths.get(block_index).ok_or_else(|| {
JobError::Other(OtherError(eyre!("Program output path not found for index {}", block_index)))
})?;

let program_output = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?;

bincode::deserialize(&program_output).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e)))
JobError::Other(OtherError(eyre!("Failed to deserialize program output from path {}: {}", path, e)))
})
}

Expand Down
23 changes: 19 additions & 4 deletions crates/orchestrator/src/jobs/state_update_job/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,30 @@ use std::sync::Arc;
use alloy::primitives::U256;
use color_eyre::eyre::eyre;
use num_bigint::BigUint;
use serde_json;

use crate::config::Config;
use crate::constants::{BLOB_DATA_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME};
use crate::constants::{JOB_METADATA_BLOB_DATA_PATH, PROGRAM_OUTPUT_FILE_NAME};
use crate::jobs::types::JobItem;

/// Fetching the blob data (stored in remote storage during DA job) for a particular block
pub async fn fetch_blob_data_for_block(block_number: u64, config: Arc<Config>) -> color_eyre::Result<Vec<Vec<u8>>> {
pub async fn fetch_blob_data_for_block(
block_index: usize,
config: Arc<Config>,
job: &JobItem,
) -> color_eyre::Result<Vec<Vec<u8>>> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let blob_data = storage_client.get_data(&key).await?;

// Get the array of blob paths from metadata
let blob_paths: Vec<String> = serde_json::from_str(
job.metadata.get(JOB_METADATA_BLOB_DATA_PATH).ok_or_else(|| eyre!("Blob data paths not found in metadata"))?,
)?;

// Get the path for this block
let path =
blob_paths.get(block_index).ok_or_else(|| eyre!("Blob data path not found for index {}", block_index))?;

let blob_data = storage_client.get_data(path).await?;
Ok(vec![blob_data.to_vec()])
}

Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/jobs/proving_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn test_create_job() {
#[tokio::test]
async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) {
let mut prover_client = MockProverClient::new();
prover_client.expect_get_task_status().times(1).returning(|_, _| Ok(TaskStatus::Succeeded));
prover_client.expect_get_task_status().times(1).returning(|_, _, _| Ok(TaskStatus::Succeeded));

let services = TestConfigBuilder::new().configure_prover_client(prover_client.into()).build().await;

Expand Down
7 changes: 6 additions & 1 deletion crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_trait::async_trait;
use opentelemetry::KeyValue;

use crate::config::Config;
use crate::constants::JOB_METADATA_CROSS_VERIFY;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::metrics::ORCHESTRATOR_METRICS;
Expand All @@ -26,8 +27,12 @@ impl Worker for ProvingWorker {
tracing::debug!("Found {} successful SNOS jobs without proving jobs", successful_snos_jobs.len());

for job in successful_snos_jobs {
let mut metadata = job.metadata.clone();
// Set cross-verification to true by default
metadata.insert(JOB_METADATA_CROSS_VERIFY.to_string(), "true".to_string());

tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job");
match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await {
match create_job(JobType::ProofCreation, job.internal_id.to_string(), metadata, config.clone()).await {
Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"),
Err(e) => {
tracing::warn!(job_id = %job.internal_id, error = %e, "Failed to create new state transition job");
Expand Down
Loading
Loading