Skip to content

Commit

Permalink
Add staking endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreLeGuen committed Oct 17, 2023
1 parent 99a75cc commit 8f46e1f
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 17 deletions.
51 changes: 51 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::collections::HashSet;

use anyhow::Result;
use hyper::{Body, Response};
use serde::Serialize;
use sha2::{Digest, Sha256};

// Extract accounts,
// returns: account, is lockup, master account
pub fn get_accounts_and_lockups(accounts: &str) -> HashSet<(String, Option<String>)> {
let mut accounts: HashSet<(String, Option<String>)> = accounts
.split(',')
.map(String::from)
.filter(|account| account != "near" && account != "system")
.map(|account| (account, None))
.collect();

for a in accounts.clone() {
let lockup_account = get_associated_lockup(&a.0, "near");
accounts.insert((lockup_account, Some(a.0.clone())));
}

accounts
}

// Consolidate results and return a Response
pub fn results_to_response<T: Serialize>(results: Vec<T>) -> Result<Response<Body>, csv::Error> {
let mut wtr = csv::Writer::from_writer(Vec::new());
for row in results {
wtr.serialize(row)?;
}
wtr.flush()?;
Ok(Response::builder()
.header("Content-Type", "text/csv")
.body(Body::from(wtr.into_inner().unwrap()))
.unwrap())
}

pub fn get_associated_lockup(account_id: &str, master_account_id: &str) -> String {
format!(
"{}.lockup.{}",
&sha256(account_id)[0..40],
master_account_id
)
}

fn sha256(value: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(value.as_bytes());
format!("{:x}", hasher.finalize())
}
163 changes: 147 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use tokio::{spawn, sync::Semaphore};
use tracing::*;
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, FmtSubscriber};
use tta::tta_impl::TTA;
use tta_rust::{get_accounts_and_lockups, results_to_response};

use crate::tta::{ft_metadata::FtService, sql::sql_queries::SqlClient};

Expand Down Expand Up @@ -125,7 +126,9 @@ async fn router() -> anyhow::Result<Router> {
.route("/likelyBlockId", get(get_closest_block_id))
.with_state(sql_client.clone())
.route("/balances", get(get_balances))
.with_state((sql_client, ft_service.clone()))
.with_state((sql_client.clone(), ft_service.clone()))
.route("/staking", get(get_staking_report))
.with_state((sql_client, ft_service))
.layer(middleware))
}

Expand Down Expand Up @@ -255,17 +258,12 @@ async fn get_balances(
let start_block_id = sql_client.get_closest_block_id(start_nanos).await?;
let end_block_id = sql_client.get_closest_block_id(end_nanos).await?;

let accounts: HashSet<String> = params
.accounts
.split(',')
.map(String::from)
.filter(|account| account != "near" && account != "system")
.collect();
let accounts = get_accounts_and_lockups(&params.accounts);

let client = reqwest::Client::new();
let mut handles = vec![];

for account in accounts {
for (account, _) in accounts {
let client = client.clone();
let ft_service = ft_service.clone();
let start_block_id = start_block_id;
Expand Down Expand Up @@ -396,15 +394,148 @@ async fn get_balances(
}
});

let mut wtr = csv::Writer::from_writer(Vec::new());
for row in rows {
wtr.serialize(row).unwrap();
let r = results_to_response(rows)?;
Ok(r)
}

#[derive(Debug, Deserialize)]
struct GetStaking {
pub date: String,
pub accounts: String,
}

#[derive(Debug, Serialize, Clone)]
struct StakingReportRow {
pub account: String,
pub staking_pool: String,
pub amount_staked: f64,
pub amount_unstaked: f64,
pub ready_for_withdraw: bool,
pub lockup_of: Option<String>,
pub date: String,
pub block_id: u128,
}

#[derive(Debug, Deserialize, Clone)]
struct StakingDeposit {
pub deposit: String,
pub validator_id: String,
}

async fn get_staking_report(
Query(params): Query<GetStaking>,
State((sql_client, ft_service)): State<(SqlClient, FtService)>,
) -> Result<Response<Body>, AppError> {
let date: DateTime<chrono::Utc> = DateTime::parse_from_rfc3339(&params.date).unwrap().into();
let start_nanos = date.timestamp_nanos() as u128;

let block_id = sql_client.get_closest_block_id(start_nanos).await?;

let accounts = get_accounts_and_lockups(&params.accounts);

// todo add support for lockup accounts

let client = reqwest::Client::new();
let mut handles = vec![];

for (account, master_account) in accounts {
let client = client.clone();
let ft_service = ft_service.clone();
let block_id = block_id;

let handle = spawn(async move {
info!("Getting staking for {}", account);
let mut rows: Vec<StakingReportRow> = vec![];

let staking_deposits = client
.get(format!(
"https://api.kitwallet.app/staking-deposits/{account}"
))
.send()
.await?
.json::<Vec<StakingDeposit>>()
.await?;
info!(
"Account {} staking deposits: {:?}",
account, staking_deposits
);

let handles: Vec<_> = staking_deposits
.iter()
.map(|staking_deposit| {
let staking_deposit = staking_deposit.clone();
let account = account.clone();
let ft_service = ft_service.clone();
let master_account = master_account.clone();
async move {
let staking_details = match ft_service
.get_staking_details(
&staking_deposit.validator_id,
&account,
block_id as u64,
)
.await
{
Ok(v) => v,
Err(e) => {
debug!("{}: {}", account, e);
return Err(e);
}
};

if staking_details.0 == 0.0 && staking_details.1 == 0.0 {
return Ok(None);
}

let record = StakingReportRow {
account,
staking_pool: staking_deposit.validator_id.clone(),
amount_staked: staking_details.0,
amount_unstaked: staking_details.1,
ready_for_withdraw: staking_details.2,
lockup_of: master_account,
date: date.to_rfc3339(),
block_id,
};
Ok(Some(record))
}
})
.collect();

let results: Vec<_> = join_all(handles).await;
for result in results {
match result {
Ok(record) => {
if let Some(record) = record {
rows.push(record)
}
}
Err(e) => {
error!("staking error: {:?}", e);
}
}
}

anyhow::Ok(rows)
});
handles.push(handle);
}
wtr.flush()?;
let response = Response::builder()
.header("Content-Type", "text/csv")
.body(Body::from(wtr.into_inner().unwrap()))?;
Ok(response)

let mut rows = vec![];
join_all(handles).await.iter().for_each(|row| match row {
Ok(result) => match result {
Ok(res) => rows.extend(res.iter().cloned()),
Err(e) => {
println!("{:?}", e)
}
},
Err(e) => {
warn!("{:?}", e)
}
});

let r = results_to_response(rows)?;
Ok(r)
}

struct AppError(anyhow::Error);
Expand Down
118 changes: 117 additions & 1 deletion src/tta/ft_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
};
use tokio::sync::RwLock;
use tokio::{join, sync::RwLock};
use tracing::{debug, info};

use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -233,6 +233,122 @@ impl FtService {
let amount = safe_divide_u128(amount, 24);
Ok(amount)
}

pub async fn get_staking_details(
&self,
staking_pool: &str,
account_id: &str,
block_id: u64,
) -> Result<(f64, f64, bool)> {
let args = json!({ "account_id": account_id }).to_string().into_bytes();

let unstaked_balance_future = self.get_unstaked_balance(staking_pool, &args, block_id);
let staked_balance_future = self.get_staked_balance(staking_pool, &args, block_id);
let unstaked_balance_available_future =
self.is_unstaked_balance_available(staking_pool, &args, block_id);

let (unstaked_balance, staked_balance, unstaked_balance_available) = join!(
unstaked_balance_future,
staked_balance_future,
unstaked_balance_available_future
);

Ok((
safe_divide_u128(staked_balance?, 24),
safe_divide_u128(unstaked_balance?, 24),
unstaked_balance_available?,
))
}

async fn get_unstaked_balance(
&self,
staking_pool: &str,
args: &[u8],
block_id: u64,
) -> Result<u128> {
self.archival_rate_limiter.write().await.until_ready().await;
let result = view_function_call(
&self.near_client,
QueryRequest::CallFunction {
account_id: staking_pool.parse()?,
method_name: "get_account_unstaked_balance".to_string(),
args: FunctionArgs::from(args.to_vec()),
},
BlockReference::BlockId(Height(block_id)),
)
.await;

match result {
Ok(v) => Ok(serde_json::from_slice::<String>(&v)?.parse::<u128>()?),
Err(e) => {
bail!(
"Error getting staking details for staking pool: {}, error: {:?}",
staking_pool,
e
);
}
}
}

async fn get_staked_balance(
&self,
staking_pool: &str,
args: &[u8],
block_id: u64,
) -> Result<u128> {
self.archival_rate_limiter.write().await.until_ready().await;
let result = view_function_call(
&self.near_client,
QueryRequest::CallFunction {
account_id: staking_pool.parse()?,
method_name: "get_account_staked_balance".to_string(),
args: FunctionArgs::from(args.to_vec()),
},
BlockReference::BlockId(Height(block_id)),
)
.await;

match result {
Ok(v) => Ok(serde_json::from_slice::<String>(&v)?.parse::<u128>()?),
Err(e) => {
bail!(
"Error getting staking details for staking pool: {}, error: {:?}",
staking_pool,
e
);
}
}
}

async fn is_unstaked_balance_available(
&self,
staking_pool: &str,
args: &[u8],
block_id: u64,
) -> Result<bool> {
self.archival_rate_limiter.write().await.until_ready().await;
let result = view_function_call(
&self.near_client,
QueryRequest::CallFunction {
account_id: staking_pool.parse()?,
method_name: "is_account_unstaked_balance_available".to_string(),
args: FunctionArgs::from(args.to_vec()),
},
BlockReference::BlockId(Height(block_id)),
)
.await;

match result {
Ok(v) => Ok(serde_json::from_slice::<bool>(&v)?),
Err(e) => {
bail!(
"Error getting staking details for staking pool: {}, error: {:?}",
staking_pool,
e
);
}
}
}
}

pub async fn view_function_call(
Expand Down

0 comments on commit 8f46e1f

Please sign in to comment.