-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: job isolation done #204
base: main
Are you sure you want to change the base?
Changes from all commits
e703bff
ff84440
decc1ad
d11c246
a35594a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -216,14 +216,26 @@ impl Database for MongoDb { | |
|
||
tracing::debug!(job_type = ?job_type, category = "db_call", "Fetching latest job by type"); | ||
|
||
// Get the first (and only) result if it exists | ||
match cursor.try_next().await? { | ||
Some(doc) => { | ||
let job: JobItem = mongodb::bson::from_document(doc)?; | ||
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")]; | ||
let duration = start.elapsed(); | ||
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); | ||
Ok(Some(job)) | ||
// Try to deserialize and log any errors | ||
match mongodb::bson::from_document::<JobItem>(doc.clone()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we're cloning here so that we can log in error later? if yes, should we explain that's why we're cloning it? |
||
Ok(job) => { | ||
tracing::debug!(deserialized_job = ?job, "Successfully deserialized job"); | ||
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")]; | ||
let duration = start.elapsed(); | ||
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); | ||
Ok(Some(job)) | ||
} | ||
Err(e) => { | ||
tracing::error!( | ||
error = %e, | ||
document = ?doc, | ||
"Failed to deserialize document into JobItem" | ||
); | ||
Err(eyre!("Failed to deserialize document: {}", e)) | ||
} | ||
} | ||
} | ||
None => Ok(None), | ||
} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is also types to be precise. maybe we should add this to types.rs OR create a types folder with jobs.rs and metadata.rs? |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,130 @@ | ||||||
use chrono::{DateTime, Utc}; | ||||||
use serde::{Deserialize, Serialize}; | ||||||
use color_eyre::eyre::eyre; | ||||||
use color_eyre::eyre; | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] | ||||||
pub struct CommonMetadata { | ||||||
pub process_attempt_no: u64, | ||||||
pub process_retry_attempt_no: u64, | ||||||
pub verification_attempt_no: u64, | ||||||
pub verification_retry_attempt_no: u64, | ||||||
#[serde(with = "chrono::serde::ts_seconds_option")] | ||||||
pub process_completed_at: Option<DateTime<Utc>>, | ||||||
#[serde(with = "chrono::serde::ts_seconds_option")] | ||||||
pub verification_completed_at: Option<DateTime<Utc>>, | ||||||
pub failure_reason: Option<String>, | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||||||
pub struct SnosMetadata { | ||||||
// Required fields | ||||||
pub block_number: u64, | ||||||
pub full_output: bool, | ||||||
|
||||||
// Optional fields populated during processing | ||||||
pub cairo_pie_path: Option<String>, | ||||||
pub snos_output_path: Option<String>, | ||||||
pub program_output_path: Option<String>, | ||||||
pub snos_fact: Option<String>, | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||||||
pub struct StateUpdateMetadata { | ||||||
// Required fields | ||||||
pub blocks_to_settle: Vec<u64>, | ||||||
// Paths for data | ||||||
pub snos_output_paths: Vec<String>, | ||||||
pub program_output_paths: Vec<String>, | ||||||
pub blob_data_paths: Vec<String>, | ||||||
|
||||||
// State tracking | ||||||
pub last_failed_block_no: Option<u64>, | ||||||
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||||||
pub enum ProvingInputType { | ||||||
Proof(String), | ||||||
CairoPie(String), | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||||||
pub struct ProvingMetadata { | ||||||
// Required fields | ||||||
pub block_number: u64, | ||||||
pub input_path: Option<ProvingInputType>, | ||||||
|
||||||
pub ensure_on_chain_registration: Option<String>, | ||||||
pub download_proof: Option<String>, | ||||||
|
||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||||||
pub struct DaMetadata { | ||||||
// Required fields | ||||||
pub block_number: u64, | ||||||
|
||||||
// DA specific fields | ||||||
pub blob_data_path: Option<String>, | ||||||
pub tx_hash: Option<String>, | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||||||
#[serde(tag = "type")] | ||||||
pub enum JobSpecificMetadata { | ||||||
Snos(SnosMetadata), | ||||||
StateUpdate(StateUpdateMetadata), | ||||||
Proving(ProvingMetadata), | ||||||
Da(DaMetadata), | ||||||
} | ||||||
|
||||||
impl TryInto<SnosMetadata> for JobSpecificMetadata { | ||||||
type Error = eyre::Error; | ||||||
|
||||||
fn try_into(self) -> Result<SnosMetadata, Self::Error> { | ||||||
match self { | ||||||
JobSpecificMetadata::Snos(metadata) => Ok(metadata.clone()), | ||||||
_ => Err(eyre!("Invalid metadata type: expected SNOS metadata")), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
impl TryInto<ProvingMetadata> for JobSpecificMetadata { | ||||||
type Error = eyre::Error; | ||||||
|
||||||
fn try_into(self) -> Result<ProvingMetadata, Self::Error> { | ||||||
match self { | ||||||
JobSpecificMetadata::Proving(metadata) => Ok(metadata.clone()), | ||||||
_ => Err(eyre!("Invalid metadata type: expected Proving metadata")), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
impl TryInto<DaMetadata> for JobSpecificMetadata { | ||||||
type Error = eyre::Error; | ||||||
|
||||||
fn try_into(self) -> Result<DaMetadata, Self::Error> { | ||||||
match self { | ||||||
JobSpecificMetadata::Da(metadata) => Ok(metadata.clone()), | ||||||
_ => Err(eyre!("Invalid metadata type: expected DA metadata")), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
impl TryInto<StateUpdateMetadata> for JobSpecificMetadata { | ||||||
type Error = eyre::Error; | ||||||
|
||||||
fn try_into(self) -> Result<StateUpdateMetadata, Self::Error> { | ||||||
match self { | ||||||
JobSpecificMetadata::StateUpdate(metadata) => Ok(metadata.clone()), | ||||||
_ => Err(eyre!("Invalid metadata type: expected State Update metadata")), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||||||
pub struct JobMetadata { | ||||||
pub common: CommonMetadata, | ||||||
pub specific: JobSpecificMetadata, | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we've a lot of metadata constants here. should we reuse this?