refactor: job isolation done #204

Open · wants to merge 5 commits into main
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- Added metadata serialization and deserialization
- Added retry job endpoint for failed jobs
- readme: setup instructions added
- Added : Grafana dashboard
@@ -51,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- refactor: job isolation added; each job now carries the information it needs from its worker
- verify_job now handles VerificationTimeout status
- refactor: expect removed and added error wraps
- refactor: Readme and .env.example
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -54,7 +54,7 @@ axum-macros = "0.4.1"
bincode = "1.3.3"
bytes = "1.7.2"
color-eyre = "0.6.2"
chrono = "0.4.0"
chrono = { version = "0.4", features = ["serde"] }
c-kzg = "1.0.3"
dotenvy = "0.15.7"
futures = "0.3.30"
7 changes: 7 additions & 0 deletions crates/orchestrator/src/constants.rs
@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
Contributor
We have a lot of metadata constants here. Should we reuse this?

pub const JOB_METADATA_SNOS_OUTPUT_PATH: &str = "snos_output_path";
pub const JOB_METADATA_PROGRAM_OUTPUT_PATH: &str = "program_output_path";
pub const JOB_METADATA_CROSS_VERIFY: &str = "cross_verify";
pub const JOB_METADATA_SNOS_FULL_OUTPUT: &str = "snos_full_output";
pub const JOB_METADATA_BLOB_DATA_PATH: &str = "blob_data_path";
24 changes: 18 additions & 6 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -216,14 +216,26 @@ impl Database for MongoDb {

tracing::debug!(job_type = ?job_type, category = "db_call", "Fetching latest job by type");

// Get the first (and only) result if it exists
match cursor.try_next().await? {
Some(doc) => {
let job: JobItem = mongodb::bson::from_document(doc)?;
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
// Try to deserialize and log any errors
match mongodb::bson::from_document::<JobItem>(doc.clone()) {
Contributor
We're cloning here so that we can log it in the error later? If so, should we add a comment explaining that's why we're cloning?

Ok(job) => {
tracing::debug!(deserialized_job = ?job, "Successfully deserialized job");
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
}
Err(e) => {
tracing::error!(
error = %e,
document = ?doc,
"Failed to deserialize document into JobItem"
);
Err(eyre!("Failed to deserialize document: {}", e))
}
}
}
None => Ok(None),
}
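In reply to the review comment above: a minimal sketch of one way to avoid cloning the whole BSON document just for error logging, by capturing a cheap identifier before the document is consumed. JobItem and the jobs::types path are taken from this diff; the "_id" field and the helper itself are assumptions for illustration.

use color_eyre::eyre::{eyre, Result};
use mongodb::bson::{self, Document};

use crate::jobs::types::JobItem;

// Sketch only: deserialize an already-fetched BSON document into a JobItem,
// logging a cheap identifier instead of a clone of the whole document on failure.
fn deserialize_job(doc: Document) -> Result<JobItem> {
    let doc_id = doc.get_object_id("_id").ok(); // ObjectId is Copy, so this costs almost nothing
    match bson::from_document::<JobItem>(doc) {
        Ok(job) => Ok(job),
        Err(e) => {
            tracing::error!(error = %e, document_id = ?doc_id, "Failed to deserialize document into JobItem");
            Err(eyre!("Failed to deserialize document: {}", e))
        }
    }
}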
77 changes: 61 additions & 16 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
use color_eyre::eyre::WrapErr;
use color_eyre::eyre::{eyre, WrapErr};
use lazy_static::lazy_static;
use num_bigint::{BigUint, ToBigUint};
use num_traits::{Num, Zero};
@@ -19,7 +19,7 @@ use uuid::Uuid;
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;
use crate::jobs::metadata::{JobMetadata, JobSpecificMetadata, DaMetadata};
use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;

lazy_static! {
@@ -69,7 +69,7 @@ impl Job for DaJob {
&self,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
metadata: JobMetadata,
) -> Result<JobItem, JobError> {
let job_id = Uuid::new_v4();
tracing::info!(log_type = "starting", category = "da", function_type = "create_job", block_no = %internal_id, "DA job creation started.");
@@ -91,7 +91,23 @@ impl Job for DaJob {
#[tracing::instrument(fields(category = "da"), skip(self, config), ret, err)]
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "DA job processing started.");
tracing::info!(
log_type = "starting",
category = "da",
function_type = "process_job",
job_id = ?job.id,
block_no = %internal_id,
"DA job processing started."
);

// Get DA-specific metadata
let mut da_metadata: DaMetadata = job.metadata.specific.clone()
.try_into()
.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Invalid metadata type for DA job");
JobError::Other(OtherError(e))
})?;

let block_no = job.internal_id.parse::<u64>().wrap_err("Failed to parse u64".to_string()).map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to parse block number");
JobError::Other(OtherError(e))
@@ -115,13 +131,14 @@ impl Job for DaJob {
MaybePendingStateUpdate::Update(state_update) => state_update,
};
tracing::debug!(job_id = ?job.id, "Retrieved state update");

// constructing the data from the rpc
let blob_data = state_update_to_blob_data(block_no, state_update, config.clone()).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to convert state update to blob data");
JobError::Other(OtherError(e))
})?;

// transforming the data so that we can apply FFT on this.
// @note: we can skip this step if in the above step we return vec<BigUint> directly
let blob_data_biguint = convert_to_biguint(blob_data.clone());
tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

@@ -130,16 +147,26 @@ impl Job for DaJob {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to apply FFT transformation");
JobError::Other(OtherError(e))
})?;
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
// Get blob data path from metadata
let blob_data_path = da_metadata.blob_data_path.as_ref().ok_or_else(|| {
tracing::error!(job_id = ?job.id, "Blob data path not found in metadata");
JobError::Other(OtherError(eyre!("Blob data path not found in metadata")))
})?;

// Store the transformed data
store_blob_data(transformed_data.clone(), blob_data_path, config.clone()).await?;
tracing::debug!(job_id = ?job.id, "Stored blob data");

let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
let max_blob_per_txn = config.da_client().max_blob_per_txn().await;
tracing::trace!(job_id = ?job.id, max_bytes_per_blob = max_bytes_per_blob, max_blob_per_txn = max_blob_per_txn, "Retrieved DA client configuration");
// converting BigUints to Vec<u8>, one Vec<u8> represents one blob data
tracing::trace!(
job_id = ?job.id,
max_bytes_per_blob = max_bytes_per_blob,
max_blob_per_txn = max_blob_per_txn,
"Retrieved DA client configuration"
);

let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?;
let current_blob_length: u64 = blob_array
@@ -152,9 +179,14 @@
})?;
tracing::debug!(job_id = ?job.id, blob_count = current_blob_length, "Converted data to blobs");

// there is a limit on number of blobs per txn, checking that here
// Check blob limit
if current_blob_length > max_blob_per_txn {
tracing::warn!(job_id = ?job.id, current_blob_length = current_blob_length, max_blob_per_txn = max_blob_per_txn, "Exceeded maximum number of blobs per transaction");
tracing::warn!(
job_id = ?job.id,
current_blob_length = current_blob_length,
max_blob_per_txn = max_blob_per_txn,
"Exceeded maximum number of blobs per transaction"
);
Err(DaError::MaxBlobsLimitExceeded {
max_blob_per_txn,
current_blob_length,
Expand All @@ -163,13 +195,24 @@ impl Job for DaJob {
})?
}

// making the txn to the DA layer
// Publish to DA layer
let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to publish state diff to DA layer");
JobError::Other(OtherError(e))
})?;

tracing::info!(log_type = "completed", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, external_id = ?external_id, "Successfully published state diff to DA layer.");
da_metadata.tx_hash = Some(external_id.clone());
job.metadata.specific = JobSpecificMetadata::Da(da_metadata);

tracing::info!(
log_type = "completed",
category = "da",
function_type = "process_job",
job_id = ?job.id,
block_no = %internal_id,
external_id = ?external_id,
"Successfully published state diff to DA layer."
);
Ok(external_id)
}

@@ -344,14 +387,16 @@ pub async fn state_update_to_blob_data(
}

/// Stores the blob data using the storage client at the given blob data path
async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
async fn store_blob_data(blob_data: Vec<BigUint>, blob_data_path: &str, config: Arc<Config>) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;

let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice());

if !blob_data_vec_u8.is_empty() {
storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
storage_client
.put_data(blob_data_vec_u8.into(), blob_data_path)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
}

Ok(())
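Since store_blob_data no longer builds the <block_number>/blob_data.txt key itself, blob_data_path has to be populated upstream when the DA job's metadata is created. That side is not visible in this diff, so the following is only a sketch of one plausible way to fill it in, reusing the existing BLOB_DATA_FILE_NAME constant; the function name and its call site are hypothetical.

use crate::constants::BLOB_DATA_FILE_NAME;
use crate::jobs::metadata::DaMetadata;

// Sketch only: pre-compute the blob data path that process_job later reads from metadata.
fn da_metadata_for_block(block_number: u64) -> DaMetadata {
    DaMetadata {
        block_number,
        blob_data_path: Some(format!("{}/{}", block_number, BLOB_DATA_FILE_NAME)),
        tx_hash: None,
    }
}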
130 changes: 130 additions & 0 deletions crates/orchestrator/src/jobs/metadata.rs
Contributor
These are also types, to be precise. Maybe we should add this to types.rs, or create a types folder with jobs.rs and metadata.rs?
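For concreteness, the layout suggested above might look like the following; the module and file names are suggestions only, not part of this PR.

// crates/orchestrator/src/jobs/types/mod.rs (hypothetical)
pub mod job;      // JobItem, JobStatus, JobType, JobVerificationStatus
pub mod metadata; // JobMetadata, JobSpecificMetadata and the per-job structs

pub use job::*;
pub use metadata::*;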

@@ -0,0 +1,130 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use color_eyre::eyre::eyre;
use color_eyre::eyre;

#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct CommonMetadata {
pub process_attempt_no: u64,
pub process_retry_attempt_no: u64,
pub verification_attempt_no: u64,
pub verification_retry_attempt_no: u64,
#[serde(with = "chrono::serde::ts_seconds_option")]
pub process_completed_at: Option<DateTime<Utc>>,
#[serde(with = "chrono::serde::ts_seconds_option")]
pub verification_completed_at: Option<DateTime<Utc>>,
pub failure_reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SnosMetadata {
// Required fields
pub block_number: u64,
pub full_output: bool,

// Optional fields populated during processing
pub cairo_pie_path: Option<String>,
pub snos_output_path: Option<String>,
pub program_output_path: Option<String>,
pub snos_fact: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StateUpdateMetadata {
// Required fields
pub blocks_to_settle: Vec<u64>,
// Paths for data
pub snos_output_paths: Vec<String>,
pub program_output_paths: Vec<String>,
pub blob_data_paths: Vec<String>,

// State tracking
pub last_failed_block_no: Option<u64>,
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
Contributor
Suggested change
- pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
+ pub tx_hashes: Vec<String>

}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProvingInputType {
Proof(String),
CairoPie(String),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ProvingMetadata {
// Required fields
pub block_number: u64,
pub input_path: Option<ProvingInputType>,

pub ensure_on_chain_registration: Option<String>,
pub download_proof: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DaMetadata {
// Required fields
pub block_number: u64,

// DA specific fields
pub blob_data_path: Option<String>,
pub tx_hash: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum JobSpecificMetadata {
Snos(SnosMetadata),
StateUpdate(StateUpdateMetadata),
Proving(ProvingMetadata),
Da(DaMetadata),
}

impl TryInto<SnosMetadata> for JobSpecificMetadata {
type Error = eyre::Error;

fn try_into(self) -> Result<SnosMetadata, Self::Error> {
match self {
JobSpecificMetadata::Snos(metadata) => Ok(metadata.clone()),
_ => Err(eyre!("Invalid metadata type: expected SNOS metadata")),
}
}
}

impl TryInto<ProvingMetadata> for JobSpecificMetadata {
type Error = eyre::Error;

fn try_into(self) -> Result<ProvingMetadata, Self::Error> {
match self {
JobSpecificMetadata::Proving(metadata) => Ok(metadata.clone()),
_ => Err(eyre!("Invalid metadata type: expected Proving metadata")),
}
}
}

impl TryInto<DaMetadata> for JobSpecificMetadata {
type Error = eyre::Error;

fn try_into(self) -> Result<DaMetadata, Self::Error> {
match self {
JobSpecificMetadata::Da(metadata) => Ok(metadata.clone()),
_ => Err(eyre!("Invalid metadata type: expected DA metadata")),
}
}
}

impl TryInto<StateUpdateMetadata> for JobSpecificMetadata {
type Error = eyre::Error;

fn try_into(self) -> Result<StateUpdateMetadata, Self::Error> {
match self {
JobSpecificMetadata::StateUpdate(metadata) => Ok(metadata.clone()),
_ => Err(eyre!("Invalid metadata type: expected State Update metadata")),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct JobMetadata {
pub common: CommonMetadata,
pub specific: JobSpecificMetadata,
}
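Tying this back to the changelog entry about job isolation: each job now carries a typed view of exactly the data its worker needs. Below is a minimal sketch of building metadata for a DA job and reading it back the way da_job/mod.rs does; the block number and path are illustrative values only.

fn example() -> color_eyre::eyre::Result<()> {
    let metadata = JobMetadata {
        common: CommonMetadata::default(),
        specific: JobSpecificMetadata::Da(DaMetadata {
            block_number: 66_645,
            blob_data_path: Some("66645/blob_data.txt".to_string()),
            tx_hash: None,
        }),
    };

    // Workers recover their job-specific view via TryInto, as in da_job/mod.rs.
    let da_metadata: DaMetadata = metadata.specific.clone().try_into()?;
    assert_eq!(da_metadata.blob_data_path.as_deref(), Some("66645/blob_data.txt"));
    Ok(())
}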