Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add address indexer #6

Merged
merged 5 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 311 additions & 44 deletions src-tauri/Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ reqwest = "0.12.7"
tar = "0.4.41"
thiserror = "1.0.63"
sha256 = "1.5.0"
futures = "0.3.30"
rusqlite = "0.32.1"
jsonrpsee = { version = "0.24.4", features = ["async-client", "client-core", "http-client"] }
tokio = "1.40.0"
base64 = "0.22.1"

[features]
# This feature is used for production builds or when a dev server is not specified, DO NOT REMOVE!!
Expand All @@ -30,5 +35,6 @@ full-node= []

[dev-dependencies]
mockito = "1.5.0"
serde_json = "1"
tempdir = "0.3.7"
tokio = { version = "1.40.0", features = ["macros", "test-util"] }
25 changes: 25 additions & 0 deletions src-tauri/src/address_index/block_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use super::types::Block;
use futures::stream::Stream;
use std::pin::Pin;

pub trait BlockSource {
fn get_blocks(
&mut self,
) -> crate::error::Result<Pin<Box<dyn Stream<Item = Block> + '_ + Send>>>;
}

#[cfg(test)]
pub mod test {
use super::super::types::{test::get_test_blocks, Block};
use super::*;

pub struct MockBlockSource;

impl BlockSource for MockBlockSource {
fn get_blocks(
&mut self,
) -> crate::error::Result<Pin<Box<dyn Stream<Item = Block> + '_ + Send>>> {
Ok(Box::pin(futures::stream::iter(get_test_blocks())))
}
}
}
45 changes: 45 additions & 0 deletions src-tauri/src/address_index/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use super::types::Tx;

pub trait Database {
async fn get_address_txids(&self, address: &str) -> crate::error::Result<Vec<String>>;
async fn store_tx(&mut self, tx: &Tx) -> crate::error::Result<()>;
/**
* Override if there is a more efficient way to store multiple txs at the same time
*/
async fn store_txs<I>(&mut self, txs: I) -> crate::error::Result<()>
where
I: Iterator<Item = Tx>,
{
for tx in txs {
self.store_tx(&tx).await?;
}
Ok(())
}
}

#[cfg(test)]
pub mod test {
use super::*;
use std::collections::HashMap;

#[derive(Default)]
pub struct MockDB {
address_map: HashMap<String, Vec<String>>,
}

impl Database for MockDB {
async fn get_address_txids(&self, address: &str) -> crate::error::Result<Vec<String>> {
Ok(self.address_map.get(address).unwrap_or(&vec![]).clone())
}

async fn store_tx(&mut self, tx: &Tx) -> crate::error::Result<()> {
for address in &tx.addresses {
self.address_map
.entry(address.clone())
.and_modify(|vec| vec.push(tx.txid.clone()))
.or_insert(vec![tx.txid.clone()]);
}
Ok(())
}
}
}
72 changes: 72 additions & 0 deletions src-tauri/src/address_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
pub mod block_source;
pub mod database;
pub mod pivx_rpc;
pub mod sql_lite;
pub mod types;

use block_source::BlockSource;
use database::Database;
use futures::StreamExt;

pub struct AddressIndex<D: Database, B: BlockSource> {
database: D,
block_source: B,
}

impl<D: Database + Send, B: BlockSource + Send> AddressIndex<D, B> {
pub async fn sync(&mut self) -> crate::error::Result<()> {
println!("Starting sync");
let mut stream = self.block_source.get_blocks()?.chunks(10_000);
while let Some(blocks) = stream.next().await {
self.database
.store_txs(blocks.into_iter().flat_map(|block| block.txs.into_iter()))
.await?;
}
Ok(())
}
pub fn new(database: D, block_source: B) -> Self {
Self {
database,
block_source,
}
}
async fn get_address_txids(&self, address: &str) -> crate::error::Result<Vec<String>> {
self.database.get_address_txids(address).await
}
}

#[cfg(test)]
mod test {
use super::block_source::test::MockBlockSource;
use super::database::test::MockDB;
use super::*;

#[tokio::test]
async fn syncs_correctly() -> crate::error::Result<()> {
let mock_db = MockDB::default();
let block_source = MockBlockSource;
let mut address_index = AddressIndex::new(mock_db, block_source);
address_index.sync().await?;
assert_eq!(
address_index.get_address_txids("address1").await?,
vec!["txid1", "txid2", "txid3"]
);
assert_eq!(
address_index.get_address_txids("address2").await?,
vec!["txid1"]
);
assert_eq!(
address_index.get_address_txids("address4").await?,
vec!["txid2"]
);
assert_eq!(
address_index.get_address_txids("address5").await?,
vec!["txid3"]
);
assert_eq!(
address_index.get_address_txids("address6").await?,
Vec::<String>::new()
);
Ok(())
}
}
95 changes: 95 additions & 0 deletions src-tauri/src/address_index/pivx_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use super::block_source::BlockSource;
use super::types::Block;
use base64::prelude::*;
use futures::stream::Stream;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClient;
use jsonrpsee::rpc_params;
use reqwest::header::{HeaderMap, HeaderValue};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct PIVXRpc {
client: HttpClient,
}

struct BlockStream {
client: HttpClient,
current_block: u64,
current_future: Option<Pin<Box<dyn Future<Output = Option<Block>> + Send>>>,
}

impl BlockStream {
async fn get_next_block(client: HttpClient, current_block: u64) -> Option<Block> {
println!("current block: {}", current_block);
let hash: String = client
.request("getblockhash", rpc_params![current_block])
.await
.unwrap();
let block: Result<Block, _> = client.request("getblock", rpc_params![hash, 2]).await;
if let Err(ref err) = &block {
eprintln!("{}", err);
}
block.ok()
}

pub fn new(client: HttpClient) -> Self {
Self {
client,
current_block: 0,
current_future: None,
}
}
}

impl Stream for BlockStream {
type Item = Block;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(ref mut future) = &mut self.current_future {
let poll = Pin::as_mut(future).poll(cx);
match poll {
Poll::Ready(i) => {
self.current_future = None;
Poll::Ready(i)
}
Poll::Pending => Poll::Pending,
}
} else {
self.as_mut().current_block = self.current_block + 1;
let new_future = Box::pin(Self::get_next_block(
self.client.clone(),
self.current_block,
));
self.current_future = Some(new_future);
self.poll_next(cx)
}
}
}

impl PIVXRpc {
pub async fn new(url: &str) -> crate::error::Result<Self> {
let mut headers = HeaderMap::new();
let credentials = format!("{}:{}", crate::RPC_USERNAME, crate::RPC_PASSWORD);
headers.insert(
"Authorization",
// TODO: remove unwrap
HeaderValue::from_str(&format!("Basic {}", BASE64_STANDARD.encode(credentials)))
.unwrap(),
);
Ok(PIVXRpc {
client: HttpClient::builder().set_headers(headers).build(url)?,
})
}
}

impl BlockSource for PIVXRpc {
fn get_blocks(
&mut self,
) -> crate::error::Result<Pin<Box<dyn Stream<Item = Block> + Send + '_>>> {
let block_stream = BlockStream::new(self.client.clone());

Ok(Box::pin(block_stream))
}
}
120 changes: 120 additions & 0 deletions src-tauri/src/address_index/sql_lite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::path::PathBuf;

use super::database::Database;
use super::types::Tx;
use rusqlite::{params, Connection};

pub struct SqlLite {
connection: Connection,
}

impl SqlLite {
pub async fn new(path: PathBuf) -> crate::error::Result<Self> {
tauri::async_runtime::spawn_blocking(move || {
let connection = Connection::open(path)?;
connection.execute_batch("
BEGIN;
CREATE TABLE IF NOT EXISTS transactions(txid TEXT NOT NULL, address TEXT NOT NULL, PRIMARY KEY (txid, address));
CREATE INDEX IF NOT EXISTS idx_address ON transactions (address);
COMMIT;
")?;
Ok(Self{connection})
}).await?
}
}

impl Database for SqlLite {
async fn get_address_txids(&self, address: &str) -> crate::error::Result<Vec<String>> {
let mut stmt = self
.connection
.prepare("SELECT txid FROM transactions WHERE address=?1")?;
let mut rows = stmt.query([address])?;
let mut txids = vec![];
while let Some(row) = rows.next()? {
let txid: String = row.get(0)?;
txids.push(txid);
}
Ok(txids)
}
async fn store_tx(&mut self, tx: &Tx) -> crate::error::Result<()> {
let txid = &tx.txid;
let mut stmt = self
.connection
.prepare("INSERT OR IGNORE INTO transactions (txid, address) VALUES (?1, ?2);")?;
for address in &tx.addresses {
stmt.execute(params![txid, &address])?;
}
Ok(())
}

async fn store_txs<I>(&mut self, txs: I) -> crate::error::Result<()>
where
I: Iterator<Item = Tx>,
{
let connection = self.connection.transaction()?;
for tx in txs {
let txid = &tx.txid;
for address in &tx.addresses {
connection.execute(
"INSERT OR IGNORE INTO transactions (txid, address) VALUES (?1, ?2);",
params![txid, &address],
)?;
}
}
connection.commit()?;
Ok(())
}
}

#[cfg(test)]
mod test {
use super::super::types::test::get_test_blocks;
use super::*;
use tempdir::TempDir;

async fn test_address_retrival(sql_lite: &SqlLite) -> crate::error::Result<()> {
assert_eq!(
sql_lite.get_address_txids("address1").await?,
vec!["txid1", "txid2", "txid3"]
);
assert_eq!(sql_lite.get_address_txids("address2").await?, vec!["txid1"]);
assert_eq!(sql_lite.get_address_txids("address4").await?, vec!["txid2"]);
assert_eq!(sql_lite.get_address_txids("address5").await?, vec!["txid3"]);
assert_eq!(
sql_lite.get_address_txids("address6").await?,
Vec::<String>::new()
);
Ok(())
}

#[tokio::test]
async fn test_sqlite() -> crate::error::Result<()> {
let temp_dir = TempDir::new("sqlite-test")?;
let mut sql_lite = SqlLite::new(temp_dir.path().join("test.sqlite")).await?;
let test_blocks = get_test_blocks();
for block in test_blocks {
for tx in block.txs {
sql_lite.store_tx(&tx).await?;
}
}
test_address_retrival(&sql_lite).await?;
Ok(())
}

#[tokio::test]
async fn test_sqlite_batch() -> crate::error::Result<()> {
let temp_dir = TempDir::new("sqlite-test-batch")?;
let mut sql_lite = SqlLite::new(temp_dir.path().join("test.sqlite")).await?;
let test_blocks = get_test_blocks();
sql_lite
.store_txs(
test_blocks
.into_iter()
.flat_map(|block| block.txs.into_iter()),
)
.await?;

test_address_retrival(&sql_lite).await?;
Ok(())
}
}
Loading
Loading