Skip to content

Commit

Permalink
feat: Estimator Flow
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdubey521 committed Jun 14, 2024
1 parent 6d59779 commit af311fa
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 48 deletions.
5 changes: 4 additions & 1 deletion config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ infra:
server:
port: 8080
host: 'localhost'
is_indexer: true
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
53 changes: 45 additions & 8 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pub struct Config {
pub infra: InfraConfig,
// API Server Configuration
pub server: ServerConfig,
// Whether to run the routes indexer + algorithm on this node.
pub is_indexer: bool,
// Configuration for the indexer
pub indexer_config: IndexerConfig,
}

impl Config {
Expand Down Expand Up @@ -125,7 +125,7 @@ impl Config {
coingecko: raw_config.coingecko,
infra: raw_config.infra,
server: raw_config.server,
is_indexer: raw_config.is_indexer,
indexer_config: raw_config.indexer_config,
})
}
}
Expand Down Expand Up @@ -227,7 +227,7 @@ pub struct RawConfig {
pub covalent: CovalentConfig,
pub infra: InfraConfig,
pub server: ServerConfig,
pub is_indexer: bool,
pub indexer_config: IndexerConfig,
}

#[derive(Debug, Deserialize, Validate)]
Expand Down Expand Up @@ -271,6 +271,20 @@ impl Hash for BucketConfig {
}
}

impl PartialEq<Self> for BucketConfig {
fn eq(&self, other: &Self) -> bool {
let mut s1 = DefaultHasher::new();
let mut s2 = DefaultHasher::new();

self.hash(&mut s1);
other.hash(&mut s2);

s1.finish() == s2.finish()
}
}

impl Eq for BucketConfig {}

#[derive(Debug, Deserialize, Validate)]
pub struct ChainConfig {
// The chain id
Expand Down Expand Up @@ -387,6 +401,17 @@ pub struct ServerConfig {
pub host: String,
}

#[derive(Debug, Deserialize, Validate)]
pub struct IndexerConfig {
pub is_indexer: bool,

#[validate(min_length = 1)]
pub indexer_update_topic: String,

#[validate(min_length = 1)]
pub indexer_update_message: String,
}

#[cfg(test)]
mod tests {
use crate::config::{Config, ConfigError};
Expand Down Expand Up @@ -448,7 +473,10 @@ infra:
server:
port: 8080
host: 'localhost'
is_indexer: true
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
"#;
let config = Config::from_yaml_str(&config).unwrap();

Expand Down Expand Up @@ -509,7 +537,10 @@ is_indexer: true
assert_eq!(config.infra.redis_url, "redis://localhost:6379");
assert_eq!(config.infra.rabbitmq_url, "amqp://localhost:5672");
assert_eq!(config.infra.mongo_url, "mongodb://localhost:27017");
assert_eq!(config.is_indexer, true);

assert_eq!(config.indexer_config.is_indexer, true);
assert_eq!(config.indexer_config.indexer_update_topic, "indexer_update");
assert_eq!(config.indexer_config.indexer_update_message, "message");
}

#[test]
Expand Down Expand Up @@ -540,7 +571,10 @@ infra:
server:
port: 8080
host: 'localhost'
is_indexer: true
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
"#;

assert_eq!(
Expand Down Expand Up @@ -600,7 +634,10 @@ infra:
server:
port: 8080
host: 'localhost'
is_indexer: true
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
"#;

assert_eq!(
Expand Down
4 changes: 3 additions & 1 deletion crates/routing_engine/src/estimator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::Debug;

use serde::{Deserialize, Serialize};

pub use linear_regression_estimator::LinearRegressionEstimator;
Expand All @@ -11,7 +13,7 @@ pub struct DataPoint<Input, Output> {
}

pub trait Estimator<'de, Input, Output>: Serialize + Deserialize<'de> {
type Error;
type Error: Debug;

fn build(data: Vec<DataPoint<Input, Output>>) -> Result<Self, Self::Error>;

Expand Down
158 changes: 132 additions & 26 deletions crates/routing_engine/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,65 @@
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};

use derive_more::Display;
use futures::stream::StreamExt;

use config::BucketConfig;
use storage;

use crate::{CostType, estimator, Route, RouteError, source, token_price};
use crate::estimator::Estimator;

const SOURCE_FETCH_PER_BUCKET_RATE_LIMIT: usize = 10;
const BUCKET_PROCESSING_RATE_LIMIT: usize = 5;

struct Indexer<
'a,
'config,
Source: source::RouteSource,
ModelStore: storage::RoutingModelStore,
Producer: storage::MessageQueue,
TokenPriceProvider: token_price::TokenPriceProvider,
> {
config: &'a config::Config,
source: &'a Source,
model_store: &'a ModelStore,
message_producer: &'a Producer,
token_price_provider: &'a TokenPriceProvider,
config: &'config config::Config,
source: &'config Source,
model_store: &'config ModelStore,
message_producer: &'config Producer,
token_price_provider: &'config TokenPriceProvider,
}

const POINTS_COUNT_PER_BUCKET: u8 = 10;

impl<
'a,
'config,
RouteSource: source::RouteSource,
ModelStore: storage::RoutingModelStore,
Producer: storage::MessageQueue,
TokenPriceProvider: token_price::TokenPriceProvider,
> Indexer<'a, RouteSource, ModelStore, Producer, TokenPriceProvider>
> Indexer<'config, RouteSource, ModelStore, Producer, TokenPriceProvider>
{
fn new(
config: &'a config::Config,
source: &'a RouteSource,
model_store: &'a ModelStore,
message_producer: &'a Producer,
token_price_provider: &'a TokenPriceProvider,
config: &'config config::Config,
source: &'config RouteSource,
model_store: &'config ModelStore,
message_producer: &'config Producer,
token_price_provider: &'config TokenPriceProvider,
) -> Self {
Indexer { config, source, model_store, message_producer, token_price_provider }
}

fn generate_bucket_observation_points(bucket: &BucketConfig) -> Vec<f64> {
const POINTS_COUNT: u8 = 10;

(0..=(POINTS_COUNT - 1))
(0..POINTS_COUNT_PER_BUCKET)
.into_iter()
.map(|i| {
bucket.token_amount_from_usd
+ (i as f64) * (bucket.token_amount_to_usd - bucket.token_amount_from_usd)
/ (POINTS_COUNT as f64)
/ (POINTS_COUNT_PER_BUCKET as f64)
})
.collect()
}

async fn build_estimator<Estimator: estimator::Estimator<'a, f64, f64>>(
async fn build_estimator<'est_de, Estimator: estimator::Estimator<'est_de, f64, f64>>(
&self,
bucket: &'a BucketConfig,
bucket: &'config BucketConfig,
cost_type: &CostType,
) -> Result<Estimator, Estimator::Error> {
// Generate Data to "Train" Estimator
Expand Down Expand Up @@ -87,7 +91,7 @@ impl<

Ok::<
estimator::DataPoint<f64, f64>,
IndexerErrors<'a, TokenPriceProvider, RouteSource>,
IndexerErrors<'config, TokenPriceProvider, RouteSource, ModelStore>,
>(estimator::DataPoint {
x: input_value_in_usd,
y: fee_in_usd,
Expand All @@ -98,7 +102,7 @@ impl<
.collect::<Vec<
Result<
estimator::DataPoint<f64, f64>,
IndexerErrors<'a, TokenPriceProvider, RouteSource>,
IndexerErrors<'config, TokenPriceProvider, RouteSource, ModelStore>,
>,
>>()
.await
Expand All @@ -110,13 +114,111 @@ impl<
// Build the Estimator
Estimator::build(data_points)
}

async fn publish_estimator<
'est,
'est_de,
Estimator: estimator::Estimator<'est_de, f64, f64>,
>(
&self,
bucket_config: &'config BucketConfig,
estimator: &'est Estimator,
) -> Result<(), IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>> {
let mut s = DefaultHasher::new();
bucket_config.hash(&mut s);

let key = s.finish();
let value = serde_json::to_string(estimator).unwrap();

Ok(self
.model_store
.set(key.to_string(), value)
.await
.map_err(IndexerErrors::PublishEstimatorError)?)
}

pub async fn run<'est_de, Estimator: estimator::Estimator<'est_de, f64, f64>>(
&self,
) -> Result<
HashMap<&'config BucketConfig, Estimator>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>,
> {
// Build Estimators
let estimator_map: HashMap<&BucketConfig, Estimator> =
futures::stream::iter(self.config.buckets.iter())
.map(|bucket| async move {
// Build the Estimator
let estimator: Estimator = self.build_estimator(bucket, &CostType::Fee).await?;

Ok::<(&BucketConfig, Estimator), Estimator::Error>((bucket, estimator))
})
.buffer_unordered(BUCKET_PROCESSING_RATE_LIMIT)
.collect::<Vec<_>>()
.await
.into_iter()
.filter(|r| r.is_ok())
.map(|r| r.unwrap())
.collect();

// Publish Estimators
// todo: batch update?
let failed_publish_errors: Vec<_> = futures::stream::iter(estimator_map.iter())
.map(|(bucket, estimator)| self.publish_estimator(bucket, estimator))
.buffer_unordered(BUCKET_PROCESSING_RATE_LIMIT)
.collect::<Vec<_>>()
.await
.into_iter()
.filter_map(|result| {
if result.is_err() {
return match result.unwrap_err() {
IndexerErrors::PublishEstimatorError(err) => Some(err),
_ => None,
};
}
None
})
.collect();
if failed_publish_errors.len() > 0 {
return Err(IndexerErrors::PublishEstimatorErrors(failed_publish_errors));
}

// Broadcast a Message to other nodes to update their cache
self.message_producer
.publish(
&self.config.indexer_config.indexer_update_topic,
&self.config.indexer_config.indexer_update_message,
)
.await
.map_err(IndexerErrors::PublishIndexerUpdateMessageError)?;

Ok(estimator_map)
}
}

#[derive(Debug)]
enum IndexerErrors<'a, T: token_price::TokenPriceProvider, S: source::RouteSource> {
#[derive(Debug, Display)]
enum IndexerErrors<
'config,
T: token_price::TokenPriceProvider,
S: source::RouteSource,
R: storage::RoutingModelStore,
> {
#[display(fmt = "Route build error: {}", _0)]
RouteBuildError(RouteError),
TokenPriceProviderError(token_price::utils::Errors<'a, T::Error>),

#[display(fmt = "Token price provider error: {}", _0)]
TokenPriceProviderError(token_price::utils::Errors<'config, T::Error>),

#[display(fmt = "Route source error: {}", _0)]
RouteSourceError(S::FetchRouteCostError),

#[display(fmt = "Publish estimator error: {}", _0)]
PublishEstimatorError(R::Error),

#[display(fmt = "Publish estimator errors: {:?}", _0)]
PublishEstimatorErrors(Vec<R::Error>),

#[display(fmt = "Indexer update message error: {}", _0)]
PublishIndexerUpdateMessageError(String),
}

#[cfg(test)]
Expand All @@ -132,6 +234,7 @@ mod tests {
use crate::source::BungeeClient;
use crate::token_price::TokenPriceProvider;

#[derive(Debug)]
struct ModelStoreStub;
impl RoutingModelStore for ModelStoreStub {
type Error = ();
Expand Down Expand Up @@ -227,7 +330,10 @@ infra:
server:
port: 8080
host: 'localhost'
is_indexer: true
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
"#,
)
.unwrap();
Expand Down
5 changes: 4 additions & 1 deletion crates/routing_engine/src/source/bungee/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ infra:
server:
port: 8080
host: 'localhost'
is_indexer: true
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
"#,
)
.unwrap();
Expand Down
Loading

0 comments on commit af311fa

Please sign in to comment.