Skip to content

Commit

Permalink
feat: in-memory trading engine (WIP) (#48)
Browse files Browse the repository at this point in the history
* init

* get the base down

* pipelines, conditions and a draft of TradingEngine
  • Loading branch information
piotrostr authored Feb 4, 2025
1 parent 4b58738 commit 0bb624c
Show file tree
Hide file tree
Showing 14 changed files with 2,285 additions and 0 deletions.
1,743 changes: 1,743 additions & 0 deletions listen-trading-engine/Cargo.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions listen-trading-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "listen-trading-engine"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.95"
base64 = "0.22.1"
chrono = { version = "0.4.39", features = ["serde"] }
ctor = "0.2.9"
dotenv = "0.15.0"
reqwest = { version = "0.12.12", features = ["json"] }
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"
tokio = { version = "1.43.0", features = ["rt", "macros"] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
uuid = { version = "1.12.1", features = ["serde"] }
9 changes: 9 additions & 0 deletions listen-trading-engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
pub mod trading_engine;

pub use trading_engine::TradingEngine;

#[ctor::ctor]
fn init() {
tracing_subscriber::fmt::init();
dotenv::dotenv().ok();
}
6 changes: 6 additions & 0 deletions listen-trading-engine/src/trading_engine/caip2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct Caip2;

impl Caip2 {
pub const SOLANA: &str = "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp";
pub const ARBITRUM: &str = "eip155:42161";
}
2 changes: 2 additions & 0 deletions listen-trading-engine/src/trading_engine/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub const TEST_ADDRESS_EVM: &str = "0xCCC48877a33a2C14e40c82da843Cf4c607ABF770";
pub const TEST_ADDRESS_SOL: &str = "6fp9frQ16W3kTRGiBVvpMS2NzoixE4Y1MWqYrW9SvTAj";
27 changes: 27 additions & 0 deletions listen-trading-engine/src/trading_engine/evaluator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::collections::HashMap;

use super::pipeline::{Condition, ConditionType};
pub struct Evaluator;

impl Evaluator {
pub fn evaluate_conditions(conditions: &[Condition], prices: &HashMap<String, f64>) -> bool {
conditions
.iter()
.all(|c| Self::evaluate_condition(c, prices))
}

fn evaluate_condition(condition: &Condition, prices: &HashMap<String, f64>) -> bool {
match &condition.condition_type {
ConditionType::PriceAbove { asset, threshold } => {
prices.get(asset).map(|p| p >= threshold).unwrap_or(false)
}
ConditionType::PriceBelow { asset, threshold } => {
prices.get(asset).map(|p| p <= threshold).unwrap_or(false)
}
ConditionType::And(sub) => sub.iter().all(|c| Self::evaluate_condition(c, prices)),
ConditionType::Or(sub) => sub.iter().any(|c| Self::evaluate_condition(c, prices)),
// PercentageChange would require historical data tracking
_ => false,
}
}
}
147 changes: 147 additions & 0 deletions listen-trading-engine/src/trading_engine/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use super::order::Order;
use super::privy_config::PrivyConfig;
use super::types::{SignAndSendEvmTransactionParams, SignAndSendEvmTransactionRequest};
use super::types::{
SignAndSendTransactionParams, SignAndSendTransactionRequest, SignAndSendTransactionResponse,
};
use super::util::create_http_client;
use anyhow::{anyhow, Result};

pub struct Executor {
http_client: reqwest::Client,
}

impl Executor {
pub fn from_env() -> Result<Self> {
let privy_config = PrivyConfig::from_env()?;
let http_client = create_http_client(&privy_config);
Ok(Self { http_client })
}

pub async fn execute_order(&self, order: Order) -> Result<String> {
if order.is_solana() {
if order.solana_transaction.is_none() {
return Err(anyhow!("Solana transaction required for Solana order"));
}
self.execute_solana_transaction(
order.address,
order.solana_transaction.unwrap(),
order.caip2,
)
.await
} else {
if order.evm_transaction.is_none() {
return Err(anyhow!("EVM transaction required for EVM order"));
}
self.execute_evm_transaction(order.address, order.evm_transaction.unwrap(), order.caip2)
.await
}
}

async fn execute_evm_transaction(
&self,
address: String,
transaction: serde_json::Value,
caip2: String,
) -> Result<String> {
tracing::info!(?address, "Executing EVM transaction");
let request = SignAndSendEvmTransactionRequest {
address,
chain_type: "ethereum".to_string(),
method: "eth_sendTransaction".to_string(),
caip2,
params: SignAndSendEvmTransactionParams { transaction },
};

let response = self
.http_client
.post("https://auth.privy.io/api/v1/wallets/rpc")
.json(&request)
.send()
.await?;

if !response.status().is_success() {
return Err(anyhow!(
"Failed to send transaction: {}",
response.text().await?
));
}

let result: SignAndSendTransactionResponse = response.json().await?;
tracing::info!(
?result.method,
?result.data.hash,
?result.data.caip2,
"Transaction sent",
);
Ok(result.data.hash)
}

async fn execute_solana_transaction(
&self,
address: String,
transaction: String,
caip2: String,
) -> Result<String> {
tracing::info!(?address, "Executing Solana transaction");
let request = SignAndSendTransactionRequest {
address,
chain_type: "solana".to_string(),
method: "signAndSendTransaction".to_string(),
caip2,
params: SignAndSendTransactionParams {
transaction,
encoding: "base64".to_string(),
},
};

let response = self
.http_client
.post("https://api.privy.io/v1/wallets/rpc")
.json(&request)
.send()
.await?;

if !response.status().is_success() {
return Err(anyhow!(
"Failed to sign transaction: {}",
response.text().await?
));
}

let result: SignAndSendTransactionResponse = response.json().await?;
tracing::info!(
?result.method,
?result.data.hash,
?result.data.caip2,
"Transaction sent",
);
Ok(result.data.hash)
}
}

#[cfg(test)]
mod tests {
use crate::trading_engine::caip2::Caip2;
use crate::trading_engine::constants::*;
use crate::trading_engine::executor::Executor;
use crate::trading_engine::order::Order;

#[tokio::test]
async fn test_execute_order_eth() {
let engine = Executor::from_env().unwrap();
let order = Order {
user_id: "-".to_string(),
address: TEST_ADDRESS_EVM.to_string(),
caip2: Caip2::ARBITRUM.to_string(),
evm_transaction: Some(serde_json::json!({
"from": TEST_ADDRESS_EVM,
"to": TEST_ADDRESS_EVM,
"value": "0x111",
})),
solana_transaction: None,
};
let result = engine.execute_order(order).await.unwrap();
assert_eq!(result.len(), 66);
}
}
147 changes: 147 additions & 0 deletions listen-trading-engine/src/trading_engine/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
pub mod caip2;
pub mod constants;
pub mod evaluator;
pub mod executor;
pub mod order;
pub mod pipeline;
pub mod privy_config;
pub mod types;
pub mod util;

use std::collections::{HashMap, HashSet};
use tokio::sync::RwLock;

use anyhow::Result;
use uuid::Uuid;

use self::evaluator::Evaluator;
use self::pipeline::{Condition, ConditionType, Pipeline, Status};

pub struct TradingEngine {
executor: executor::Executor,

// Active pipelines indexed by UUID
active_pipelines: RwLock<HashMap<Uuid, Pipeline>>,

// Asset to pipeline index for efficient updates
asset_subscriptions: RwLock<HashMap<String, HashSet<Uuid>>>,

// Current market state
price_cache: RwLock<HashMap<String, f64>>,
}

impl TradingEngine {
pub fn from_env() -> Result<Self> {
Ok(Self {
executor: executor::Executor::from_env()?,
active_pipelines: RwLock::new(HashMap::new()),
asset_subscriptions: RwLock::new(HashMap::new()),
price_cache: RwLock::new(HashMap::new()),
})
}
pub async fn add_pipeline(&self, pipeline: Pipeline) -> Result<()> {
let mut active_pipelines = self.active_pipelines.write().await;
let mut asset_subscriptions = self.asset_subscriptions.write().await;

// Extract all assets mentioned in pipeline conditions
let assets = self.extract_assets(&pipeline).await;

// Update asset subscriptions
for asset in assets {
asset_subscriptions
.entry(asset)
.or_default()
.insert(pipeline.id);
}

active_pipelines.insert(pipeline.id, pipeline);
Ok(())
}

pub async fn handle_price_update(&self, asset: &str, price: f64) -> Result<()> {
// Update price cache
let mut cache = self.price_cache.write().await;
cache.insert(asset.to_string(), price);
drop(cache); // Release lock early

// Get affected pipelines
let subscriptions = self.asset_subscriptions.read().await;
if let Some(pipeline_ids) = subscriptions.get(asset) {
for pipeline_id in pipeline_ids {
if let Some(pipeline) = self.active_pipelines.write().await.get_mut(pipeline_id) {
self.evaluate_pipeline(pipeline).await?;
}
}
}

Ok(())
}

async fn evaluate_pipeline(&self, pipeline: &mut Pipeline) -> Result<()> {
let current_step_ids = pipeline.current_steps.clone();
let price_cache = self.price_cache.read().await.clone();

for step_id in current_step_ids {
if let Some(step) = pipeline.steps.get_mut(&step_id) {
if matches!(step.status, Status::Pending)
&& Evaluator::evaluate_conditions(&step.conditions, &price_cache)
{
// Execute order and update status
match self.executor.execute_order(step.order.clone()).await {
Ok(_) => {
step.status = Status::Completed;
pipeline.current_steps = step.next_steps.clone();
}
Err(e) => {
step.status = Status::Failed;
pipeline.status = Status::Failed;
tracing::error!(%step_id, error = %e, "Order execution failed");
}
}
}
}
}
// Check pipeline completion
if pipeline.current_steps.is_empty() {
pipeline.status = Status::Completed;
}

Ok(())
}

/// Extract all unique assets mentioned in pipeline conditions
async fn extract_assets(&self, pipeline: &Pipeline) -> HashSet<String> {
let mut assets = HashSet::new();
for step in pipeline.steps.values() {
self.collect_assets_from_condition(&step.conditions, &mut assets)
.await;
}
assets
}

async fn collect_assets_from_condition(
&self,
conditions: &[Condition],
assets: &mut HashSet<String>,
) {
let mut stack = Vec::new();
stack.extend(conditions.iter());

while let Some(condition) = stack.pop() {
match &condition.condition_type {
ConditionType::PriceAbove { asset, .. } => {
assets.insert(asset.clone());
}
ConditionType::PriceBelow { asset, .. } => {
assets.insert(asset.clone());
}
ConditionType::PercentageChange { asset, .. } => {
assets.insert(asset.clone());
}
ConditionType::And(sub_conditions) | ConditionType::Or(sub_conditions) => {
stack.extend(sub_conditions.iter());
}
}
}
}
}
16 changes: 16 additions & 0 deletions listen-trading-engine/src/trading_engine/order.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Order {
pub user_id: String,
pub address: String,
pub caip2: String,
pub evm_transaction: Option<serde_json::Value>,
pub solana_transaction: Option<String>, // base64
}

impl Order {
pub fn is_solana(&self) -> bool {
self.caip2.starts_with("solana")
}
}
Loading

0 comments on commit 0bb624c

Please sign in to comment.