Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): Add in-memory-store and pruning-delay parameters #490

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ name = "lumina"
path = "src/main.rs"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
blockstore.workspace = true
celestia-rpc = { workspace = true, features = ["p2p"] }
celestia-types.workspace = true
libp2p.workspace = true
Expand Down
85 changes: 64 additions & 21 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use blockstore::EitherBlockstore;
use celestia_rpc::prelude::*;
use celestia_rpc::Client;
use clap::{value_parser, Parser};
use directories::ProjectDirs;
use libp2p::multiaddr::{Multiaddr, Protocol};
use lumina_node::blockstore::RedbBlockstore;
use lumina_node::blockstore::{InMemoryBlockstore, RedbBlockstore};
use lumina_node::events::NodeEvent;
use lumina_node::network::Network;
use lumina_node::node::Node;
use lumina_node::store::{RedbStore, Store};
use lumina_node::node::{Node, MIN_PRUNING_DELAY, MIN_SAMPLING_WINDOW};
use lumina_node::store::{EitherStore, InMemoryStore, RedbStore, Store as _};
use tokio::task::spawn_blocking;
use tracing::info;
use tracing::warn;

const CELESTIA_LOCAL_BRIDGE_RPC_ADDR: &str = "ws://localhost:36658";

type Blockstore = EitherBlockstore<InMemoryBlockstore, RedbBlockstore>;
type Store = EitherStore<InMemoryStore, RedbStore>;

#[derive(Debug, Parser)]
pub(crate) struct Params {
/// Network to connect.
Expand All @@ -36,34 +40,51 @@ pub(crate) struct Params {
pub(crate) bootnodes: Vec<Multiaddr>,

/// Persistent header store path.
#[arg(short, long = "store")]
#[arg(short, long)]
pub(crate) store: Option<PathBuf>,

/// Sampling window size, defines maximum age of headers considered for syncing and sampling.
/// Headers older than sampling window by more than an hour are eligible for pruning.
#[arg(long = "sampling-window", verbatim_doc_comment)]
/// Use in-memory store.
#[arg(long)]
pub(crate) in_memory_store: bool,

/// Sampling window defines maximum age of a block considered for syncing and sampling.
#[arg(long)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) sampling_window: Option<Duration>,

/// Pruning delay defines how much time the pruner should wait after sampling window in
/// order to prune the block.
#[arg(long)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) pruning_delay: Option<Duration>,
}

pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initializing store");
let db = open_db(args.store, args.network.id()).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

let stored_ranges = store.get_stored_header_ranges().await?;
if stored_ranges.is_empty() {
info!("Initialised new store");
let (blockstore, store) = if args.in_memory_store {
open_in_memory_stores()
} else {
info!("Initialised store, present headers: {stored_ranges}");
}
open_db_stores(args.store, args.network.id()).await?
};

let mut node_builder = Node::builder()
.store(store)
.blockstore(blockstore)
.network(args.network.clone());

if let Some(sampling_window) = args.sampling_window {
node_builder = node_builder.sampling_window(sampling_window);
} else if args.in_memory_store {
// In-memory stores are memory hungry, so we lower sampling window.
node_builder = node_builder.sampling_window(MIN_SAMPLING_WINDOW);
}

if let Some(pruning_delay) = args.pruning_delay {
node_builder = node_builder.pruning_delay(pruning_delay);
} else if args.in_memory_store {
// In-memory stores are memory hungry, so we lower pruning window.
node_builder = node_builder.pruning_delay(MIN_PRUNING_DELAY);
}

if args.bootnodes.is_empty() {
if args.network.is_custom() {
let bootnodes = fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?;
Expand All @@ -77,10 +98,6 @@ pub(crate) async fn run(args: Params) -> Result<()> {
node_builder = node_builder.listen(args.listen_addrs);
}

if let Some(sampling_window) = args.sampling_window {
node_builder = node_builder.sampling_window(sampling_window);
}

let (_node, mut events) = node_builder
.start_subscribed()
.await
Expand All @@ -98,6 +115,32 @@ pub(crate) async fn run(args: Params) -> Result<()> {
Ok(())
}

fn open_in_memory_stores() -> (Blockstore, Store) {
info!("Initializing in-memory store");
let store = InMemoryStore::new();
let blockstore = InMemoryBlockstore::new();
(EitherBlockstore::Left(blockstore), EitherStore::Left(store))
}

async fn open_db_stores(path: Option<PathBuf>, network_id: &str) -> Result<(Blockstore, Store)> {
info!("Initializing store");
let db = open_db(path, network_id).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

let stored_ranges = store.get_stored_header_ranges().await?;
if stored_ranges.is_empty() {
info!("Initialised new store",);
} else {
info!("Initialised store, present headers: {stored_ranges}");
}

Ok((
EitherBlockstore::Right(blockstore),
EitherStore::Right(store),
))
}

async fn open_db(path: Option<PathBuf>, network_id: &str) -> Result<Arc<redb::Database>> {
let network_id = network_id.to_owned();

Expand Down
Loading