diff --git a/crates/orchestrator/src/constants.rs b/crates/orchestrator/src/constants.rs
index 049d3ee3..17a28f5c 100644
--- a/crates/orchestrator/src/constants.rs
+++ b/crates/orchestrator/src/constants.rs
@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
 pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
 pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
 pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";
+
+pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
+pub const JOB_METADATA_SNOS_OUTPUT_PATH: &str = "snos_output_path";
+pub const JOB_METADATA_PROGRAM_OUTPUT_PATH: &str = "program_output_path";
+pub const JOB_METADATA_CROSS_VERIFY: &str = "cross_verify";
+pub const JOB_METADATA_SNOS_FULL_OUTPUT: &str = "snos_full_output";
+pub const JOB_METADATA_BLOB_DATA_PATH: &str = "blob_data_path";
diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs
index ab71ab6d..7051dbb6 100644
--- a/crates/orchestrator/src/jobs/da_job/mod.rs
+++ b/crates/orchestrator/src/jobs/da_job/mod.rs
@@ -19,7 +19,7 @@ use uuid::Uuid;
 use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
 use super::{Job, JobError, OtherError};
 use crate::config::Config;
-use crate::constants::BLOB_DATA_FILE_NAME;
+use crate::constants::{BLOB_DATA_FILE_NAME, JOB_METADATA_BLOB_DATA_PATH};
 use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;
 
 lazy_static! {
@@ -133,7 +133,11 @@ impl Job for DaJob {
         // data transformation on the data
         tracing::trace!(job_id = ?job.id, "Applied FFT transformation");
 
-        store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
+        let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME);
+        store_blob_data(transformed_data.clone(), &blob_data_path, config.clone()).await?;
+
+        // Add the path to metadata
+        job.metadata.insert(JOB_METADATA_BLOB_DATA_PATH.to_string(), blob_data_path);
         tracing::debug!(job_id = ?job.id, "Stored blob data");
 
         let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
@@ -344,14 +348,16 @@ pub async fn state_update_to_blob_data(
 }
 
-/// To store the blob data using the storage client with path <block_number>/blob_data.txt
-async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
+/// Stores the blob data using the storage client at the provided `blob_data_path`.
+async fn store_blob_data(blob_data: Vec<BigUint>, blob_data_path: &str, config: Arc<Config>) -> Result<(), JobError> {
     let storage_client = config.storage();
-    let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
 
     let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice());
 
     if !blob_data_vec_u8.is_empty() {
-        storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
+        storage_client
+            .put_data(blob_data_vec_u8.into(), blob_data_path)
+            .await
+            .map_err(|e| JobError::Other(OtherError(e)))?;
     }
 
     Ok(())
diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs
index a034bb4e..0fd04ba0 100644
--- a/crates/orchestrator/src/jobs/proving_job/mod.rs
+++ b/crates/orchestrator/src/jobs/proving_job/mod.rs
@@ -12,7 +12,7 @@ use uuid::Uuid;
 use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
 use super::{Job, JobError, OtherError};
 use crate::config::Config;
-use crate::constants::CAIRO_PIE_FILE_NAME;
+use crate::constants::{JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_CROSS_VERIFY};
 use crate::jobs::constants::JOB_METADATA_SNOS_FACT;
 
 #[derive(Error, Debug, PartialEq)]
@@ -62,12 +62,15 @@ impl Job for ProvingJob {
         let internal_id = job.internal_id.clone();
         tracing::info!(log_type = "starting", category = "proving", function_type = "process_job", job_id = ?job.id,  block_no = %internal_id, "Proving job processing started.");
 
-        // Cairo Pie path in s3 storage client
-        let block_number: String = job.internal_id.to_string();
-        let cairo_pie_path = block_number + "/" + CAIRO_PIE_FILE_NAME;
+        // Look up the Cairo PIE path from job metadata (written there by the SNOS job)
+        let cairo_pie_path = job.metadata.get(JOB_METADATA_CAIRO_PIE_PATH).ok_or_else(|| {
+            tracing::error!(job_id = %job.internal_id, "Cairo PIE path not found in job metadata");
+            ProvingError::CairoPIEWrongPath { internal_id: job.internal_id.clone() }
+        })?;
+
         tracing::debug!(job_id = %job.internal_id, %cairo_pie_path, "Fetching Cairo PIE file");
 
-        let cairo_pie_file = config.storage().get_data(&cairo_pie_path).await.map_err(|e| {
+        let cairo_pie_file = config.storage().get_data(cairo_pie_path).await.map_err(|e| {
             tracing::error!(job_id = %job.internal_id, error = %e, "Failed to fetch Cairo PIE file");
             ProvingError::CairoPIEFileFetchFailed(e.to_string())
         })?;
@@ -111,10 +114,21 @@ impl Job for ProvingJob {
             OtherError(eyre!("Fact not available in job"))
         })?;
 
-        tracing::debug!(job_id = %job.internal_id, %task_id, "Getting task status from prover client");
+        let cross_verify = match job.metadata.get(JOB_METADATA_CROSS_VERIFY) {
+            Some(value) => value == "true",
+            None => {
+                tracing::warn!(
+                    job_id = %job.internal_id,
+                    "Cross verification flag not found in metadata, defaulting to true"
+                );
+                true // Default to true for backward compatibility
+            }
+        };
+
+        tracing::debug!(job_id = %job.internal_id, %task_id, cross_verify, "Getting task status from prover client");
         let task_status = config
             .prover_client()
-            .get_task_status(&task_id, fact)
+            .get_task_status(&task_id, fact, cross_verify)
             .await
             .wrap_err("Prover Client Error".to_string())
             .map_err(|e| {
diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs
index c82d48c3..1b07d24b 100644
--- a/crates/orchestrator/src/jobs/snos_job/mod.rs
+++ b/crates/orchestrator/src/jobs/snos_job/mod.rs
@@ -18,7 +18,10 @@ use uuid::Uuid;
 use super::constants::{JOB_METADATA_SNOS_BLOCK, JOB_METADATA_SNOS_FACT};
 use super::{JobError, OtherError};
 use crate::config::Config;
-use crate::constants::{CAIRO_PIE_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME};
+use crate::constants::{
+    CAIRO_PIE_FILE_NAME, JOB_METADATA_CAIRO_PIE_PATH, JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_FULL_OUTPUT,
+    JOB_METADATA_SNOS_OUTPUT_PATH, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME,
+};
 use crate::data_storage::DataStorage;
 use crate::jobs::snos_job::error::FactError;
 use crate::jobs::snos_job::fact_info::get_fact_info;
@@ -106,11 +109,25 @@ impl Job for SnosJob {
         let snos_url = config.snos_config().rpc_for_snos.to_string();
         let snos_url = snos_url.trim_end_matches('/');
         tracing::debug!(job_id = %job.internal_id, "Calling prove_block function");
+
+        let full_output = match job.metadata.get(JOB_METADATA_SNOS_FULL_OUTPUT) {
+            Some(value) => value == "true",
+            None => {
+                tracing::warn!(
+                    job_id = %job.internal_id,
+                    "SNOS full output configuration not found in metadata, defaulting to false"
+                );
+                false
+            }
+        };
+
         let (cairo_pie, snos_output) =
-            prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, false).await.map_err(|e| {
-                tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed");
-                SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() }
-            })?;
+            prove_block(COMPILED_OS, block_number, snos_url, LayoutName::all_cairo, full_output).await.map_err(
+                |e| {
+                    tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed");
+                    SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() }
+                },
+            )?;
         tracing::debug!(job_id = %job.internal_id, "prove_block function completed successfully");
 
         let fact_info = get_fact_info(&cairo_pie, None)?;
@@ -118,8 +135,13 @@ impl Job for SnosJob {
         tracing::debug!(job_id = %job.internal_id, "Fact info calculated successfully");
 
         tracing::debug!(job_id = %job.internal_id, "Storing SNOS outputs");
-        self.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output).await?;
+        let (cairo_pie_path, snos_output_path, program_output_path) = self
+            .store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output)
+            .await?;
 
+        job.metadata.insert(JOB_METADATA_CAIRO_PIE_PATH.into(), cairo_pie_path);
+        job.metadata.insert(JOB_METADATA_SNOS_OUTPUT_PATH.into(), snos_output_path);
+        job.metadata.insert(JOB_METADATA_PROGRAM_OUTPUT_PATH.into(), program_output_path);
         job.metadata.insert(JOB_METADATA_SNOS_FACT.into(), fact_info.fact.to_string());
         tracing::info!(log_type = "completed", category = "snos", function_type = "process_job", job_id = ?job.id,  block_no = %internal_id, "SNOS job processed successfully.");
 
@@ -178,7 +200,7 @@ impl SnosJob {
         cairo_pie: CairoPie,
         snos_output: StarknetOsOutput,
         program_output: Vec<Felt252>,
-    ) -> Result<(), SnosError> {
+    ) -> Result<(String, String, String), SnosError> {
         let cairo_pie_key = format!("{block_number}/{CAIRO_PIE_FILE_NAME}");
         let cairo_pie_zip_bytes = self.cairo_pie_to_zip_bytes(cairo_pie).await.map_err(|e| {
             SnosError::CairoPieUnserializable { internal_id: internal_id.to_string(), message: e.to_string() }
@@ -196,17 +218,17 @@ impl SnosJob {
             SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
         })?;
 
+        let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
         let program_output: Vec<[u8; 32]> = program_output.iter().map(|f| f.to_bytes_be()).collect();
         let encoded_data = bincode::serialize(&program_output).map_err(|e| SnosError::ProgramOutputUnserializable {
             internal_id: internal_id.to_string(),
             message: e.to_string(),
         })?;
-        let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
         data_storage.put_data(encoded_data.into(), &program_output_key).await.map_err(|e| {
             SnosError::ProgramOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
         })?;
 
-        Ok(())
+        Ok((cairo_pie_key, snos_output_key, program_output_key))
     }
 
     /// Converts the [CairoPie] input as a zip file and returns it as [Bytes].
diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs
index 0856670e..59359944 100644
--- a/crates/orchestrator/src/jobs/state_update_job/mod.rs
+++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs
@@ -19,7 +19,7 @@ use super::constants::{
 };
 use super::{JobError, OtherError};
 use crate::config::Config;
-use crate::constants::{PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME};
+use crate::constants::{JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_OUTPUT_PATH};
 use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY;
 use crate::jobs::state_update_job::utils::fetch_blob_data_for_block;
 use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
@@ -128,12 +128,12 @@ impl Job for StateUpdateJob {
 
         let mut nonce = config.settlement_client().get_nonce().await.map_err(|e| JobError::Other(OtherError(e)))?;
         let mut sent_tx_hashes: Vec<String> = Vec::with_capacity(block_numbers.len());
-        for block_no in block_numbers.iter() {
+        for (i, block_no) in block_numbers.iter().enumerate() {
             tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block");
 
-            let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?;
+            let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
             let txn_hash = self
-                .update_state_for_block(config.clone(), *block_no, snos, nonce)
+                .update_state_for_block(config.clone(), *block_no, i, snos, nonce, job)
                 .await
                 .map_err(|e| {
                     tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block");
@@ -309,22 +309,21 @@ impl StateUpdateJob {
         &self,
         config: Arc<Config>,
         block_no: u64,
+        block_index: usize,
         snos: StarknetOsOutput,
         nonce: u64,
+        job: &JobItem,
     ) -> Result<String, JobError> {
         let settlement_client = config.settlement_client();
         let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO {
             unimplemented!("update_state_for_block not implemented as of now for calldata DA.")
         } else if snos.use_kzg_da == Felt252::ONE {
-            let blob_data = fetch_blob_data_for_block(block_no, config.clone())
+            let blob_data = fetch_blob_data_for_block(block_index, config.clone(), job)
                 .await
                 .map_err(|e| JobError::Other(OtherError(e)))?;
 
-            let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?;
+            let program_output = self.fetch_program_output_for_block(block_index, config.clone(), job).await?;
 
-            // TODO :
-            // Fetching nonce before the transaction is run
-            // Sending update_state transaction from the settlement client
             settlement_client
                 .update_state_with_blobs(program_output, blob_data, nonce)
                 .await
@@ -336,29 +335,59 @@ impl StateUpdateJob {
     }
 
-    /// Retrieves the SNOS output for the corresponding block.
-    async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
+    /// Retrieves the SNOS output for the block at `block_index`, using the output path stored in job metadata.
+    async fn fetch_snos_for_block(
+        &self,
+        block_index: usize,
+        config: Arc<Config>,
+        job: &JobItem,
+    ) -> Result<StarknetOsOutput, JobError> {
         let storage_client = config.storage();
-        let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME;
 
-        let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
+        // Get the array of SNOS paths from metadata
+        let snos_paths: Vec<String> = serde_json::from_str(
+            job.metadata
+                .get(JOB_METADATA_SNOS_OUTPUT_PATH)
+                .ok_or_else(|| JobError::Other(OtherError(eyre!("SNOS output paths not found in metadata"))))?,
+        )
+        .map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse SNOS paths from metadata: {}", e))))?;
+
+        // Get the path for this block
+        let path = snos_paths.get(block_index).ok_or_else(|| {
+            JobError::Other(OtherError(eyre!("SNOS output path not found for index {}", block_index)))
+        })?;
+
+        let snos_output_bytes = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?;
 
         serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| {
-            JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e)))
+            JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output from path {}: {}", path, e)))
         })
     }
 
     async fn fetch_program_output_for_block(
         &self,
-        block_number: u64,
+        block_index: usize,
         config: Arc<Config>,
+        job: &JobItem,
     ) -> Result<Vec<[u8; 32]>, JobError> {
         let storage_client = config.storage();
-        let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
 
-        let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
+        // Get the array of program output paths from metadata
+        let program_paths: Vec<String> = serde_json::from_str(
+            job.metadata
+                .get(JOB_METADATA_PROGRAM_OUTPUT_PATH)
+                .ok_or_else(|| JobError::Other(OtherError(eyre!("Program output paths not found in metadata"))))?,
+        )
+        .map_err(|e| JobError::Other(OtherError(eyre!("Failed to parse program paths from metadata: {}", e))))?;
+
+        // Get the path for this block
+        let path = program_paths.get(block_index).ok_or_else(|| {
+            JobError::Other(OtherError(eyre!("Program output path not found for index {}", block_index)))
+        })?;
+
+        let program_output = storage_client.get_data(path).await.map_err(|e| JobError::Other(OtherError(e)))?;
 
         bincode::deserialize(&program_output).map_err(|e| {
-            JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e)))
+            JobError::Other(OtherError(eyre!("Failed to deserialize program output from path {}: {}", path, e)))
         })
     }
 
diff --git a/crates/orchestrator/src/jobs/state_update_job/utils.rs b/crates/orchestrator/src/jobs/state_update_job/utils.rs
index a6acfe4e..a22d9e8e 100644
--- a/crates/orchestrator/src/jobs/state_update_job/utils.rs
+++ b/crates/orchestrator/src/jobs/state_update_job/utils.rs
@@ -6,15 +6,30 @@ use std::sync::Arc;
 use alloy::primitives::U256;
 use color_eyre::eyre::eyre;
 use num_bigint::BigUint;
+use serde_json;
 
 use crate::config::Config;
-use crate::constants::{BLOB_DATA_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME};
+use crate::constants::{JOB_METADATA_BLOB_DATA_PATH, PROGRAM_OUTPUT_FILE_NAME};
+use crate::jobs::types::JobItem;
 
 /// Fetching the blob data (stored in remote storage during DA job) for a particular block
-pub async fn fetch_blob_data_for_block(block_number: u64, config: Arc<Config>) -> color_eyre::Result<Vec<Vec<u8>>> {
+pub async fn fetch_blob_data_for_block(
+    block_index: usize,
+    config: Arc<Config>,
+    job: &JobItem,
+) -> color_eyre::Result<Vec<Vec<u8>>> {
     let storage_client = config.storage();
-    let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
-    let blob_data = storage_client.get_data(&key).await?;
+
+    // Get the array of blob paths from metadata
+    let blob_paths: Vec<String> = serde_json::from_str(
+        job.metadata.get(JOB_METADATA_BLOB_DATA_PATH).ok_or_else(|| eyre!("Blob data paths not found in metadata"))?,
+    )?;
+
+    // Get the path for this block
+    let path =
+        blob_paths.get(block_index).ok_or_else(|| eyre!("Blob data path not found for index {}", block_index))?;
+
+    let blob_data = storage_client.get_data(path).await?;
     Ok(vec![blob_data.to_vec()])
 }
 
diff --git a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs
index 162a0af3..e9b3f8e2 100644
--- a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs
+++ b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs
@@ -45,7 +45,7 @@ async fn test_create_job() {
 #[tokio::test]
 async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) {
     let mut prover_client = MockProverClient::new();
-    prover_client.expect_get_task_status().times(1).returning(|_, _| Ok(TaskStatus::Succeeded));
+    prover_client.expect_get_task_status().times(1).returning(|_, _, _| Ok(TaskStatus::Succeeded));
 
     let services = TestConfigBuilder::new().configure_prover_client(prover_client.into()).build().await;
 
diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs
index 66c52dfe..a152abeb 100644
--- a/crates/orchestrator/src/workers/proving.rs
+++ b/crates/orchestrator/src/workers/proving.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
 use opentelemetry::KeyValue;
 
 use crate::config::Config;
+use crate::constants::JOB_METADATA_CROSS_VERIFY;
 use crate::jobs::create_job;
 use crate::jobs::types::{JobStatus, JobType};
 use crate::metrics::ORCHESTRATOR_METRICS;
@@ -26,8 +27,12 @@ impl Worker for ProvingWorker {
         tracing::debug!("Found {} successful SNOS jobs without proving jobs", successful_snos_jobs.len());
 
         for job in successful_snos_jobs {
+            let mut metadata = job.metadata.clone();
+            // Set cross-verification to true by default
+            metadata.insert(JOB_METADATA_CROSS_VERIFY.to_string(), "true".to_string());
+
             tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job");
-            match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await {
+            match create_job(JobType::ProofCreation, job.internal_id.to_string(), metadata, config.clone()).await {
                 Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"),
                 Err(e) => {
                     tracing::warn!(job_id = %job.internal_id, error = %e, "Failed to create new state transition job");
diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs
index 0abaf2a1..a609eb1f 100644
--- a/crates/orchestrator/src/workers/snos.rs
+++ b/crates/orchestrator/src/workers/snos.rs
@@ -8,6 +8,7 @@ use opentelemetry::KeyValue;
 use starknet::providers::Provider;
 
 use crate::config::Config;
+use crate::constants::JOB_METADATA_SNOS_FULL_OUTPUT;
 use crate::jobs::create_job;
 use crate::jobs::types::JobType;
 use crate::metrics::ORCHESTRATOR_METRICS;
@@ -52,7 +53,10 @@ impl Worker for SnosWorker {
         };
 
         for block_num in block_start..latest_block_number + 1 {
-            match create_job(JobType::SnosRun, block_num.to_string(), HashMap::new(), config.clone()).await {
+            let mut metadata = HashMap::new();
+            metadata.insert(JOB_METADATA_SNOS_FULL_OUTPUT.to_string(), "false".to_string());
+
+            match create_job(JobType::SnosRun, block_num.to_string(), metadata, config.clone()).await {
                 Ok(_) => tracing::info!(block_id = %block_num, "Successfully created new Snos job"),
                 Err(e) => {
                     tracing::warn!(block_id = %block_num, error = %e, "Failed to create new Snos job");
diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs
index a6b7da2d..bab09dc2 100644
--- a/crates/orchestrator/src/workers/update_state.rs
+++ b/crates/orchestrator/src/workers/update_state.rs
@@ -2,9 +2,12 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use color_eyre::eyre::eyre;
 use opentelemetry::KeyValue;
+use serde_json;
 
 use crate::config::Config;
+use crate::constants::{JOB_METADATA_BLOB_DATA_PATH, JOB_METADATA_PROGRAM_OUTPUT_PATH, JOB_METADATA_SNOS_OUTPUT_PATH};
 use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY;
 use crate::jobs::create_job;
 use crate::jobs::types::{JobStatus, JobType};
@@ -115,6 +118,45 @@ impl Worker for UpdateStateWorker {
             blocks_to_process.iter().map(|ele| ele.to_string()).collect::<Vec<String>>().join(","),
         );
 
+        // Initialize vectors to store paths
+        let mut snos_paths = Vec::new();
+        let mut program_paths = Vec::new();
+        let mut blob_paths = Vec::new();
+
+        // For each block, get paths from both DA and SNOS jobs
+        for block_number in &blocks_to_process {
+            // Get SNOS job and copy its paths
+            let snos_job = config
+                .database()
+                .get_job_by_internal_id_and_type(&block_number.to_string(), &JobType::SnosRun)
+                .await?
+                .ok_or_else(|| eyre!("SNOS job not found for block {}", block_number))?;
+
+            // Get paths from SNOS job
+            if let Some(snos_path) = snos_job.metadata.get(JOB_METADATA_SNOS_OUTPUT_PATH) {
+                snos_paths.push(snos_path.clone());
+            }
+            if let Some(program_path) = snos_job.metadata.get(JOB_METADATA_PROGRAM_OUTPUT_PATH) {
+                program_paths.push(program_path.clone());
+            }
+
+            // Get DA job and copy blob data path
+            let da_job = config
+                .database()
+                .get_job_by_internal_id_and_type(&block_number.to_string(), &JobType::DataSubmission)
+                .await?
+                .ok_or_else(|| eyre!("DA job not found for block {}", block_number))?;
+
+            if let Some(blob_path) = da_job.metadata.get(JOB_METADATA_BLOB_DATA_PATH) {
+                blob_paths.push(blob_path.clone());
+            }
+        }
+
+        // Store paths as JSON arrays
+        metadata.insert(JOB_METADATA_SNOS_OUTPUT_PATH.to_string(), serde_json::to_string(&snos_paths)?);
+        metadata.insert(JOB_METADATA_PROGRAM_OUTPUT_PATH.to_string(), serde_json::to_string(&program_paths)?);
+        metadata.insert(JOB_METADATA_BLOB_DATA_PATH.to_string(), serde_json::to_string(&blob_paths)?);
+
         // Creating a single job for all the pending blocks.
         let new_job_id = blocks_to_process[0].to_string();
         match create_job(JobType::StateTransition, new_job_id.clone(), metadata, config.clone()).await {
diff --git a/crates/prover-clients/atlantic-service/src/lib.rs b/crates/prover-clients/atlantic-service/src/lib.rs
index d10e11ba..ce24841e 100644
--- a/crates/prover-clients/atlantic-service/src/lib.rs
+++ b/crates/prover-clients/atlantic-service/src/lib.rs
@@ -67,16 +67,27 @@ impl ProverClient for AtlanticProverService {
     }
 
     #[tracing::instrument(skip(self))]
-    async fn get_task_status(&self, job_key: &str, fact: &str) -> Result<TaskStatus, ProverClientError> {
+    async fn get_task_status(
+        &self,
+        job_key: &str,
+        fact: &str,
+        cross_verify: bool,
+    ) -> Result<TaskStatus, ProverClientError> {
         let res = self.atlantic_client.get_job_status(job_key).await?;
         match res.atlantic_query.status {
             AtlanticQueryStatus::InProgress => Ok(TaskStatus::Processing),
             AtlanticQueryStatus::Done => {
                 let fact = B256::from_str(fact).map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?;
-                if self.fact_checker.is_valid(&fact).await? {
-                    Ok(TaskStatus::Succeeded)
+                if cross_verify {
+                    tracing::debug!(fact = %hex::encode(fact), "Cross-verifying fact on chain");
+                    if self.fact_checker.is_valid(&fact).await? {
+                        Ok(TaskStatus::Succeeded)
+                    } else {
+                        Ok(TaskStatus::Failed(format!("Fact {} is not valid or not registered", hex::encode(fact))))
+                    }
                 } else {
-                    Ok(TaskStatus::Failed(format!("Fact {} is not valid or not registered", hex::encode(fact))))
+                    tracing::debug!(fact = %hex::encode(fact), "Skipping cross-verification as it's disabled");
+                    Ok(TaskStatus::Succeeded)
                 }
             }
             AtlanticQueryStatus::Failed => {
diff --git a/crates/prover-clients/prover-client-interface/src/lib.rs b/crates/prover-clients/prover-client-interface/src/lib.rs
index cd560c09..743cedb5 100644
--- a/crates/prover-clients/prover-client-interface/src/lib.rs
+++ b/crates/prover-clients/prover-client-interface/src/lib.rs
@@ -16,7 +16,12 @@ use mockall::automock;
 #[async_trait]
 pub trait ProverClient: Send + Sync {
     async fn submit_task(&self, task: Task, proof_layout: LayoutName) -> Result<String, ProverClientError>;
-    async fn get_task_status(&self, task_id: &str, fact: &str) -> Result<TaskStatus, ProverClientError>;
+    async fn get_task_status(
+        &self,
+        task_id: &str,
+        fact: &str,
+        cross_verify: bool,
+    ) -> Result<TaskStatus, ProverClientError>;
 }
 
 pub enum Task {
diff --git a/crates/prover-clients/sharp-service/src/lib.rs b/crates/prover-clients/sharp-service/src/lib.rs
index 6d0228c8..d991b9c5 100644
--- a/crates/prover-clients/sharp-service/src/lib.rs
+++ b/crates/prover-clients/sharp-service/src/lib.rs
@@ -63,7 +63,12 @@ impl ProverClient for SharpProverService {
     }
 
     #[tracing::instrument(skip(self), ret, err)]
-    async fn get_task_status(&self, job_key: &str, fact: &str) -> Result<TaskStatus, ProverClientError> {
+    async fn get_task_status(
+        &self,
+        job_key: &str,
+        fact: &str,
+        _cross_verify: bool,
+    ) -> Result<TaskStatus, ProverClientError> {
         tracing::info!(
             log_type = "starting",
             category = "get_task_status",
diff --git a/crates/prover-clients/sharp-service/tests/lib.rs b/crates/prover-clients/sharp-service/tests/lib.rs
index c59ff733..b180aa64 100644
--- a/crates/prover-clients/sharp-service/tests/lib.rs
+++ b/crates/prover-clients/sharp-service/tests/lib.rs
@@ -87,7 +87,8 @@ async fn prover_client_get_task_status_works(#[case] cairo_job_status: CairoJobS
         then.status(200).body(serde_json::to_vec(&get_task_status_sharp_response(&cairo_job_status)).unwrap());
     });
 
-    let task_status = sharp_service.get_task_status("c31381bf-4739-4667-b5b8-b08af1c6b1c7", TEST_FACT).await.unwrap();
+    let task_status =
+        sharp_service.get_task_status("c31381bf-4739-4667-b5b8-b08af1c6b1c7", TEST_FACT, false).await.unwrap();
     assert_eq!(task_status, get_task_status_expectation(&cairo_job_status), "Cairo Job Status assertion failed");
 
     sharp_add_job_call.assert();