From a20d16c763599450b05cc5bad94209147d9302fd Mon Sep 17 00:00:00 2001 From: Param Pal Singh Date: Sat, 8 Jul 2023 09:00:13 +0200 Subject: [PATCH] async streaming on a schedule --- src/main.rs | 100 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 74 insertions(+), 26 deletions(-) diff --git a/src/main.rs b/src/main.rs index 444b3b8..122a003 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ +use ::time::OffsetDateTime; use async_trait::async_trait; use chrono::prelude::*; use clap::Parser; use std::io::{Error, ErrorKind}; -use time::OffsetDateTime; +use tokio::{fs, join, time}; use yahoo_finance_api as yahoo; #[derive(Parser, Debug)] @@ -172,39 +173,86 @@ async fn fetch_closing_data( Ok(vec![]) } } - #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() { let opts = Opts::parse(); let from: DateTime = opts.from.parse().expect("Couldn't parse 'from' date"); let to = Utc::now(); + let csv_content = fs::read_to_string("sp500.may.2020.txt") + .await + .expect("Couldn't read symbols file"); // a simple way to output a CSV header + // + let mut interval = time::interval(time::Duration::from_secs(30)); + println!("period start,symbol,price,change %,min,max,30d avg"); - for symbol in opts.symbols.split(',') { - let closes = fetch_closing_data(&symbol, &from, &to).await?; - if !closes.is_empty() { - // min/max of the period. unwrap() because those are Option types - let period_max: f64 = max(&closes).unwrap(); - let period_min: f64 = min(&closes).unwrap(); - let last_price = *closes.last().unwrap_or(&0.0); - let (_, pct_change) = price_diff(&closes).unwrap_or((0.0, 0.0)); - let sma = n_window_sma(30, &closes).unwrap_or_default(); - - // a simple way to output CSV data - println!( - "{},{},${:.2},{:.2}%,${:.2},${:.2},${:.2}", - from.to_rfc3339(), - symbol, - last_price, - pct_change * 100.0, - period_min, - period_max, - sma.last().unwrap_or(&0.0) - ); - } + loop { + fetch_stock_data(csv_content.as_str(), &from, &to).await; + interval.tick().await; + } +} + +async fn fetch_stock_data(content: &str, from: &DateTime, to: &DateTime) { + let csv_symbols = content.split(','); + let mut handles = vec![]; + + for symbol in csv_symbols { + handles.push(tokio::spawn(fetch_symbol( + symbol.to_owned(), + from.clone(), + to.clone(), + ))); + } + + for handle in handles { + handle.await.unwrap(); + } +} + +async fn fetch_symbol(symbol: String, from: DateTime, to: DateTime) { + let closes_fetched = fetch_closing_data(&symbol, &from, &to).await; + if closes_fetched.is_err() { + eprintln!("Error fetch closing data for {}", symbol); + return; + } + let closes = closes_fetched.unwrap(); + + if !closes.is_empty() { + // min/max of the period. unwrap() because those are Option types + let max_signal = MaxPrice {}; + + let min_signal = MinPrice {}; + + let last_price = *closes.last().unwrap_or(&0.0); + + let price_diff_signal = PriceDifference {}; + + // .await + // .unwrap_or((0.0, 0.0)); + let sma_signal = WindowedSMA { window_size: 30 }; + + let (period_max, period_min, price_diff, sma) = join!( + max_signal.calculate(&closes), + min_signal.calculate(&closes), + price_diff_signal.calculate(&closes), + sma_signal.calculate(&closes) + ); + + let (_, pct_change) = price_diff.unwrap_or((0.0, 0.0)); + + // a simple way to output CSV data + println!( + "{},{},${:.2},{:.2}%,${:.2},${:.2},${:.2}", + from.to_rfc3339(), + symbol, + last_price, + pct_change * 100.0, + period_min.unwrap(), + period_max.unwrap(), + sma.unwrap().last().unwrap_or(&0.0) + ); } - Ok(()) } #[cfg(test)]