-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: job isolation done #204
base: main
Are you sure you want to change the base?
Conversation
@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt"; | |||
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json"; | |||
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt"; | |||
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip"; | |||
|
|||
pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we've a lot of metadata constants here. should we reuse this?
@@ -133,7 +133,11 @@ impl Job for DaJob { | |||
// data transformation on the data | |||
tracing::trace!(job_id = ?job.id, "Applied FFT transformation"); | |||
|
|||
store_blob_data(transformed_data.clone(), block_no, config.clone()).await?; | |||
let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's change to worker specifying full path as discussed
@@ -196,17 +218,17 @@ impl SnosJob { | |||
SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() } | |||
})?; | |||
|
|||
let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will come from metadata now?
|
||
let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?; | ||
// Get the array of program output paths from metadata | ||
let program_paths: Vec<String> = serde_json::from_str( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- pass path directly
- as discussed, should we use config structs for metadata fields
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); | ||
Ok(Some(job)) | ||
// Add debug logging to see the raw document | ||
tracing::info!(raw_document = ?doc, "Raw document from MongoDB"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tracing::info!(raw_document = ?doc, "Raw document from MongoDB"); | |
tracing::debug!(raw_document = ?doc, "Raw document from MongoDB"); |
tracing::info!(raw_document = ?doc, "Raw document from MongoDB"); | ||
|
||
// Try to deserialize and log any errors | ||
match mongodb::bson::from_document::<JobItem>(doc.clone()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're cloning here so that we can log in error later? if yes, should we explain that's why we're cloning it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is also types to be precise. maybe we should add this to types.rs OR create a types folder with jobs.rs and metadata.rs?
pub struct ProvingMetadata { | ||
// Required fields | ||
pub block_number: u64, | ||
pub cairo_pie_path: Option<String>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's required it should not be optional right?
why does proving job care about block_number
?
cross_verify
can be renamed to ensure_on_chain_registration
or something more understandable
let's delete verification_key_path
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make an input field which is
enum Input {
Pie()...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can remove snos_fact
and club it with ensure_on_chain_registration
using Optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also add comments to explain what are inputs in metadata and what is added by jobs on the fly
|
||
// State tracking | ||
pub last_failed_block_no: Option<u64>, | ||
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes | |
pub tx_hashes: Vec<String> |
block_numbers = block_numbers.into_iter().filter(|&block| block >= last_failed_block).collect::<Vec<u64>>(); | ||
} | ||
// Filter block numbers if there was a previous failure | ||
let block_numbers = if let JobSpecificMetadata::StateUpdate(state_metadata) = &job.metadata.specific { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can move this part outside
.update_state_for_block(config.clone(), *block_no, snos, nonce) | ||
.await | ||
.map_err(|e| { | ||
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get the snos path here itsefl
.await | ||
.map_err(|e| { | ||
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?; | ||
let txn_hash = match self.update_state_for_block(config.clone(), *block_no, i, snos, nonce, job).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
.unwrap(); | ||
|
||
// Update metadata in a single place | ||
if let JobSpecificMetadata::StateUpdate(ref mut state_metadata) = job.metadata.specific { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here as well
// Fetching the blob data (stored in remote storage during DA job) for a particular block | ||
// pub async fn fetch_program_data_for_block(block_number: u64, config: Arc<Config>, job: &JobItem) | ||
// -> color_eyre::Result<Vec<[u8; 32]>> { let storage_client = config.storage(); | ||
// let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME; | ||
// let blob_data = storage_client.get_data(&key).await?; | ||
// let transformed_blob_vec_u8 = bytes_to_vec_u8(blob_data.as_ref())?; | ||
// Ok(transformed_blob_vec_u8) | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete
let proving_metadata = match &proving_job.metadata.specific { | ||
JobSpecificMetadata::Proving(metadata) => metadata, | ||
_ => { | ||
tracing::error!( | ||
job_id = %proving_job.internal_id, | ||
"Invalid metadata type for proving job" | ||
); | ||
continue; | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can just use internal id here
Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"), | ||
for snos_job in successful_snos_jobs { | ||
// Extract SNOS metadata | ||
let snos_metadata = match &snos_job.metadata.specific { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can try some TryInto implementation to make it cleaner
@@ -96,26 +94,73 @@ impl Worker for UpdateStateWorker { | |||
} | |||
} | |||
None => { | |||
if blocks_to_process[0] != 0 { | |||
if blocks_to_process[0] != 0 && blocks_to_process[0] != 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to remove this
files and information needed by each job should be provided by the worker of the respective job, no inter-job dependency