diff --git a/src/kitwallet/mod.rs b/src/kitwallet/mod.rs new file mode 100644 index 0000000..5bc8094 --- /dev/null +++ b/src/kitwallet/mod.rs @@ -0,0 +1,113 @@ +use std::{collections::HashMap, num::NonZeroU32, sync::Arc}; + +use anyhow::bail; +use governor::{Quota, RateLimiter}; +use tokio::sync::RwLock; +use tracing::{error, info}; +use tta_rust::RateLim; + +#[derive(Clone)] +pub struct KitWallet { + rate_limiter: Arc>, + client: reqwest::Client, + cache: Arc>>>, +} + +impl Default for KitWallet { + fn default() -> Self { + Self::new() + } +} + +impl KitWallet { + pub fn new() -> Self { + Self { + rate_limiter: Arc::new(RwLock::new(RateLimiter::direct(Quota::per_second( + NonZeroU32::new(4u32).unwrap(), + )))), + client: reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(60)) + .build() + .unwrap(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + // TODO(plg): expire the cache. + pub async fn get_likely_tokens(&self, account: String) -> anyhow::Result> { + let cache_read = self.cache.read().await; + + if let Some(likely_tokens) = cache_read.get(&account) { + return Ok(likely_tokens.clone()); + } + + drop(cache_read); // Release the read lock + + // Now, only here do we apply the rate limiter + self.rate_limiter.read().await.until_ready().await; + + info!( + "Account {} likely tokens not cached, fetching from API", + account + ); + let likely_tokens = self + .client + .get(format!( + "https://api.kitwallet.app/account/{}/likelyTokens", + account + )) + .send() + .await? + .json::>() + .await?; + + // Insert the result into the cache + let mut cache_write = self.cache.write().await; + cache_write.insert(account.clone(), likely_tokens.clone()); + + Ok(likely_tokens) + } + + // get all in parallel + pub async fn get_likely_tokens_for_accounts( + &self, + accounts: Vec, + ) -> anyhow::Result>> { + let mut tasks = Vec::new(); + for account in accounts { + let account = account.clone(); + let self_clone = self.clone(); + tasks.push(tokio::spawn(async move { + let likely_tokens = match self_clone.get_likely_tokens(account.clone()).await { + Ok(likely_tokens) => likely_tokens, + Err(e) => { + error!( + "Error fetching likely tokens for account {}: {}", + account, e + ); + bail!( + "Error fetching likely tokens for account {}: {}", + account, + e + ) + } + }; + anyhow::Ok((account, likely_tokens)) + })); + } + + let mut likely_tokens_for_accounts = HashMap::new(); + for task in tasks { + let (account, likely_tokens) = match task.await? { + Ok(a) => a, + Err(err) => { + error!("Error fetching likely tokens: {}", err); + continue; + } + }; + likely_tokens_for_accounts.insert(account, likely_tokens); + } + + Ok(likely_tokens_for_accounts) + } +} diff --git a/src/lib.rs b/src/lib.rs index eaff4e0..c57d052 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,18 @@ use std::collections::HashSet; use anyhow::Result; +use governor::{clock, state, RateLimiter}; use hyper::{Body, Response}; use serde::Serialize; use sha2::{Digest, Sha256}; +pub type RateLim = RateLimiter< + state::NotKeyed, + state::InMemoryState, + clock::QuantaClock, + governor::middleware::NoOpMiddleware, +>; + // Extract accounts, // returns: account, is lockup, master account pub fn get_accounts_and_lockups(accounts: &str) -> HashSet<(String, Option)> { diff --git a/src/main.rs b/src/main.rs index 01f5306..e60ed64 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ + use csv::Writer; use hyper::Body; +use kitwallet::KitWallet; use near_primitives::types::AccountId; use tower::ServiceBuilder; use tower_http::{ @@ -7,7 +9,7 @@ use tower_http::{ trace::TraceLayer, }; use tracing_loki::url::Url; -use tta::models::ReportRow; +use tta::{models::ReportRow}; use axum::{ extract::{Query, State}, @@ -18,11 +20,11 @@ use axum::{ Json, Router, }; -use chrono::DateTime; +use chrono::{DateTime}; use dotenvy::dotenv; use futures_util::future::join_all; -use near_jsonrpc_client::{JsonRpcClient, NEAR_MAINNET_ARCHIVAL_RPC_URL}; +use near_jsonrpc_client::{JsonRpcClient}; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; use std::{ @@ -38,6 +40,7 @@ use tta_rust::{get_accounts_and_lockups, results_to_response}; use crate::tta::{ft_metadata::FtService, sql::sql_queries::SqlClient, tta_impl::safe_divide_u128}; +pub mod kitwallet; pub mod lockup; pub mod tta; @@ -105,14 +108,20 @@ fn init_tracing() -> anyhow::Result<()> { async fn router() -> anyhow::Result { let pool = PgPoolOptions::new() - .max_connections(30) + .max_connections(50) .connect(env!("DATABASE_URL")) .await?; let sql_client = SqlClient::new(pool); - let archival_near_client = JsonRpcClient::connect(NEAR_MAINNET_ARCHIVAL_RPC_URL); + // let archival_near_client = JsonRpcClient::connect("http://beta.rpc.mainnet.near.org"); + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(60 * 5)) + .build()?; + let archival_near_client = + JsonRpcClient::with(client).connect("http://beta.rpc.mainnet.near.org"); // let near_client = JsonRpcClient::connect(NEAR_MAINNET_RPC_URL); let ft_service = FtService::new(archival_near_client); + let kitwallet = KitWallet::new(); let semaphore = Arc::new(Semaphore::new(30)); let tta_service = TTA::new(sql_client.clone(), ft_service.clone(), semaphore); @@ -128,7 +137,9 @@ async fn router() -> anyhow::Result { .route("/likelyBlockId", get(get_closest_block_id)) .with_state(sql_client.clone()) .route("/balances", get(get_balances)) - .with_state((sql_client.clone(), ft_service.clone())) + .with_state((sql_client.clone(), ft_service.clone(), kitwallet.clone())) + .route("/balancesfull", post(get_balances_full)) + .with_state((sql_client.clone(), ft_service.clone(), kitwallet)) .route("/staking", get(get_staking_report)) .with_state((sql_client.clone(), ft_service.clone())) .route("/lockup", get(get_lockup_balances)) @@ -242,13 +253,14 @@ struct GetBalancesResultRow { pub end_block_id: u128, pub token_id: String, pub symbol: String, + pub lockup_of: Option, pub start_balance: f64, pub end_balance: f64, } async fn get_balances( Query(params): Query, - State((sql_client, ft_service)): State<(SqlClient, FtService)>, + State((sql_client, ft_service, kitwallet)): State<(SqlClient, FtService, KitWallet)>, ) -> Result, AppError> { let start_date: DateTime = DateTime::parse_from_rfc3339(¶ms.start_date) .unwrap() @@ -263,36 +275,42 @@ async fn get_balances( let end_block_id = sql_client.get_closest_block_id(end_nanos).await?; let accounts = get_accounts_and_lockups(¶ms.accounts); + let mut f = vec![]; + + for (a, b) in accounts.clone() { + f.push(a.clone()); + if let Some(b) = b { + f.push(b.clone()) + }; + } + + kitwallet.get_likely_tokens_for_accounts(f).await?; - let client = reqwest::Client::new(); let mut handles = vec![]; - for (account, _) in accounts { - let client = client.clone(); + for (account, lockup_of) in accounts { let ft_service = ft_service.clone(); let start_block_id = start_block_id; let end_block_id = end_block_id; + let start_date = start_date; + let end_date = end_date; + let kitwallet = kitwallet.clone(); let handle = spawn(async move { - info!("Getting balances for {}", account); + info!( + "Getting balances for {}, dates: start {} end {}", + account, start_date, end_date + ); let mut rows: Vec = vec![]; - let likely_tokens = client - .get(format!( - "https://api.kitwallet.app/account/{account}/likelyTokens" - )) - .send() - .await? - .json::>() - .await?; - info!("Account {} likely tokens: {:?}", account, likely_tokens); - + let likely_tokens = kitwallet.get_likely_tokens(account.clone()).await?; let token_handles: Vec<_> = likely_tokens .iter() .map(|token| { let token = token.clone(); let account = account.clone(); let ft_service = ft_service.clone(); + let lockup_of = lockup_of.clone(); async move { let metadata = match ft_service.assert_ft_metadata(&token).await { Ok(v) => v, @@ -331,6 +349,7 @@ async fn get_balances( end_balance, token_id: token.clone(), symbol: metadata.symbol, + lockup_of, }; Ok(record) } @@ -377,6 +396,7 @@ async fn get_balances( end_balance: end_near_balance.0, token_id: "NEAR".to_string(), symbol: "NEAR".to_string(), + lockup_of, }; rows.push(record); @@ -402,6 +422,180 @@ async fn get_balances( Ok(r) } +#[derive(Debug, Deserialize)] +struct GetBalancesFull { + pub start_date: String, + pub end_date: String, + pub accounts: Vec, +} + +#[derive(Debug, Serialize, Clone)] +struct GetBalancesFullResultRow { + pub account: String, + pub date: String, + pub block_id: u128, + pub token_id: String, + pub symbol: String, + pub lockup_of: Option, + pub balance: f64, +} + +#[tracing::instrument(skip(sql_client, ft_service, kitwallet))] +async fn get_balances_full( + State((sql_client, ft_service, kitwallet)): State<(SqlClient, FtService, KitWallet)>, + Json(params): Json, +) -> Result, AppError> { + let start_date: DateTime = DateTime::parse_from_rfc3339(¶ms.start_date) + .unwrap() + .into(); + let end_date: DateTime = DateTime::parse_from_rfc3339(¶ms.end_date) + .unwrap() + .into(); + let accounts = params.accounts.join(","); + let accounts = get_accounts_and_lockups(accounts.as_str()); + let mut f = vec![]; + + for (a, b) in &accounts { + f.push(a.clone()); + if let Some(b) = b { + f.push(b.clone()) + }; + } + error!("test"); + + let likely_tokens = kitwallet.get_likely_tokens_for_accounts(f).await?; + + // put all days between start and end in all_dates. + let all_dates = { + let mut dates = vec![]; + let mut date = start_date; + while date <= end_date { + dates.push(date); + date += chrono::Duration::days(1); + } + dates + }; + + let block_ids = sql_client + .get_closest_block_ids( + all_dates + .iter() + .map(|d| d.timestamp_nanos() as u128) + .collect(), + ) + .await?; + let mut handles = vec![]; + + for (idx, date) in all_dates.iter().enumerate() { + let date = *date; + let idx = idx; + let block_id = block_ids[idx]; + + for (account, lockup_of) in &accounts { + let ft_service = ft_service.clone(); + let likely_tokens = likely_tokens.get(account).unwrap().clone(); + let account = account.clone(); + let lockup_of = lockup_of.clone(); + + // sleep 1 ms + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + + let handle = spawn(async move { + let mut rows: Vec = vec![]; + + let token_handles: Vec<_> = likely_tokens + .iter() + .map(|token| { + let token = token.clone(); + let account = account.clone(); + let ft_service = ft_service.clone(); + let lockup_of = lockup_of.clone(); + async move { + let metadata = match ft_service.assert_ft_metadata(&token).await { + Ok(v) => v, + Err(e) => { + debug!("{}: {}", account, e); + return Err(e); + } + }; + let balance = match ft_service + .assert_ft_balance(&token, &account, block_id as u64) + .await + { + Ok(v) => v, + Err(e) => { + debug!("{}: {}", account, e); + 0.0 + } + }; + + let record = GetBalancesFullResultRow { + account: account.clone(), + date: date.to_rfc3339(), + token_id: token.clone(), + symbol: metadata.symbol, + lockup_of: lockup_of.clone(), + block_id, + balance, + }; + Ok(record) + } + }) + .collect(); + + let token_results: Vec<_> = join_all(token_handles).await; + for result in token_results { + match result { + Ok(record) => rows.push(record), + Err(e) => { + debug!("Token fetch error: {:?}", e); + } + } + } + + let near_balance = + match ft_service.get_near_balance(&account, block_id as u64).await { + Ok(v) => v.0, + Err(e) => { + error!("{}: {}", account, e); + 0.0 + } + }; + + let record = GetBalancesFullResultRow { + account: account.clone(), + date: date.to_rfc3339(), + block_id, + balance: near_balance, + token_id: "NEAR".to_string(), + symbol: "NEAR".to_string(), + lockup_of: lockup_of.clone(), + }; + rows.push(record); + + anyhow::Ok(rows) + }); + handles.push(handle); + } + } + + let mut rows = vec![]; + join_all(handles).await.iter().for_each(|row| match row { + Ok(result) => match result { + Ok(res) => rows.extend(res.iter().cloned()), + Err(e) => { + error!("{:?}", e) + } + }, + Err(e) => { + warn!("{:?}", e) + } + }); + + let r = results_to_response(rows)?; + Ok(r) +} + #[derive(Debug, Deserialize)] struct DateAndAccounts { pub date: String, diff --git a/src/tta/ft_metadata.rs b/src/tta/ft_metadata.rs index 6eef0d9..8183f5a 100644 --- a/src/tta/ft_metadata.rs +++ b/src/tta/ft_metadata.rs @@ -1,8 +1,12 @@ use anyhow::{bail, Result}; -use governor::{clock, state, Quota, RateLimiter}; +use governor::{Quota, RateLimiter}; use lru::LruCache; -use near_jsonrpc_client::JsonRpcClient; -use near_jsonrpc_primitives::types::query::{QueryResponseKind, RpcQueryRequest, RpcQueryResponse}; +use near_jsonrpc_client::{ + JsonRpcClient, +}; +use near_jsonrpc_primitives::types::query::{ + QueryResponseKind, RpcQueryError, RpcQueryRequest, RpcQueryResponse, +}; use near_primitives::{ types::{ BlockId::Height, @@ -10,7 +14,7 @@ use near_primitives::{ Finality::{self}, FunctionArgs, }, - views::{AccountView, CallResult, QueryRequest}, + views::{CallResult, QueryRequest}, }; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -20,7 +24,8 @@ use std::{ sync::Arc, }; use tokio::{join, sync::RwLock}; -use tracing::{debug, info}; +use tracing::{debug, error}; +use tta_rust::RateLim; use std::hash::{Hash, Hasher}; @@ -62,19 +67,13 @@ pub struct FtMetadata { pub decimals: u8, } -type RateLim = RateLimiter< - state::NotKeyed, - state::InMemoryState, - clock::QuantaClock, - governor::middleware::NoOpMiddleware, ->; - #[derive(Debug, Clone)] pub struct FtService { pub ft_metadata_cache: Arc>>, pub ft_balances_cache: Arc>>, pub near_client: JsonRpcClient, pub archival_rate_limiter: Arc>, + pub likely_tokens: Arc>>>, } impl FtService { @@ -86,8 +85,9 @@ impl FtService { ))), near_client, archival_rate_limiter: Arc::new(RwLock::new(RateLimiter::direct(Quota::per_second( - NonZeroU32::new(5u32).unwrap(), + NonZeroU32::new(5_000_000u32).unwrap(), )))), + likely_tokens: Arc::new(RwLock::new(HashMap::new())), } } @@ -99,7 +99,7 @@ impl FtService { .await .contains_key(ft_token_id) { - self.archival_rate_limiter.write().await.until_ready().await; + // self.archival_rate_limiter.write().await.until_ready().await; let args = json!({}).to_string().into_bytes(); let result = match view_function_call( &self.near_client, @@ -141,6 +141,9 @@ impl FtService { account_id: &String, block_id: u64, ) -> Result { + if token_id == "kusama-airdrop.near" { + return Ok(0.0); + } if self .ft_balances_cache .clone() @@ -164,9 +167,8 @@ impl FtService { } let metadata = self.assert_ft_metadata(token_id).await.unwrap(); - self.archival_rate_limiter.write().await.until_ready().await; + // self.archival_rate_limiter.write().await.until_ready().await; let args = json!({ "account_id": account_id }).to_string().into_bytes(); - info!("Calling ft_balance_of"); let result = match view_function_call( &self.near_client, QueryRequest::CallFunction { @@ -206,8 +208,9 @@ impl FtService { Ok(amount) } + #[tracing::instrument(skip(self))] pub async fn get_near_balance(&self, account_id: &str, block_id: u64) -> Result<(f64, f64)> { - self.archival_rate_limiter.write().await.until_ready().await; + // self.archival_rate_limiter.write().await.until_ready().await; let RpcQueryResponse { kind, .. } = match self .near_client .call(RpcQueryRequest { @@ -220,12 +223,27 @@ impl FtService { { Ok(v) => v, Err(e) => { - bail!("Error calling ViewAccount: {:?}", e); + if let Some(w) = e.handler_error() { + match w { + RpcQueryError::UnknownAccount { .. } => { + if !account_id.ends_with("lockup.near") { + error!("Unknown Account: {:?}", e); // Here's the debug print for UnknownAccount + } + } + _ => { + error!("Error calling ViewAccount: {:?}, block_id: {}", e, block_id); + } + } + } else { + error!("Error calling ViewAccount: {:?}", e); + } + return Ok((0.0, 0.0)); } }; let view = match kind { QueryResponseKind::ViewAccount(view) => view, _ => { + error!("Received unexpected kind: {:?}", kind); bail!("Received unexpected kind: {:?}", kind); } }; @@ -405,6 +423,7 @@ impl FtService { } } +#[tracing::instrument(skip(client))] pub async fn view_function_call( client: &JsonRpcClient, request: QueryRequest, @@ -412,18 +431,35 @@ pub async fn view_function_call( ) -> anyhow::Result> { let RpcQueryResponse { kind, .. } = match client .call(RpcQueryRequest { - block_reference, - request: request.clone(), + block_reference: block_reference.clone(), + request, }) .await { Ok(v) => v, Err(e) => { - bail!( - "Error calling view_function_call: {:?}, request: {:?}", - e, - request - ); + if let Some(w) = e.handler_error() { + match w { + RpcQueryError::UnknownAccount { .. } => { + error!("Unknown Account: {:?}", e); + } + RpcQueryError::NoContractCode { .. } => { + error!("No Contract Code: {:?}", e); + } + RpcQueryError::ContractExecutionError { .. } => { + error!("Contract Execution Error: {:?}", e); + } + _ => { + error!( + "Error calling ViewAccount: {:?}, block_id: {:#?}", + e, block_reference + ); + } + } + } else { + error!("Error calling ViewAccount: {:?}", e); + } + bail!("Error calling ViewAccount: {:?}", e) } }; diff --git a/src/tta/sql/sql_queries.rs b/src/tta/sql/sql_queries.rs index ed04b3b..3106c09 100644 --- a/src/tta/sql/sql_queries.rs +++ b/src/tta/sql/sql_queries.rs @@ -5,7 +5,7 @@ use num_traits::cast::ToPrimitive; use sqlx::{types::Decimal, Pool, Postgres}; use tokio::sync::mpsc::Sender; use tokio_stream::StreamExt; -use tracing::{error, info, instrument}; +use tracing::{debug, error, info, instrument}; use crate::tta::sql::models::BlockId; @@ -351,6 +351,7 @@ impl SqlClient { #[instrument(skip(self))] pub async fn get_closest_block_id(&self, date: u128) -> Result { + debug!("calling DB"); let date_decimal = Decimal::from(date); let block = sqlx::query_as!( @@ -369,4 +370,48 @@ impl SqlClient { Ok(block.block_height.to_u128().unwrap()) } + + #[instrument(skip(self, dates))] + pub async fn get_closest_block_ids(&self, dates: Vec) -> Result> { + debug!("calling DB"); + // Convert dates to decimals + let dates_decimal: Vec = dates.iter().map(|&d| Decimal::from(d)).collect(); + + let result = sqlx::query_as!( + BlockIdWithDate, + r##" + WITH RECURSIVE timestamps_cte(date) AS ( + SELECT unnest($1::numeric[]) AS date + ) + SELECT + ts.date AS "input_date!", + ( + SELECT block_height + FROM blocks + WHERE block_timestamp >= ts.date + ORDER BY block_timestamp ASC + LIMIT 1 + ) AS "block_height!" + FROM timestamps_cte ts + WHERE ts.date = ANY($1::numeric[]) + "##, + &dates_decimal + ) + .fetch_all(&self.pool) + .await?; + + // Extract block_height from result and return + let block_ids: Vec = result + .into_iter() + .map(|r| r.block_height.to_u128().unwrap()) + .collect(); + + Ok(block_ids) + } +} + +#[derive(Debug, sqlx::FromRow)] +struct BlockIdWithDate { + input_date: Decimal, + block_height: Decimal, }