diff --git a/Cargo.lock b/Cargo.lock index aaca63d..514a7b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,6 +131,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.2" @@ -158,12 +164,27 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.4.0" @@ -243,6 +264,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "cpufeatures" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -262,6 +292,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "encoding_rs" version = "0.8.32" @@ -303,6 +353,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -330,9 +381,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ "futures-core", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", ] [[package]] @@ -377,6 +440,31 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "headers" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +dependencies = [ + "base64 0.13.1", + "bitflags", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.4.1" @@ -625,6 +713,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -645,6 +743,24 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -783,6 +899,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.22", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -903,7 +1039,7 @@ version = "0.11.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ - "base64", + "base64 0.21.2", "bytes", "encoding_rs", "futures-core", @@ -945,7 +1081,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -975,7 +1111,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64", + "base64 0.21.2", ] [[package]] @@ -994,6 +1130,12 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1064,6 +1206,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1110,6 +1263,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "stock_cli" version = "0.1.0" @@ -1121,6 +1280,7 @@ dependencies = [ "clap", "time 0.3.22", "tokio", + "warp", "yahoo_finance_api", ] @@ -1317,6 +1477,29 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -1358,6 +1541,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-core", ] @@ -1377,6 +1561,40 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" + +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -1436,6 +1654,37 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba431ef570df1287f7f8b07e376491ad54f84d26ac473489427231e1718e1f69" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http", + "hyper", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "rustls-pemfile", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tokio-util 0.7.8", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 76d025e..0f14a64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,9 @@ version = "0.1.0" chrono = { version = "0.4", features = ["serde"] } clap = {version = "3.1.8", features = ["derive"]} yahoo_finance_api = { version = "1.1" } +warp = "0.3" tokio = { version = "1.29", features= ["full"]} time = { version="0.3" } async-trait = { version = "0.1" } -actix = "0.11" +actix = "0.11.0" actix-rt = "2.2" diff --git a/src/actors/mod.rs b/src/actors/mod.rs index a3475d3..34c71e8 100644 --- a/src/actors/mod.rs +++ b/src/actors/mod.rs @@ -1,3 +1,5 @@ +use std::{collections::VecDeque, fs}; + use actix::{Actor, Context, Handler, Message, ResponseFuture}; use chrono::{DateTime, Utc}; use tokio::{fs::OpenOptions, io::AsyncWriteExt}; @@ -100,13 +102,78 @@ impl Handler for DataSaveActor { Box::pin(async move { let path = msg.path; let content = msg.content; + + let final_content = if fs::metadata(&path).is_ok() { + format!("{}\n", content) + } else { + format!( + "period start,symbol,price,change %,min,max,30d avg\n{}\n", + content + ) + }; let mut file = OpenOptions::new() .append(true) .create(true) .open(path) .await?; - file.write_all(content.as_bytes()).await?; + file.write_all(final_content.as_bytes()).await?; Ok(()) }) } } + +pub struct BufferActor { + buffer: VecDeque, + max_buffer_size: usize, +} + +impl BufferActor { + pub fn new(max_buffer_size: usize) -> Self { + Self { + buffer: VecDeque::with_capacity(max_buffer_size), + max_buffer_size, + } + } +} + +#[derive(Message)] +#[rtype(result = "Result<(), std::io::Error>")] +pub struct BufferMessage { + pub content: String, +} + +impl Actor for BufferActor { + type Context = Context; +} + +impl Handler for BufferActor { + type Result = Result<(), std::io::Error>; + fn handle(&mut self, msg: BufferMessage, _: &mut Self::Context) -> Self::Result { + if self.buffer.len() == self.max_buffer_size { + self.buffer.pop_back(); + } + self.buffer.push_front(msg.content); + Ok(()) + } +} + +#[derive(Message)] +#[rtype(result = "Result, std::io::Error>")] +pub struct BufferReadMessage { + pub n: usize, +} + +impl Handler for BufferActor { + type Result = Result, std::io::Error>; + fn handle(&mut self, msg: BufferReadMessage, _: &mut Self::Context) -> Self::Result { + let mut content = vec![]; + for _ in 0..msg.n { + if let Some(line) = self.buffer.pop_front() { + content.push(line); + } else { + break; + } + } + Ok(content) + } +} diff --git a/src/main.rs b/src/main.rs index 2b1231d..9a0d653 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,36 +1,49 @@ -use std::{error::Error, time::Duration}; +use std::{error::Error, time::Duration, usize}; -use actix::Actor; +use actix::{Actor, Addr}; use chrono::{DateTime, Utc}; use clap::Parser; use stock_cli::{ actors::{ - DataLoadActor, DataLoadMessage, DataProcessActor, DataProcessMessage, DataSaveActor, - DataSaveMessage, + BufferActor, BufferMessage, BufferReadMessage, DataLoadActor, DataLoadMessage, + DataProcessActor, DataProcessMessage, DataSaveActor, DataSaveMessage, }, types::Opts, }; use tokio::time; +use warp::Filter; #[actix_rt::main] async fn main() -> Result<(), Box> { let opts = Opts::parse(); let from: DateTime = opts.from.parse().expect("Couldn't parse 'from' date"); let to = Utc::now(); - let mut interval = time::interval(Duration::from_secs(30)); + let mut interval = time::interval(Duration::from_secs(10)); + let path: String = opts.source.parse().expect("Couldn't parse 'path'"); let save_path = path.replace(".txt", ".csv"); - let max_iters = opts.max_iterations; + // let max_iters = opts.max_iterations; // let content = fs::read_to_string(path).await?; // let mono_addr = MonoActor {}.start(); let data_load_addr = DataLoadActor {}.start(); let data_process_addr = DataProcessActor {}.start(); let data_save_addr = DataSaveActor {}.start(); + let buffer_actor_addr = BufferActor::new(20).start(); + + let buffer_addr = buffer_actor_addr.clone(); + let tail_handler = warp::path!("tail" / usize) + .and(with_db(buffer_addr)) + .and(warp::get()) + .and_then(buffer_read_handler); // a simple way to output a CSV header let mut iterations = 1; + tokio::spawn(async { + warp::serve(tail_handler).run(([127, 0, 0, 1], 8080)).await; + }); println!("period start,symbol,price,change %,min,max,30d avg"); + loop { // let mono_process = mono_addr // .send(MonoMessage { @@ -40,9 +53,10 @@ async fn main() -> Result<(), Box> { // }) // .await??; // println!("{}\niteration #{}", mono_process, iterations); - if iterations > max_iters { - break; - } + interval.tick().await; + // if iterations > max_iters { + // break; + // } let data_load_process = data_load_addr .send(DataLoadMessage { path: path.to_string(), @@ -51,21 +65,48 @@ async fn main() -> Result<(), Box> { let data_process_process = data_process_addr .send(DataProcessMessage { - content: data_load_process.to_string(), + content: data_load_process, from: from.clone(), to: to.clone(), }) .await??; + println!( + "{}\n ----------------------Iteration #{}--------------------------", + data_process_process, iterations, + ); + + buffer_actor_addr + .send(BufferMessage { + content: data_process_process.clone(), + }) + .await??; + data_save_addr .send(DataSaveMessage { - content: data_process_process.to_string(), + content: data_process_process, path: save_path.to_string(), }) .await??; iterations += 1; - - interval.tick().await; } - Ok(()) + + // Ok(()) } + +async fn buffer_read_handler(n: usize, db: Db) -> Result { + match db.send(BufferReadMessage { n }).await { + Ok(it) => { + return Ok(warp::reply::json::>( + &it.unwrap_or(Vec::default()), + )) + } + _ => return Err(warp::reject::not_found()), + }; +} + +fn with_db(db: Db) -> impl Filter + Clone { + warp::any().map(move || db.clone()) +} + +type Db = Addr; diff --git a/src/types/mod.rs b/src/types/mod.rs index 8935f73..388471a 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -17,10 +17,9 @@ pub struct Opts { /// Required start date for the period to fetch #[clap(short, long)] pub from: String, - - /// Optional number of max iterations to run - #[clap(short, long, default_value = "1")] - pub max_iterations: usize, + // Optional number of max iterations to run + // #[clap(short, long, default_value = "1")] + // pub max_iterations: usize, } #[derive(Debug)] diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ff0e059..aea039c 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -42,7 +42,7 @@ pub async fn fetch_stock_data( ) -> Result { let csv_symbols = content.split(','); let mut handles = vec![]; - let mut lines = vec!["period start,symbol,price,change %,min,max,30d avg".to_string()]; + let mut lines = vec![]; for symbol in csv_symbols { handles.push(tokio::spawn(fetch_symbol( symbol.to_owned(), @@ -61,12 +61,12 @@ pub async fn fetch_stock_data( async fn fetch_symbol(symbol: String, from: DateTime, to: DateTime) -> String { let closes_fetched = fetch_closing_data(&symbol, &from, &to).await; if closes_fetched.is_err() { - return "Could not fetch closing data".to_string(); + return "N/A,N/A,N/A,N/A,N/A,N/A,N/A".to_string(); } let closes = closes_fetched.unwrap(); if closes.is_empty() { - return "Retrieved Series is is empty".to_string(); + return "N/A,N/A,N/A,N/A,N/A,N/A,N/A".to_string(); } // min/max of the period. unwrap() because those are Option types let max_signal = MaxPrice {};