Skip to content

Commit

Permalink
Merge pull request #270 from anoma/tiago/token-supplies-other-than-nam
Browse files Browse the repository at this point in the history
index non native token supplies
  • Loading branch information
sug0 authored Feb 10, 2025
2 parents 91fd6d1 + a0dc2af commit 6dee2e5
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
56 changes: 50 additions & 6 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ use chain::services::{
use chrono::{NaiveDateTime, Utc};
use clap::Parser;
use deadpool_diesel::postgres::Object;
use futures::future::FutureExt;
use futures::stream::StreamExt;
use namada_sdk::time::DateTimeUtc;
use orm::migrations::run_migrations;
use repository::pgf as namada_pgf_repository;
use shared::balance::TokenSupply;
use shared::block::Block;
use shared::block_result::BlockResult;
use shared::checksums::Checksums;
Expand Down Expand Up @@ -236,15 +239,29 @@ async fn crawling_fn(
));
let addresses = block.addresses_with_balance_change(&native_token);

let native_token_supplies = first_block_in_epoch
let token_supplies = first_block_in_epoch
.eq(&block_height)
.then(|| {
namada_service::get_token_supplies(&client, &native_token, epoch)
.then_some(async {
let native_fut = namada_service::get_native_token_supply(
&client,
&native_token,
epoch,
)
.map(|result| result.into_rpc_error());

let non_native_fut =
query_non_native_supplies(&client, &conn, epoch);

let (native, mut non_native) =
futures::try_join!(native_fut, non_native_fut)?;

non_native.push(native);

Ok(non_native)
})
.future()
.await
.transpose()
.into_rpc_error()?;
.transpose()?;

let validators_addresses = if first_block_in_epoch.eq(&block_height) {
namada_service::get_all_consensus_validators_addresses_at(
Expand Down Expand Up @@ -424,7 +441,7 @@ async fn crawling_fn(

repository::balance::insert_token_supplies(
transaction_conn,
native_token_supplies,
token_supplies.into_iter().flatten(),
)?;

repository::block::upsert_block(
Expand Down Expand Up @@ -737,3 +754,30 @@ async fn get_block(

Ok((block, tm_block_response, epoch))
}

async fn query_non_native_supplies(
client: &HttpClient,
conn: &Object,
epoch: u32,
) -> Result<Vec<TokenSupply>, MainError> {
let token_addresses = db_service::get_non_native_tokens(conn)
.await
.into_db_error()?;

let mut buffer = Vec::with_capacity(1);

let mut stream = futures::stream::iter(token_addresses)
.map(|address| async move {
namada_service::get_token_supply(client, address, epoch)
.await
.into_rpc_error()
})
.buffer_unordered(32);

while let Some(maybe_supply) = stream.next().await {
let supply = maybe_supply?;
buffer.push(supply);
}

Ok(buffer)
}
20 changes: 19 additions & 1 deletion chain/src/services/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
use orm::crawler_state::{
ChainCrawlerStateDb, CrawlerNameDb, EpochCrawlerStateDb,
};
use orm::schema::crawler_state;
use orm::schema::{crawler_state, token};
use orm::token::TokenTypeDb;
use shared::block::{BlockHeight, Epoch};
use shared::crawler_state::{ChainCrawlerState, EpochCrawlerState};
use shared::error::ContextDbInteractError;
Expand Down Expand Up @@ -75,3 +76,20 @@ pub async fn get_pos_crawler_state(
timestamp: crawler_state.timestamp.and_utc().timestamp(),
})
}

pub async fn get_non_native_tokens(
conn: &Object,
) -> anyhow::Result<Vec<String>> {
let token_addrs: Vec<String> = conn
.interact(move |conn| {
token::table
.filter(token::dsl::token_type.ne(TokenTypeDb::Native))
.select(token::dsl::address)
.load(conn)
})
.await
.context_db_interact_error()?
.context("Failed to read non-native token addrs from the db")?;

Ok(token_addrs)
}
25 changes: 24 additions & 1 deletion chain/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ pub async fn get_pgf_receipients(
.collect::<HashSet<_>>()
}

pub async fn get_token_supplies(
pub async fn get_native_token_supply(
client: &HttpClient,
native_token: &Id,
epoch: u32,
Expand All @@ -841,3 +841,26 @@ pub async fn get_token_supplies(
effective: Some(effective_supply.into()),
})
}

pub async fn get_token_supply(
client: &HttpClient,
token: String,
epoch: u32,
) -> anyhow::Result<TokenSupply> {
let address: NamadaSdkAddress =
token.parse().context("Failed to parse token address")?;

let supply = rpc::get_token_total_supply(client, &address)
.await
.map(Amount::from)
.with_context(|| {
format!("Failed to query total supply of token {token}")
})?;

anyhow::Ok(TokenSupply {
address: token,
epoch: epoch as _,
total: supply.into(),
effective: None,
})
}

0 comments on commit 6dee2e5

Please sign in to comment.