From d33e649e415c354cc2a1e3c49131725552d69ba0 Mon Sep 17 00:00:00 2001 From: man0s <95379755+losman0s@users.noreply.github.com> Date: Fri, 6 Sep 2024 17:23:42 +0800 Subject: [PATCH] Man0s/crossbar legacy indexer (#245) * cli: lut check cmd * indexer: swb pull support * fix: exclude indexer from workspace --- clients/rust/marginfi-cli/src/entrypoint.rs | 12 ++ .../rust/marginfi-cli/src/processor/group.rs | 121 +++++++++++++++ observability/indexer/Cargo.toml | 16 +- .../indexer/src/commands/index_accounts.rs | 12 +- .../src/commands/index_transactions.rs | 17 +-- .../indexer/src/commands/snapshot_accounts.rs | 67 ++++++-- observability/indexer/src/utils/crossbar.rs | 143 ++++++++++++++++++ observability/indexer/src/utils/metrics.rs | 13 +- observability/indexer/src/utils/mod.rs | 2 + observability/indexer/src/utils/snapshot.rs | 34 ++++- observability/indexer/src/utils/swb_pull.rs | 51 +++++++ programs/marginfi/src/state/price.rs | 16 +- 12 files changed, 464 insertions(+), 40 deletions(-) create mode 100644 observability/indexer/src/utils/crossbar.rs create mode 100644 observability/indexer/src/utils/swb_pull.rs diff --git a/clients/rust/marginfi-cli/src/entrypoint.rs b/clients/rust/marginfi-cli/src/entrypoint.rs index 9e7a62b42..3c54a7267 100644 --- a/clients/rust/marginfi-cli/src/entrypoint.rs +++ b/clients/rust/marginfi-cli/src/entrypoint.rs @@ -153,6 +153,10 @@ pub enum GroupCommand { #[clap(short = 't', long)] existing_token_lookup_tables: Vec, }, + CheckLookupTable { + #[clap(short = 't', long)] + existing_token_lookup_tables: Vec, + }, } #[derive(Clone, Copy, Debug, Parser, ArgEnum)] @@ -606,6 +610,14 @@ fn group(subcmd: GroupCommand, global_options: &GlobalOptions) -> Result<()> { processor::handle_bankruptcy_for_accounts(&config, &profile, accounts) } + GroupCommand::CheckLookupTable { + existing_token_lookup_tables, + } => processor::group::process_check_lookup_tables( + &config, + &profile, + existing_token_lookup_tables, + ), + GroupCommand::UpdateLookupTable { existing_token_lookup_tables, } => processor::group::process_update_lookup_tables( diff --git a/clients/rust/marginfi-cli/src/processor/group.rs b/clients/rust/marginfi-cli/src/processor/group.rs index 4a83e6d16..1c6fd1e5a 100644 --- a/clients/rust/marginfi-cli/src/processor/group.rs +++ b/clients/rust/marginfi-cli/src/processor/group.rs @@ -15,6 +15,127 @@ use std::mem::size_of; const CHUNK_SIZE: usize = 22; const KEY_BATCH_SIZE: usize = 20; +pub fn process_check_lookup_tables( + config: &Config, + profile: &Profile, + existing_lookup_tables: Vec, +) -> Result<()> { + let rpc = config.mfi_program.rpc(); + let marginfi_group = profile.marginfi_group.expect("group not set"); + + let mut accounts: Vec = vec![]; + + for chunk in existing_lookup_tables.chunks(CHUNK_SIZE) { + let accounts_2: Vec = rpc + .get_multiple_accounts(chunk)? + .into_iter() + .flatten() + .collect(); + + accounts.extend(accounts_2); + } + + let lookup_tables: Vec = accounts + .iter_mut() + .zip(existing_lookup_tables.iter()) + .map(|(account, address)| { + let lookup_table = AddressLookupTable::deserialize(&account.data).unwrap(); + println!( + "Loaded table {} with {} addresses", + address, + lookup_table.addresses.len() + ); + + if lookup_table.meta.authority != Some(config.authority()) { + println!( + "Lookup table {} has wrong authority {:?}", + address, lookup_table.meta.authority, + ); + } + + lookup_table + }) + .collect(); + + let banks = config + .mfi_program + .accounts::(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 8 + size_of::() + size_of::(), + marginfi_group.to_bytes().to_vec(), + ))])?; + + let _bank_pks = banks.iter().map(|(pk, _)| *pk).collect::>(); + + let oracle_pks = banks + .iter() + .flat_map(|(_, bank)| bank.config.oracle_keys) + .filter(|pk| pk != &Pubkey::default()) + .collect::>(); + + // Dedup the oracle pks. + let _oracle_pks = oracle_pks + .into_iter() + .fold(vec![], |mut acc, pk| { + if !acc.contains(&pk) { + acc.push(pk); + } + acc + }) + .into_iter() + .collect::>(); + + // Join keys + let mut keys = vec![ + config.mfi_program.id(), + marginfi_group, + spl_token::id(), + system_program::id(), + ]; + + for (bank_pk, bank) in banks.iter() { + keys.push(*bank_pk); + keys.push(bank.liquidity_vault); + let (vault_auth, _) = utils::find_bank_vault_authority_pda( + bank_pk, + marginfi::state::marginfi_group::BankVaultType::Liquidity, + &marginfi::ID, + ); + + keys.push(vault_auth); + + keys.extend_from_slice( + &bank + .config + .oracle_keys + .iter() + .filter(|pk| **pk != Pubkey::default()) + .cloned() + .collect::>(), + ); + } + + keys.dedup(); + + // Find missing keys in lookup tables + let missing_keys = keys + .iter() + .filter(|pk| { + let missing = !lookup_tables + .iter() + .any(|lookup_table| lookup_table.addresses.iter().any(|address| &address == pk)); + + println!("Key {} missing: {}", pk, missing); + + missing + }) + .cloned() + .collect::>(); + + println!("Missing {} keys", missing_keys.len()); + + Ok(()) +} + pub fn process_update_lookup_tables( config: &Config, profile: &Profile, diff --git a/observability/indexer/Cargo.toml b/observability/indexer/Cargo.toml index afbdcc036..ad13b5a2c 100644 --- a/observability/indexer/Cargo.toml +++ b/observability/indexer/Cargo.toml @@ -28,14 +28,22 @@ marginfi = { path = "../../programs/marginfi", features = [ ] } gcp-bigquery-client = "0.16.7" -google-cloud-default = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = ["pubsub"] } +google-cloud-default = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = [ + "pubsub", +] } google-cloud-auth = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994" } google-cloud-pubsub = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994" } google-cloud-gax = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994" } -google-cloud-googleapis = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = ["bytes", "pubsub"] } +google-cloud-googleapis = { git = " https://github.com/mrgnlabs/google-cloud-rust.git", rev = "3f651f2d9fd8cca547bb11490d2575d9bf90f994", features = [ + "bytes", + "pubsub", +] } yup-oauth2 = "8.3.0" -yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "a2cd1498ac64baa1017d4a4cdefbf46100215b4c" } -yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "a2cd1498ac64baa1017d4a4cdefbf46100215b4c" } +yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" } +yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" } +switchboard-on-demand-client = "0.1.7" +switchboard-on-demand = "0.1.7" +hex = "0.4.3" fixed = "1.12.0" fixed-macro = "1.2.0" dotenv = "0.15.0" diff --git a/observability/indexer/src/commands/index_accounts.rs b/observability/indexer/src/commands/index_accounts.rs index c745dea91..729006f26 100644 --- a/observability/indexer/src/commands/index_accounts.rs +++ b/observability/indexer/src/commands/index_accounts.rs @@ -112,11 +112,13 @@ pub async fn index_accounts(config: IndexAccountsConfig) -> Result<()> { async fn listen_to_updates(ctx: Arc) { loop { info!("Connecting geyser client"); - let geyser_client_connection_result = GeyserGrpcClient::connect( - ctx.config.rpc_endpoint.to_string(), - Some(ctx.config.rpc_token.to_string()), - None, - ); + let geyser_client_connection_result = + GeyserGrpcClient::build_from_shared(ctx.config.rpc_endpoint.to_string()) + .unwrap() + .x_token(Some(ctx.config.rpc_token.to_string())) + .unwrap() + .connect() + .await; let mut geyser_client = match geyser_client_connection_result { Ok(geyser_client) => geyser_client, diff --git a/observability/indexer/src/commands/index_transactions.rs b/observability/indexer/src/commands/index_transactions.rs index a5872bf9e..b40b98f90 100644 --- a/observability/indexer/src/commands/index_transactions.rs +++ b/observability/indexer/src/commands/index_transactions.rs @@ -115,16 +115,13 @@ pub async fn index_transactions(config: IndexTransactionsConfig) -> Result<()> { async fn listen_to_updates(ctx: Arc) { loop { info!("Connecting geyser client"); - let geyser_client_connection_result = GeyserGrpcClient::connect_with_timeout( - ctx.config.rpc_endpoint.to_string(), - Some(ctx.config.rpc_token.to_string()), - None, - Some(Duration::from_secs(10)), - Some(Duration::from_secs(10)), - false, - ) - .await; - info!("Connected"); + let geyser_client_connection_result = + GeyserGrpcClient::build_from_shared(ctx.config.rpc_endpoint.to_string()) + .unwrap() + .x_token(Some(ctx.config.rpc_token.to_string())) + .unwrap() + .connect() + .await; let mut geyser_client = match geyser_client_connection_result { Ok(geyser_client) => geyser_client, diff --git a/observability/indexer/src/commands/snapshot_accounts.rs b/observability/indexer/src/commands/snapshot_accounts.rs index cc8eae67f..641bcf7e5 100644 --- a/observability/indexer/src/commands/snapshot_accounts.rs +++ b/observability/indexer/src/commands/snapshot_accounts.rs @@ -1,7 +1,9 @@ use crate::utils::convert_account; +use crate::utils::crossbar::{CrossbarCache, SwbPullFeedMeta}; use crate::utils::metrics::{LendingPoolBankMetrics, MarginfiAccountMetrics, MarginfiGroupMetrics}; -use crate::utils::snapshot::Snapshot; use crate::utils::snapshot::{AccountRoutingType, BankUpdateRoutingType}; +use crate::utils::snapshot::{OracleData, Snapshot}; +use crate::utils::swb_pull::overwrite_price_from_sim; use anyhow::Result; use chrono::{DateTime, Utc}; use envconfig::Envconfig; @@ -112,14 +114,16 @@ pub struct Context { account_updates_queue: Arc>>>, latest_slots_with_commitment: Arc>>, account_snapshot: Arc>, + crossbar_store: Arc, stream_disconnection_count: Arc, update_processing_error_count: Arc, } impl Context { pub async fn new(config: &SnapshotAccountsConfig) -> Self { + let rpc_endpoint = format!("{}/{}", config.rpc_endpoint, config.rpc_token); let rpc_client = Arc::new(RpcClient::new_with_commitment( - format!("{}/{}", config.rpc_endpoint, config.rpc_token), + rpc_endpoint, CommitmentConfig { commitment: solana_sdk::commitment_config::CommitmentLevel::Finalized, }, @@ -132,6 +136,7 @@ impl Context { account_updates_queue: Arc::new(Mutex::new(BTreeMap::new())), latest_slots_with_commitment: Arc::new(Mutex::new(BTreeSet::new())), account_snapshot: Arc::new(Mutex::new(Snapshot::new(config.program_id, rpc_client))), + crossbar_store: Arc::new(CrossbarCache::new()), stream_disconnection_count: Arc::new(AtomicU64::new(0)), update_processing_error_count: Arc::new(AtomicU64::new(0)), } @@ -188,6 +193,26 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> { snapshot.init().await.unwrap(); info!("Summary: {snapshot}"); + let swb_feed_accounts_and_hashes = snapshot + .price_feeds + .iter() + .filter_map(|(pk, od)| match od { + OracleData::SwitchboardPull(feed) => Some((*pk, hex::encode(feed.feed.feed_hash))), + _ => None, + }) + .collect::>(); + + context.crossbar_store.track_feeds( + swb_feed_accounts_and_hashes + .into_iter() + .map(|(feed_address, feed_hash)| SwbPullFeedMeta { + feed_hash, + feed_address, + }) + .collect::>(), + ); + context.crossbar_store.refresh_prices().await; + snapshot .routing_lookup .iter() @@ -207,6 +232,26 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> { let geyser_subscription_config = compute_geyser_config(&config, &non_program_accounts).await; *context.geyser_subscription_config.lock().await = (false, geyser_subscription_config.clone()); + let update_crossbar_cache_handle = tokio::spawn({ + let context = context.clone(); + async move { + loop { + context.crossbar_store.refresh_prices().await; + let mut snapshot = context.account_snapshot.lock().await; + let feeds_per_address: HashMap = + context.crossbar_store.get_prices_per_address(); + for (address, price) in feeds_per_address { + if let Some(od) = snapshot.price_feeds.get_mut(&address) { + if let OracleData::SwitchboardPull(feed) = od { + overwrite_price_from_sim(feed, &price); + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + } + } + }); + let listen_to_updates_handle = tokio::spawn({ let context = context.clone(); async move { listen_to_updates(context).await } @@ -226,6 +271,7 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> { }); join_all([ + update_crossbar_cache_handle, listen_to_updates_handle, process_account_updates_handle, update_account_map_handle, @@ -239,15 +285,14 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> { async fn listen_to_updates(ctx: Arc) { loop { info!("Connecting geyser client"); - let geyser_client_connection_result = GeyserGrpcClient::connect_with_timeout( - ctx.config.rpc_endpoint.to_string(), - Some(ctx.config.rpc_token.to_string()), - None, - Some(Duration::from_secs(10)), - Some(Duration::from_secs(10)), - false, - ) - .await; + let geyser_client_connection_result = + GeyserGrpcClient::build_from_shared(ctx.config.rpc_endpoint.to_string()) + .unwrap() + .x_token(Some(ctx.config.rpc_token.to_string())) + .unwrap() + .connect() + .await; + info!("Connected"); let mut geyser_client = match geyser_client_connection_result { diff --git a/observability/indexer/src/utils/crossbar.rs b/observability/indexer/src/utils/crossbar.rs new file mode 100644 index 000000000..377960d1e --- /dev/null +++ b/observability/indexer/src/utils/crossbar.rs @@ -0,0 +1,143 @@ +use solana_sdk::pubkey::Pubkey; +use std::{collections::HashMap, sync::Mutex}; +use switchboard_on_demand_client::CrossbarClient; + +pub struct SwbPullFeedMeta { + pub feed_address: Pubkey, + pub feed_hash: String, +} + +pub struct SwbPullFeedInfo { + pub feed_meta: SwbPullFeedMeta, + pub simulated_price: SimulatedPrice, +} + +#[derive(Clone, Debug)] +pub struct SimulatedPrice { + pub value: f64, + pub std_dev: f64, + pub timestamp: i64, +} + +pub struct CrossbarCache { + crossbar_client: CrossbarClient, + pub feeds: Mutex>, +} + +impl CrossbarCache { + /// Creates a new CrossbarCache empty instance + pub fn new() -> Self { + let crossbar_client = CrossbarClient::default(None); + Self { + crossbar_client, + feeds: Mutex::new(HashMap::new()), + } + } + + pub fn track_feeds(&self, feeds: Vec) { + for feed in feeds.into_iter() { + self.feeds.lock().unwrap().insert( + feed.feed_hash.clone(), + SwbPullFeedInfo { + feed_meta: feed, + simulated_price: SimulatedPrice { + value: 0.0, + std_dev: 0.0, + timestamp: 0, + }, + }, + ); + } + } + + pub async fn refresh_prices(&self) { + if self.feeds.lock().unwrap().is_empty() { + return; + } + + let feed_hashes = self + .feeds + .lock() + .unwrap() + .values() + .map(|feed| feed.feed_meta.feed_hash.clone()) + .collect::>(); + + let simulated_prices = self + .crossbar_client + .simulate_feeds(&feed_hashes.iter().map(|x| x.as_str()).collect::>()) + .await + .unwrap(); + + let timestamp = chrono::Utc::now().timestamp(); + + let mut feeds = self.feeds.lock().unwrap(); + for simulated_response in simulated_prices { + if let Some(price) = calculate_price(simulated_response.results) { + if let Some(feed) = feeds.get_mut(&simulated_response.feedHash) { + feed.simulated_price = SimulatedPrice { + value: price, + std_dev: 0.0, + timestamp, + }; + } + } + } + } + + pub fn get_prices_per_address(&self) -> HashMap { + let mut feeds_per_address = HashMap::new(); + let feeds = self.feeds.lock().unwrap(); + for feed in feeds.values() { + feeds_per_address.insert(feed.feed_meta.feed_address, feed.simulated_price.clone()); + } + feeds_per_address + } +} + +/// Calculate the median of a list of numbers +fn calculate_price(mut numbers: Vec) -> Option { + if numbers.is_empty() { + return None; + } + + numbers.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let mid = numbers.len() / 2; + + if numbers.len() % 2 == 0 { + Some((numbers[mid - 1] + numbers[mid]) / 2.0) + } else { + Some(numbers[mid]) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::sync::Mutex; + + #[tokio::test] + async fn test_crossbar_maintainer_new() { + let price = Arc::new(Mutex::new(0.0)); + let feed_hash = + "0x4c935636f2523f6aeeb6dc7b7dab0e86a13ff2c794f7895fc78851d69fdb593b".to_string(); + let price2 = Arc::new(Mutex::new(0.0)); + let feed_hash2 = + "0x5686ebe26b52d5c67dc10b63240c6d937af75d86bfcacf46865cd5da62f760e9".to_string(); + let crossbar_maintainer = CrossbarCache::new(); + crossbar_maintainer.track_feeds(vec![ + SwbPullFeedMeta { + feed_address: Pubkey::new_unique(), + feed_hash: feed_hash.clone(), + }, + SwbPullFeedMeta { + feed_address: Pubkey::new_unique(), + feed_hash: feed_hash2.clone(), + }, + ]); + crossbar_maintainer.refresh_prices().await; + println!("Price: {:?}", price.lock().unwrap()); + println!("Price2: {:?}", price2.lock().unwrap()); + } +} diff --git a/observability/indexer/src/utils/metrics.rs b/observability/indexer/src/utils/metrics.rs index 5e7babb29..ee0fe4058 100644 --- a/observability/indexer/src/utils/metrics.rs +++ b/observability/indexer/src/utils/metrics.rs @@ -464,15 +464,20 @@ impl MarginfiAccountMetrics { match oracle_data { OracleData::Pyth(price_feed) => ( *oracle_pk, - OraclePriceFeedAdapter::PythEma(price_feed.clone()), + OraclePriceFeedAdapter::PythLegacy(price_feed.clone()), ), OracleData::Switchboard(pf) => ( *oracle_pk, OraclePriceFeedAdapter::SwitchboardV2(pf.clone()), ), - OracleData::PythPush(pf) => { - (*oracle_pk, OraclePriceFeedAdapter::PythPush(pf.clone())) - } + OracleData::PythPush(pf) => ( + *oracle_pk, + OraclePriceFeedAdapter::PythPushOracle(pf.clone()), + ), + OracleData::SwitchboardPull(pf) => ( + *oracle_pk, + OraclePriceFeedAdapter::SwitchboardPull(pf.clone()), + ), } })); diff --git a/observability/indexer/src/utils/mod.rs b/observability/indexer/src/utils/mod.rs index 7f8874e69..a85e1377b 100644 --- a/observability/indexer/src/utils/mod.rs +++ b/observability/indexer/src/utils/mod.rs @@ -1,11 +1,13 @@ use solana_sdk::{account::Account, pubkey::Pubkey}; pub mod big_query; +pub mod crossbar; pub mod errors; pub mod marginfi_account_dup; pub mod metrics; pub mod protos; pub mod snapshot; +pub mod swb_pull; pub mod transactions_crawler; pub fn convert_account( diff --git a/observability/indexer/src/utils/snapshot.rs b/observability/indexer/src/utils/snapshot.rs index 53c0d1036..cd9ba2ccb 100644 --- a/observability/indexer/src/utils/snapshot.rs +++ b/observability/indexer/src/utils/snapshot.rs @@ -1,3 +1,4 @@ +use crate::common::get_multiple_accounts_chunked2; use anchor_client::anchor_lang::AccountDeserialize; use anchor_client::anchor_lang::Discriminator; use fixed::types::I80F48; @@ -24,7 +25,7 @@ use std::{ }; use tracing::info; -use crate::common::get_multiple_accounts_chunked2; +use super::swb_pull::load_swb_pull_account; #[derive(Clone, Debug)] pub struct BankAccounts { @@ -41,6 +42,7 @@ pub enum AccountRoutingType { Bank(Pubkey, BankUpdateRoutingType), PriceFeedPyth, PriceFeedSwitchboard, + PriceFeedSwitchboardPull, PriceFeedPythPushOracle, } @@ -54,9 +56,10 @@ pub enum BankUpdateRoutingType { #[derive(Clone, Debug)] pub enum OracleData { - Pyth(PythEmaPriceFeed), + Pyth(PythLegacyPriceFeed), Switchboard(SwitchboardV2PriceFeed), PythPush(PythPushOraclePriceFeed), + SwitchboardPull(SwitchboardPullPriceFeed), } impl OracleData { @@ -75,6 +78,9 @@ impl OracleData { OracleData::PythPush(price_feed) => price_feed .get_price_of_type(oracle_price_type, bias) .unwrap(), + OracleData::SwitchboardPull(price_feed) => price_feed + .get_price_of_type(oracle_price_type, bias) + .unwrap(), } } } @@ -219,7 +225,7 @@ impl Snapshot { match bank.config.oracle_setup { OracleSetup::None => (), - OracleSetup::PythEma => { + OracleSetup::PythLegacy => { let oracle_address = bank.config.oracle_keys[0]; self.routing_lookup .insert(oracle_address, AccountRoutingType::PriceFeedPyth); @@ -252,6 +258,15 @@ impl Snapshot { mfi_sponsored_oracle_address, AccountRoutingType::PriceFeedPythPushOracle, ); + + accounts_to_fetch.push(pyth_sponsored_oracle_address); + accounts_to_fetch.push(mfi_sponsored_oracle_address); + } + OracleSetup::SwitchboardPull => { + let oracle_address = bank.config.oracle_keys[0]; + self.routing_lookup + .insert(oracle_address, AccountRoutingType::PriceFeedSwitchboardPull); + accounts_to_fetch.push(oracle_address); } } @@ -307,7 +322,7 @@ impl Snapshot { AccountRoutingType::PriceFeedPyth => { let mut account = account.clone(); let ai = (account_pubkey, &mut account).into_account_info(); - let pf = PythEmaPriceFeed::load_checked(&ai, 0, u64::MAX).unwrap(); + let pf = PythLegacyPriceFeed::load_checked(&ai, 0, u64::MAX).unwrap(); self.price_feeds .insert(*account_pubkey, OracleData::Pyth(pf)); } @@ -362,6 +377,17 @@ impl Snapshot { self.price_feeds .insert(feed_id_pk, OracleData::PythPush(pf)); } + AccountRoutingType::PriceFeedSwitchboardPull => { + let mut account = account.clone(); + let ai = (account_pubkey, &mut account).into_account_info(); + let pf = load_swb_pull_account(&ai).unwrap(); + self.price_feeds.insert( + *account_pubkey, + OracleData::SwitchboardPull(SwitchboardPullPriceFeed { + feed: Box::new((&pf).into()), + }), + ); + } } } } diff --git a/observability/indexer/src/utils/swb_pull.rs b/observability/indexer/src/utils/swb_pull.rs new file mode 100644 index 000000000..4dd7733f1 --- /dev/null +++ b/observability/indexer/src/utils/swb_pull.rs @@ -0,0 +1,51 @@ +use fixed::types::I80F48; +use marginfi::constants::EXP_10_I80F48; +use marginfi::state::price::SwitchboardPullPriceFeed; +use solana_sdk::account_info::AccountInfo; +use switchboard_on_demand::PullFeedAccountData; + +use super::crossbar::SimulatedPrice; + +pub fn overwrite_price_from_sim( + current_data: &mut SwitchboardPullPriceFeed, + simulated_price: &SimulatedPrice, +) { + let value: i128 = I80F48::from_num(simulated_price.value) + .checked_mul(EXP_10_I80F48[switchboard_on_demand::PRECISION as usize]) + .unwrap() + .to_num(); + let std_dev: i128 = I80F48::from_num(simulated_price.std_dev) + .checked_mul(EXP_10_I80F48[switchboard_on_demand::PRECISION as usize]) + .unwrap() + .to_num(); + + current_data.feed.result.value = value; + current_data.feed.result.std_dev = std_dev; + // other fields are ignored because not used by the indexer +} + +pub fn load_swb_pull_account(account_info: &AccountInfo) -> anyhow::Result { + let bytes = &account_info.data.borrow().to_vec(); + + if bytes + .as_ptr() + .align_offset(std::mem::align_of::()) + != 0 + { + return Err(anyhow::anyhow!("Invalid alignment")); + } + + let num = bytes.len() / std::mem::size_of::(); + let mut vec: Vec = Vec::with_capacity(num); + + unsafe { + vec.set_len(num); + std::ptr::copy_nonoverlapping( + bytes[8..std::mem::size_of::() + 8].as_ptr(), + vec.as_mut_ptr() as *mut u8, + bytes.len(), + ); + } + + Ok(vec[0]) +} diff --git a/programs/marginfi/src/state/price.rs b/programs/marginfi/src/state/price.rs index 12742cd7b..2fe51ceb3 100644 --- a/programs/marginfi/src/state/price.rs +++ b/programs/marginfi/src/state/price.rs @@ -308,7 +308,7 @@ impl PriceAdapter for PythLegacyPriceFeed { #[cfg_attr(feature = "client", derive(Clone, Debug))] pub struct SwitchboardPullPriceFeed { - feed: Box, + pub feed: Box, } impl SwitchboardPullPriceFeed { @@ -767,14 +767,22 @@ impl PriceAdapter for PythPushOraclePriceFeed { /// A slimmed down version of the PullFeedAccountData struct copied from the /// switchboard-on-demand/src/pull_feed.rs #[cfg_attr(feature = "client", derive(Clone, Debug))] -struct LitePullFeedAccountData { +pub struct LitePullFeedAccountData { pub result: CurrentResult, + #[cfg(feature = "client")] + pub feed_hash: [u8; 32], + #[cfg(feature = "client")] + pub last_update_timestamp: i64, } impl From<&PullFeedAccountData> for LitePullFeedAccountData { fn from(feed: &PullFeedAccountData) -> Self { Self { result: feed.result, + #[cfg(feature = "client")] + feed_hash: feed.feed_hash, + #[cfg(feature = "client")] + last_update_timestamp: feed.last_update_timestamp, } } } @@ -783,6 +791,10 @@ impl From> for LitePullFeedAccountData { fn from(feed: Ref<'_, PullFeedAccountData>) -> Self { Self { result: feed.result, + #[cfg(feature = "client")] + feed_hash: feed.feed_hash, + #[cfg(feature = "client")] + last_update_timestamp: feed.last_update_timestamp, } } }