Skip to content

Commit

Permalink
async streaming on a schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
Parry-97 committed Jul 8, 2023
1 parent 16a5088 commit a20d16c
Showing 1 changed file with 74 additions and 26 deletions.
100 changes: 74 additions & 26 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use ::time::OffsetDateTime;
use async_trait::async_trait;
use chrono::prelude::*;
use clap::Parser;
use std::io::{Error, ErrorKind};
use time::OffsetDateTime;
use tokio::{fs, join, time};
use yahoo_finance_api as yahoo;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -172,39 +173,86 @@ async fn fetch_closing_data(
Ok(vec![])
}
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() {
let opts = Opts::parse();
let from: DateTime<Utc> = opts.from.parse().expect("Couldn't parse 'from' date");
let to = Utc::now();

let csv_content = fs::read_to_string("sp500.may.2020.txt")
.await
.expect("Couldn't read symbols file");
// a simple way to output a CSV header
//
let mut interval = time::interval(time::Duration::from_secs(30));

println!("period start,symbol,price,change %,min,max,30d avg");
for symbol in opts.symbols.split(',') {
let closes = fetch_closing_data(&symbol, &from, &to).await?;
if !closes.is_empty() {
// min/max of the period. unwrap() because those are Option types
let period_max: f64 = max(&closes).unwrap();
let period_min: f64 = min(&closes).unwrap();
let last_price = *closes.last().unwrap_or(&0.0);
let (_, pct_change) = price_diff(&closes).unwrap_or((0.0, 0.0));
let sma = n_window_sma(30, &closes).unwrap_or_default();

// a simple way to output CSV data
println!(
"{},{},${:.2},{:.2}%,${:.2},${:.2},${:.2}",
from.to_rfc3339(),
symbol,
last_price,
pct_change * 100.0,
period_min,
period_max,
sma.last().unwrap_or(&0.0)
);
}
loop {
fetch_stock_data(csv_content.as_str(), &from, &to).await;
interval.tick().await;
}
}

async fn fetch_stock_data(content: &str, from: &DateTime<Utc>, to: &DateTime<Utc>) {
let csv_symbols = content.split(',');
let mut handles = vec![];

for symbol in csv_symbols {
handles.push(tokio::spawn(fetch_symbol(
symbol.to_owned(),
from.clone(),
to.clone(),
)));
}

for handle in handles {
handle.await.unwrap();
}
}

async fn fetch_symbol(symbol: String, from: DateTime<Utc>, to: DateTime<Utc>) {
let closes_fetched = fetch_closing_data(&symbol, &from, &to).await;
if closes_fetched.is_err() {
eprintln!("Error fetch closing data for {}", symbol);
return;
}
let closes = closes_fetched.unwrap();

if !closes.is_empty() {
// min/max of the period. unwrap() because those are Option types
let max_signal = MaxPrice {};

let min_signal = MinPrice {};

let last_price = *closes.last().unwrap_or(&0.0);

let price_diff_signal = PriceDifference {};

// .await
// .unwrap_or((0.0, 0.0));
let sma_signal = WindowedSMA { window_size: 30 };

let (period_max, period_min, price_diff, sma) = join!(
max_signal.calculate(&closes),
min_signal.calculate(&closes),
price_diff_signal.calculate(&closes),
sma_signal.calculate(&closes)
);

let (_, pct_change) = price_diff.unwrap_or((0.0, 0.0));

// a simple way to output CSV data
println!(
"{},{},${:.2},{:.2}%,${:.2},${:.2},${:.2}",
from.to_rfc3339(),
symbol,
last_price,
pct_change * 100.0,
period_min.unwrap(),
period_max.unwrap(),
sma.unwrap().last().unwrap_or(&0.0)
);
}
Ok(())
}

#[cfg(test)]
Expand Down

0 comments on commit a20d16c

Please sign in to comment.