Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make backfill behavior more explicit #700

Merged
merged 21 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand Down Expand Up @@ -33,6 +33,9 @@ pub fn setup_account_restoration_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand Down Expand Up @@ -32,6 +32,9 @@ pub fn setup_acc_txn_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
5 changes: 4 additions & 1 deletion rust/integration-tests/src/sdk_tests/ans_processor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::ans_processor::AnsProcessorConfig,
Expand Down Expand Up @@ -44,6 +44,9 @@ pub fn setup_ans_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand Down Expand Up @@ -32,6 +32,9 @@ pub fn setup_default_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand Down Expand Up @@ -32,6 +32,9 @@ pub fn setup_events_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand Down Expand Up @@ -33,6 +33,9 @@ pub fn setup_fa_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::objects_processor::ObjectsProcessorConfig,
Expand Down Expand Up @@ -43,6 +43,9 @@ pub fn setup_objects_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::stake_processor::StakeProcessorConfig,
Expand Down Expand Up @@ -40,6 +40,9 @@ pub fn setup_stake_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::token_v2_processor::TokenV2ProcessorConfig,
Expand Down Expand Up @@ -42,6 +42,9 @@ pub fn setup_token_v2_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand Down Expand Up @@ -33,6 +33,9 @@ pub fn setup_user_txn_processor_config(
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: None,
mode: ProcessorMode::Default,
},
processor_name,
)
Expand Down
6 changes: 3 additions & 3 deletions rust/processor/parser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ health_check_port: 8084
server_config:
processor_config:
type: default_processor
postgres_connection_string: postgresql://postgres:@localhost:5432/default_processor
indexer_grpc_data_service_address: http://127.0.0.1:50051
auth_token: AUTH_TOKEN
postgres_connection_string: postgresql://postgres:@localhost:5432/default_processor
indexer_grpc_data_service_address: http://127.0.0.1:50051
auth_token: AUTH_TOKEN
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ CREATE TABLE backfill_processor_status (
last_updated TIMESTAMP NOT NULL DEFAULT NOW(),
last_transaction_timestamp TIMESTAMP NULL,
backfill_start_version BIGINT NOT NULL,
backfill_end_version BIGINT NULL,
backfill_end_version BIGINT NOT NULL,
PRIMARY KEY (backfill_alias)
);
2 changes: 1 addition & 1 deletion rust/processor/src/db/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ diesel::table! {
last_updated -> Timestamp,
last_transaction_timestamp -> Nullable<Timestamp>,
backfill_start_version -> Int8,
backfill_end_version -> Nullable<Int8>,
backfill_end_version -> Int8,
}
}

Expand Down
91 changes: 89 additions & 2 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,84 @@ use serde::{Deserialize, Serialize};
pub const QUERY_DEFAULT_RETRIES: u32 = 5;
pub const QUERY_DEFAULT_RETRY_DELAY_MS: u64 = 500;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
#[serde(deny_unknown_fields)]
pub enum ProcessorMode {
#[serde(rename = "default")]
#[default]
Default,
#[serde(rename = "backfill")]
Backfill,
#[serde(rename = "testing")]
Testing,
}

#[derive(Clone, Debug, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerProcessorConfig {
pub processor_config: ProcessorConfig,
pub transaction_stream_config: TransactionStreamConfig,
pub db_config: DbConfig,
pub backfill_config: Option<BackfillConfig>,
pub bootstrap_config: Option<BootStrapConfig>,
pub testing_config: Option<TestingConfig>,
#[serde(default)]
pub mode: ProcessorMode,
}

impl IndexerProcessorConfig {
fn validate(&self) -> Result<(), String> {
match self.mode {
ProcessorMode::Testing => {
if self.testing_config.is_none() {
return Err("testing_config must be present when mode is 'testing'".to_string());
}
},
ProcessorMode::Backfill => {
if self.backfill_config.is_none() {
return Err(
"backfill_config must be present when mode is 'backfill'".to_string()
);
}
},
ProcessorMode::Default => {},
}
Ok(())
}
}

impl<'de> Deserialize<'de> for IndexerProcessorConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct Inner {
processor_config: ProcessorConfig,
transaction_stream_config: TransactionStreamConfig,
db_config: DbConfig,
backfill_config: Option<BackfillConfig>,
bootstrap_config: Option<BootStrapConfig>,
testing_config: Option<TestingConfig>,
#[serde(default)]
mode: ProcessorMode,
}

let inner = Inner::deserialize(deserializer)?;
let config = IndexerProcessorConfig {
processor_config: inner.processor_config,
transaction_stream_config: inner.transaction_stream_config,
db_config: inner.db_config,
backfill_config: inner.backfill_config,
bootstrap_config: inner.bootstrap_config,
testing_config: inner.testing_config,
mode: inner.mode,
};

config.validate().map_err(serde::de::Error::custom)?;
Ok(config)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -155,5 +226,21 @@ impl RunnableConfig for IndexerProcessorConfig {
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BackfillConfig {
pub backfill_alias: String,
pub backfill_id: String,
pub initial_starting_version: u64,
pub ending_version: u64,
pub overwrite_checkpoint: bool,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BootStrapConfig {
pub initial_starting_version: u64,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct TestingConfig {
pub override_starting_version: u64,
pub ending_version: u64,
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct BackfillProcessorStatus {
pub last_success_version: i64,
pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
pub backfill_start_version: i64,
pub backfill_end_version: Option<i64>,
pub backfill_end_version: i64,
}

#[derive(AsChangeset, Debug, Queryable)]
Expand All @@ -72,14 +72,16 @@ pub struct BackfillProcessorStatusQuery {
pub last_updated: chrono::NaiveDateTime,
pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
pub backfill_start_version: i64,
pub backfill_end_version: Option<i64>,
pub backfill_end_version: i64,
}

impl BackfillProcessorStatusQuery {
pub async fn get_by_processor(
backfill_alias: &str,
processor_type: &str,
backfill_id: &str,
conn: &mut DbPoolConnection<'_>,
) -> diesel::QueryResult<Option<Self>> {
let backfill_alias = format!("{}_{}", processor_type, backfill_id);
backfill_processor_status::table
.filter(backfill_processor_status::backfill_alias.eq(backfill_alias))
.first::<Self>(conn)
Expand Down
Loading
Loading