-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: job isolation done #204
base: main
Are you sure you want to change the base?
Changes from 1 commit
e703bff
ff84440
decc1ad
d11c246
a35594a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ use uuid::Uuid; | |
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; | ||
use super::{Job, JobError, OtherError}; | ||
use crate::config::Config; | ||
use crate::constants::BLOB_DATA_FILE_NAME; | ||
use crate::constants::{BLOB_DATA_FILE_NAME, JOB_METADATA_BLOB_DATA_PATH}; | ||
use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec; | ||
|
||
lazy_static! { | ||
|
@@ -133,7 +133,11 @@ impl Job for DaJob { | |
// data transformation on the data | ||
tracing::trace!(job_id = ?job.id, "Applied FFT transformation"); | ||
|
||
store_blob_data(transformed_data.clone(), block_no, config.clone()).await?; | ||
let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's change to worker specifying full path as discussed |
||
store_blob_data(transformed_data.clone(), &blob_data_path, config.clone()).await?; | ||
|
||
// Add the path to metadata | ||
job.metadata.insert(JOB_METADATA_BLOB_DATA_PATH.to_string(), blob_data_path); | ||
tracing::debug!(job_id = ?job.id, "Stored blob data"); | ||
|
||
let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await; | ||
|
@@ -344,14 +348,16 @@ pub async fn state_update_to_blob_data( | |
} | ||
|
||
/// To store the blob data using the storage client with path <block_number>/blob_data.txt | ||
async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> { | ||
async fn store_blob_data(blob_data: Vec<BigUint>, blob_data_path: &str, config: Arc<Config>) -> Result<(), JobError> { | ||
let storage_client = config.storage(); | ||
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME; | ||
|
||
let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice()); | ||
|
||
if !blob_data_vec_u8.is_empty() { | ||
storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?; | ||
storage_client | ||
.put_data(blob_data_vec_u8.into(), blob_data_path) | ||
.await | ||
.map_err(|e| JobError::Other(OtherError(e)))?; | ||
} | ||
|
||
Ok(()) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ use uuid::Uuid; | |
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; | ||
use super::{Job, JobError, OtherError}; | ||
use crate::config::Config; | ||
use crate::constants::CAIRO_PIE_FILE_NAME; | ||
use crate::constants::{JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_CROSS_VERIFY}; | ||
use crate::jobs::constants::JOB_METADATA_SNOS_FACT; | ||
|
||
#[derive(Error, Debug, PartialEq)] | ||
|
@@ -62,12 +62,15 @@ impl Job for ProvingJob { | |
let internal_id = job.internal_id.clone(); | ||
tracing::info!(log_type = "starting", category = "proving", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "Proving job processing started."); | ||
|
||
// Cairo Pie path in s3 storage client | ||
let block_number: String = job.internal_id.to_string(); | ||
let cairo_pie_path = block_number + "/" + CAIRO_PIE_FILE_NAME; | ||
// Replace the manual path construction with metadata lookup | ||
let cairo_pie_path = job.metadata.get(JOB_METADATA_CAIRO_PIE_PATH).ok_or_else(|| { | ||
tracing::error!(job_id = %job.internal_id, "Cairo PIE path not found in job metadata"); | ||
ProvingError::CairoPIEWrongPath { internal_id: job.internal_id.clone() } | ||
})?; | ||
|
||
tracing::debug!(job_id = %job.internal_id, %cairo_pie_path, "Fetching Cairo PIE file"); | ||
|
||
let cairo_pie_file = config.storage().get_data(&cairo_pie_path).await.map_err(|e| { | ||
let cairo_pie_file = config.storage().get_data(cairo_pie_path).await.map_err(|e| { | ||
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to fetch Cairo PIE file"); | ||
ProvingError::CairoPIEFileFetchFailed(e.to_string()) | ||
})?; | ||
|
@@ -111,10 +114,21 @@ impl Job for ProvingJob { | |
OtherError(eyre!("Fact not available in job")) | ||
})?; | ||
|
||
tracing::debug!(job_id = %job.internal_id, %task_id, "Getting task status from prover client"); | ||
let cross_verify = match job.metadata.get(JOB_METADATA_CROSS_VERIFY) { | ||
Some(value) => value == "true", | ||
None => { | ||
tracing::warn!( | ||
job_id = %job.internal_id, | ||
"Cross verification flag not found in metadata, defaulting to true" | ||
); | ||
true // Default to true for backward compatibility | ||
} | ||
}; | ||
|
||
tracing::debug!(job_id = %job.internal_id, %task_id, cross_verify, "Getting task status from prover client"); | ||
let task_status = config | ||
.prover_client() | ||
.get_task_status(&task_id, fact) | ||
.get_task_status(&task_id, fact, cross_verify) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cross_verify part should ideally be outside |
||
.await | ||
.wrap_err("Prover Client Error".to_string()) | ||
.map_err(|e| { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,10 @@ use uuid::Uuid; | |
use super::constants::{JOB_METADATA_SNOS_BLOCK, JOB_METADATA_SNOS_FACT}; | ||
use super::{JobError, OtherError}; | ||
use crate::config::Config; | ||
use crate::constants::{CAIRO_PIE_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME}; | ||
use crate::constants::{ | ||
CAIRO_PIE_FILE_NAME, JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_FULL_OUTPUT, | ||
JOB_METADATA_SNOS_OUTPUT_PATH, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME, | ||
}; | ||
use crate::data_storage::DataStorage; | ||
use crate::jobs::snos_job::error::FactError; | ||
use crate::jobs::snos_job::fact_info::get_fact_info; | ||
|
@@ -106,20 +109,39 @@ impl Job for SnosJob { | |
let snos_url = config.snos_config().rpc_for_snos.to_string(); | ||
let snos_url = snos_url.trim_end_matches('/'); | ||
tracing::debug!(job_id = %job.internal_id, "Calling prove_block function"); | ||
|
||
let full_output = match job.metadata.get(JOB_METADATA_SNOS_FULL_OUTPUT) { | ||
Some(value) => value == "true", | ||
None => { | ||
tracing::warn!( | ||
job_id = %job.internal_id, | ||
"SNOS full output configuration not found in metadata, defaulting to false" | ||
); | ||
false | ||
} | ||
}; | ||
|
||
let (cairo_pie, snos_output) = | ||
prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, false).await.map_err(|e| { | ||
tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed"); | ||
SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() } | ||
})?; | ||
prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, full_output).await.map_err( | ||
|e| { | ||
tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed"); | ||
SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() } | ||
}, | ||
)?; | ||
tracing::debug!(job_id = %job.internal_id, "prove_block function completed successfully"); | ||
|
||
let fact_info = get_fact_info(&cairo_pie, None)?; | ||
let program_output = fact_info.program_output; | ||
tracing::debug!(job_id = %job.internal_id, "Fact info calculated successfully"); | ||
|
||
tracing::debug!(job_id = %job.internal_id, "Storing SNOS outputs"); | ||
self.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output).await?; | ||
let (cairo_pie_path, snos_output_path, program_output_path) = self | ||
.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output) | ||
.await?; | ||
|
||
job.metadata.insert(JOB_METADATA_CAIRO_PIE_PATH.into(), cairo_pie_path); | ||
job.metadata.insert(JOB_METADATA_SNOS_OUTPUT_PATH.into(), snos_output_path); | ||
job.metadata.insert(JOB_METADATA_PROGRAM_OUTPUT_PATH.into(), program_output_path); | ||
job.metadata.insert(JOB_METADATA_SNOS_FACT.into(), fact_info.fact.to_string()); | ||
tracing::info!(log_type = "completed", category = "snos", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "SNOS job processed successfully."); | ||
|
||
|
@@ -178,7 +200,7 @@ impl SnosJob { | |
cairo_pie: CairoPie, | ||
snos_output: StarknetOsOutput, | ||
program_output: Vec<Felt252>, | ||
) -> Result<(), SnosError> { | ||
) -> Result<(String, String, String), SnosError> { | ||
let cairo_pie_key = format!("{block_number}/{CAIRO_PIE_FILE_NAME}"); | ||
let cairo_pie_zip_bytes = self.cairo_pie_to_zip_bytes(cairo_pie).await.map_err(|e| { | ||
SnosError::CairoPieUnserializable { internal_id: internal_id.to_string(), message: e.to_string() } | ||
|
@@ -196,17 +218,17 @@ impl SnosJob { | |
SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() } | ||
})?; | ||
|
||
let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will come from metadata now? |
||
let program_output: Vec<[u8; 32]> = program_output.iter().map(|f| f.to_bytes_be()).collect(); | ||
let encoded_data = bincode::serialize(&program_output).map_err(|e| SnosError::ProgramOutputUnserializable { | ||
internal_id: internal_id.to_string(), | ||
message: e.to_string(), | ||
})?; | ||
let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}"); | ||
data_storage.put_data(encoded_data.into(), &program_output_key).await.map_err(|e| { | ||
SnosError::ProgramOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() } | ||
})?; | ||
|
||
Ok(()) | ||
Ok((cairo_pie_key, snos_output_key, program_output_key)) | ||
} | ||
|
||
/// Converts the [CairoPie] input as a zip file and returns it as [Bytes]. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ use super::constants::{ | |
}; | ||
use super::{JobError, OtherError}; | ||
use crate::config::Config; | ||
use crate::constants::{PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME}; | ||
use crate::constants::{JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_OUTPUT_PATH}; | ||
use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; | ||
use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; | ||
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; | ||
|
@@ -128,12 +128,12 @@ impl Job for StateUpdateJob { | |
|
||
let mut nonce = config.settlement_client().get_nonce().await.map_err(|e| JobError::Other(OtherError(e)))?; | ||
let mut sent_tx_hashes: Vec<String> = Vec::with_capacity(block_numbers.len()); | ||
for block_no in block_numbers.iter() { | ||
for (i, block_no) in block_numbers.iter().enumerate() { | ||
tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block"); | ||
|
||
let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?; | ||
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. get the snos path here itsefl |
||
let txn_hash = self | ||
.update_state_for_block(config.clone(), *block_no, snos, nonce) | ||
.update_state_for_block(config.clone(), *block_no, i, snos, nonce, job) | ||
.await | ||
.map_err(|e| { | ||
tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block"); | ||
|
@@ -309,22 +309,21 @@ impl StateUpdateJob { | |
&self, | ||
config: Arc<Config>, | ||
block_no: u64, | ||
block_index: usize, | ||
snos: StarknetOsOutput, | ||
nonce: u64, | ||
job: &JobItem, | ||
) -> Result<String, JobError> { | ||
let settlement_client = config.settlement_client(); | ||
let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO { | ||
unimplemented!("update_state_for_block not implemented as of now for calldata DA.") | ||
} else if snos.use_kzg_da == Felt252::ONE { | ||
let blob_data = fetch_blob_data_for_block(block_no, config.clone()) | ||
let blob_data = fetch_blob_data_for_block(block_index, config.clone(), job) | ||
.await | ||
.map_err(|e| JobError::Other(OtherError(e)))?; | ||
|
||
let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?; | ||
let program_output = self.fetch_program_output_for_block(block_index, config.clone(), job).await?; | ||
|
||
// TODO : | ||
// Fetching nonce before the transaction is run | ||
// Sending update_state transaction from the settlement client | ||
settlement_client | ||
.update_state_with_blobs(program_output, blob_data, nonce) | ||
.await | ||
|
@@ -336,29 +335,59 @@ impl StateUpdateJob { | |
} | ||
|
||
/// Retrieves the SNOS output for the corresponding block. | ||
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> { | ||
async fn fetch_snos_for_block( | ||
&self, | ||
block_index: usize, | ||
config: Arc<Config>, | ||
job: &JobItem, | ||
) -> Result<StarknetOsOutput, JobError> { | ||
let storage_client = config.storage(); | ||
let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME; | ||
|
||
let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?; | ||
// Get the array of SNOS paths from metadata | ||
let snos_paths: Vec<String> = serde_json::from_str( | ||
job.metadata | ||
.get(JOB_METADATA_SNOS_OUTPUT_PATH) | ||
.ok_or_else(|| JobError::Other(OtherError(eyre!("SNOS output paths not found in metadata"))))?, | ||
) | ||
.map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse SNOS paths from metadata: {}", e))))?; | ||
|
||
// Get the path for this block | ||
let path = snos_paths.get(block_index).ok_or_else(|| { | ||
JobError::Other(OtherError(eyre!("SNOS output path not found for index {}", block_index))) | ||
})?; | ||
|
||
let snos_output_bytes = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?; | ||
|
||
serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| { | ||
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e))) | ||
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output from path {}: {}", path, e))) | ||
}) | ||
} | ||
|
||
async fn fetch_program_output_for_block( | ||
&self, | ||
block_number: u64, | ||
block_index: usize, | ||
config: Arc<Config>, | ||
job: &JobItem, | ||
) -> Result<Vec<[u8; 32]>, JobError> { | ||
let storage_client = config.storage(); | ||
let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME; | ||
|
||
let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?; | ||
// Get the array of program output paths from metadata | ||
let program_paths: Vec<String> = serde_json::from_str( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
job.metadata | ||
.get(JOB_METADATA_PROGRAM_OUTPUT_PATH) | ||
.ok_or_else(|| JobError::Other(OtherError(eyre!("Program output paths not found in metadata"))))?, | ||
) | ||
.map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse program paths from metadata: {}", e))))?; | ||
|
||
// Get the path for this block | ||
let path = program_paths.get(block_index).ok_or_else(|| { | ||
JobError::Other(OtherError(eyre!("Program output path not found for index {}", block_index))) | ||
})?; | ||
|
||
let program_output = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?; | ||
|
||
bincode::deserialize(&program_output).map_err(|e| { | ||
JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e))) | ||
JobError::Other(OtherError(eyre!("Failed to deserialize program output from path {}: {}", path, e))) | ||
}) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we've a lot of metadata constants here. should we reuse this?