refactor: job isolation done #204

Open
wants to merge 4 commits into
base: main

@Mohiiit (Contributor) commented Jan 22, 2025

Files and information needed by each job should be provided by that job's worker, with no inter-job dependency.

@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
Contributor commented:

We have a lot of metadata constants here. Should we reuse this?

@@ -133,7 +133,11 @@ impl Job for DaJob {
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME);
Contributor commented:

Let's change this so that the worker specifies the full path, as discussed.
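
To illustrate the suggestion, here is a minimal sketch in which the worker builds the full storage path once and the job only reads it. `DaMetadata`, `blob_data_path`, `build_da_metadata`, and `blob_storage_key` are hypothetical names and do not come from this PR; only `BLOB_DATA_FILE_NAME` and the `{block_no}/{file}` layout appear in the diff.

```rust
// Hypothetical sketch: all names except BLOB_DATA_FILE_NAME are assumptions.
pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";

/// Metadata the worker hands to the DA job when it creates the job.
#[derive(Debug, Clone, PartialEq)]
pub struct DaMetadata {
    pub block_number: u64,
    /// Full storage key, decided by the worker rather than rebuilt inside the job.
    pub blob_data_path: String,
}

/// Worker side: the only place that knows the storage layout.
pub fn build_da_metadata(block_number: u64) -> DaMetadata {
    DaMetadata {
        block_number,
        blob_data_path: format!("{}/{}", block_number, BLOB_DATA_FILE_NAME),
    }
}

/// Job side: uses the path as given, with no knowledge of how it was built.
pub fn blob_storage_key(metadata: &DaMetadata) -> &str {
    metadata.blob_data_path.as_str()
}
```

With this split, changing the storage layout would only touch the worker, which seems to be the intent of the comment.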

@@ -196,17 +218,17 @@ impl SnosJob {
SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
})?;

let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
Contributor commented:

Will this come from metadata now?


let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
// Get the array of program output paths from metadata
let program_paths: Vec<String> = serde_json::from_str(
Contributor commented:

  1. Pass the path directly.
  2. As discussed, should we use config structs for the metadata fields?

ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
// Add debug logging to see the raw document
tracing::info!(raw_document = ?doc, "Raw document from MongoDB");
Contributor commented:

Suggested change:
- tracing::info!(raw_document = ?doc, "Raw document from MongoDB");
+ tracing::debug!(raw_document = ?doc, "Raw document from MongoDB");

tracing::info!(raw_document = ?doc, "Raw document from MongoDB");

// Try to deserialize and log any errors
match mongodb::bson::from_document::<JobItem>(doc.clone()) {
Contributor commented:

Are we cloning here so that we can log the document in the error branch later? If yes, should we add a comment explaining that this is why we clone it?
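
A small sketch of why the clone is likely needed: as far as I know, `mongodb::bson::from_document` takes the document by value, so the original is gone by the time the error branch runs. The example below uses hypothetical stand-ins (`RawDocument`, `JobItem`, `parse_job`, `load_job`) instead of the real BSON types so that it stays self-contained.

```rust
// Hypothetical stand-ins for the BSON document and the deserializer.
#[derive(Debug, Clone)]
pub struct RawDocument(pub String);

#[derive(Debug, PartialEq)]
pub struct JobItem {
    pub id: u64,
}

/// Takes the document by value, so the caller loses access to it afterwards.
fn parse_job(doc: RawDocument) -> Result<JobItem, String> {
    doc.0
        .trim()
        .parse::<u64>()
        .map(|id| JobItem { id })
        .map_err(|e| e.to_string())
}

pub fn load_job(doc: RawDocument) -> Result<JobItem, String> {
    // Clone because `parse_job` consumes its argument, and the error branch
    // below still needs the original document for the message.
    match parse_job(doc.clone()) {
        Ok(job) => Ok(job),
        Err(e) => Err(format!("failed to deserialize {:?}: {}", doc, e)),
    }
}
```

A one-line comment like the one above the `match` is the kind of explanation the review asks for.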

Contributor commented:

To be precise, these are also types. Maybe we should add this to types.rs, or create a types folder with jobs.rs and metadata.rs?

pub struct ProvingMetadata {
// Required fields
pub block_number: u64,
pub cairo_pie_path: Option<String>,
Contributor commented:

If it's required, it should not be optional, right?
Why does the proving job care about block_number?
cross_verify can be renamed to ensure_on_chain_registration, or something more understandable.
Let's delete verification_key_path.

Contributor commented:

Make an input field, which is:

enum Input {
Pie()...
}
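
One possible shape for this, as a sketch only: the reviewer's snippet elides the payload of `Pie` and any further variants, so the `String` payload, the `input` field name, and the `input_path` helper below are all assumptions made for illustration.

```rust
// Hypothetical sketch: the payload of `Pie` is elided in the review comment,
// so a storage path (String) is assumed here.
#[derive(Debug, Clone, PartialEq)]
pub enum Input {
    /// Assumed payload: storage path of the Cairo PIE to prove.
    Pie(String),
}

#[derive(Debug, Clone, PartialEq)]
pub struct ProvingMetadata {
    /// Required input, so it is a plain value rather than an `Option`.
    pub input: Input,
}

/// Returns the storage path the proving job should read from.
pub fn input_path(metadata: &ProvingMetadata) -> &str {
    match &metadata.input {
        Input::Pie(path) => path.as_str(),
    }
}
```

An enum keeps the field required while leaving room for other input kinds later, and the exhaustive `match` forces every consumer to handle each kind.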

Contributor commented:

We can remove snos_fact and club it with ensure_on_chain_registration using an Option.
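
A sketch of one reading of this suggestion, assuming the current code pairs a boolean flag with a separate fact field (the diff shown here does not confirm that). The `String` fact type, the `RegistrationCheck` enum, and the `registration_check` helper are hypothetical.

```rust
// Hypothetical sketch: one Option replaces a flag plus a separate fact field.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct ProvingMetadata {
    /// `Some(fact)`: verify that this fact is registered on-chain.
    /// `None`: skip the check.
    pub ensure_on_chain_registration: Option<String>,
}

#[derive(Debug, PartialEq)]
pub enum RegistrationCheck<'a> {
    Skip,
    Verify(&'a str),
}

/// Decides what the job should do based on the single optional field.
pub fn registration_check(metadata: &ProvingMetadata) -> RegistrationCheck<'_> {
    match metadata.ensure_on_chain_registration.as_deref() {
        Some(fact) => RegistrationCheck::Verify(fact),
        None => RegistrationCheck::Skip,
    }
}
```

Folding the two fields into one makes the invalid combination (check enabled but no fact provided) unrepresentable.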

Contributor commented:

Also add comments to explain which metadata fields are inputs and which are added by jobs on the fly.


// State tracking
pub last_failed_block_no: Option<u64>,
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
Contributor commented:

Suggested change:
- pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
+ pub tx_hashes: Vec<String>

block_numbers = block_numbers.into_iter().filter(|&block| block >= last_failed_block).collect::<Vec<u64>>();
}
// Filter block numbers if there was a previous failure
let block_numbers = if let JobSpecificMetadata::StateUpdate(state_metadata) = &job.metadata.specific {
Contributor commented:

We can move this part outside.
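
If "outside" means pulling the filtering out of the metadata match into its own function, it could look roughly like the sketch below. `blocks_to_retry` is a hypothetical name; the `>=` comparison mirrors the filter in the diff.

```rust
/// Keeps only the blocks at or after the last failed block, if one was recorded.
/// With no recorded failure, every block is kept.
/// (`blocks_to_retry` is a hypothetical helper name, not from the PR.)
pub fn blocks_to_retry(block_numbers: Vec<u64>, last_failed_block_no: Option<u64>) -> Vec<u64> {
    match last_failed_block_no {
        Some(last_failed) => block_numbers
            .into_iter()
            .filter(|&block| block >= last_failed)
            .collect(),
        None => block_numbers,
    }
}
```

The caller would then only need to read `last_failed_block_no` from the state update metadata and pass it in.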

.update_state_for_block(config.clone(), *block_no, snos, nonce)
.await
.map_err(|e| {
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
Contributor commented:

Get the snos path here itself.

.await
.map_err(|e| {
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
let txn_hash = match self.update_state_for_block(config.clone(), *block_no, i, snos, nonce, job).await {
Contributor commented:

Same here.

.unwrap();

// Update metadata in a single place
if let JobSpecificMetadata::StateUpdate(ref mut state_metadata) = job.metadata.specific {
Contributor commented:

Here as well.

Comment on lines +37 to +44
// Fetching the blob data (stored in remote storage during DA job) for a particular block
// pub async fn fetch_program_data_for_block(block_number: u64, config: Arc<Config>, job: &JobItem)
// -> color_eyre::Result<Vec<[u8; 32]>> { let storage_client = config.storage();
// let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
// let blob_data = storage_client.get_data(&key).await?;
// let transformed_blob_vec_u8 = bytes_to_vec_u8(blob_data.as_ref())?;
// Ok(transformed_blob_vec_u8)
// }
Contributor commented:

Delete.

Comment on lines +31 to +40
let proving_metadata = match &proving_job.metadata.specific {
JobSpecificMetadata::Proving(metadata) => metadata,
_ => {
tracing::error!(
job_id = %proving_job.internal_id,
"Invalid metadata type for proving job"
);
continue;
}
};
Contributor commented:

We can just use the internal id here.

Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"),
for snos_job in successful_snos_jobs {
// Extract SNOS metadata
let snos_metadata = match &snos_job.metadata.specific {
Contributor commented:

We can try a TryInto implementation to make this cleaner.
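
A sketch of what that could look like. Implementing `TryFrom` on a reference to the enum gives `TryInto` for free through the standard blanket impl. The struct fields and the `WrongMetadataType` error are placeholders; only the `JobSpecificMetadata` enum name and its `Proving` variant appear in the diff, and the `Snos` variant is assumed.

```rust
use std::convert::TryFrom;

// Placeholder definitions: the real structs carry more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct SnosMetadata {
    pub block_number: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ProvingMetadata {
    pub block_number: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum JobSpecificMetadata {
    Snos(SnosMetadata), // assumed variant name
    Proving(ProvingMetadata),
}

/// Hypothetical error returned when the enum holds a different variant.
#[derive(Debug, PartialEq)]
pub struct WrongMetadataType;

impl<'a> TryFrom<&'a JobSpecificMetadata> for &'a SnosMetadata {
    type Error = WrongMetadataType;

    fn try_from(value: &'a JobSpecificMetadata) -> Result<Self, Self::Error> {
        match value {
            JobSpecificMetadata::Snos(metadata) => Ok(metadata),
            _ => Err(WrongMetadataType),
        }
    }
}
```

The worker loop could then replace each hand-written `match` with a single conversion call and log and `continue` on `Err`.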

@@ -96,26 +94,73 @@ impl Worker for UpdateStateWorker {
}
}
None => {
if blocks_to_process[0] != 0 {
if blocks_to_process[0] != 0 && blocks_to_process[0] != 1 {
Contributor commented:

Need to remove this.

2 participants