diff --git a/.gitignore b/.gitignore index 1728eda..114b535 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /Cargo.lock database/ +venv/ diff --git a/Cargo.toml b/Cargo.toml index 5808a3c..17a18ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,3 +45,4 @@ clap = { version = "4.4.6", features = ["derive"] } tokio = {version="1.0", features=["rt", "macros", "rt-multi-thread"]} simple_logger = "*" num-format = "*" +itertools = "*" diff --git a/examples/export_legs.rs b/examples/export_legs.rs new file mode 100644 index 0000000..4a8d0e7 --- /dev/null +++ b/examples/export_legs.rs @@ -0,0 +1,90 @@ +use std::error::Error; + +use clap::Parser; +use futures::StreamExt; +use itertools::Itertools; +use simple_logger::SimpleLogger; + +use flights::{load_aircrafts, load_private_jet_types}; + +#[derive(clap::ValueEnum, Debug, Clone)] +enum Backend { + Disk, + Azure, +} + +const ABOUT: &'static str = r#"Builds the database of all private jet positions from 2023"#; + +#[derive(Parser, Debug)] +#[command(author, version, about = ABOUT)] +struct Cli { + /// The Azure token + #[arg(short, long)] + azure_sas_token: Option, + #[arg(short, long, value_enum, default_value_t=Backend::Azure)] + backend: Backend, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + SimpleLogger::new() + .with_level(log::LevelFilter::Warn) + .init() + .unwrap(); + + let cli = Cli::parse(); + + // optionally initialize Azure client + let client = match (cli.backend, cli.azure_sas_token.clone()) { + (Backend::Disk, None) => None, + (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous( + "privatejets", + "data", + )), + (_, Some(token)) => Some(flights::fs_azure::initialize_sas( + &token, + "privatejets", + "data", + )?), + }; + + // load datasets to memory + let aircrafts = load_aircrafts(client.as_ref()).await?; + let types = load_private_jet_types()?; + + let private_jets = aircrafts + .values() + // its primary use is to be a private jet + .filter(|a| types.contains_key(&a.model)) + .collect::>(); + + let months = (2023..2024) + .cartesian_product(1..=12u8) + .map(|(year, month)| { + time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1) + .expect("day 1 never errors") + }) + .collect::>(); + + let required = private_jets + .into_iter() + .cartesian_product(months.into_iter()) + .collect::>(); + + let tasks = required.into_iter().map(|(aircraft, month)| { + flights::month_positions(month, &aircraft.icao_number, client.as_ref()) + }); + + futures::stream::iter(tasks) + // limit concurrent tasks + .buffered(10) + // continue if error + .map(|r| { + if let Err(e) = r { + log::error!("{e}"); + } + }) + .collect::>() + .await; + Ok(()) +} diff --git a/src/fs_azure.rs b/src/fs_azure.rs index 03fedcd..e45c235 100644 --- a/src/fs_azure.rs +++ b/src/fs_azure.rs @@ -5,7 +5,7 @@ pub use azure_storage_blobs::prelude::ContainerClient as _ContainerClient; use crate::fs::BlobStorageProvider; pub struct ContainerClient { - client: _ContainerClient, + pub client: _ContainerClient, can_put: bool, } @@ -77,16 +77,28 @@ pub(crate) async fn cached_call, std: fetch: F, action: crate::fs::CacheAction, client: Option<&ContainerClient>, -) -> Result, Box> { +) -> Result, std::io::Error> { let Some(client) = client else { - return Ok(crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?); + return Ok( + crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) + .await + .map_err(std::io::Error::other)?, + ); }; - let Some(data) = client.maybe_get(blob_name).await? else { + let Some(data) = client + .maybe_get(blob_name) + .await + .map_err(std::io::Error::other)? + else { return Ok(if !client.can_put() { - crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await? + crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) + .await + .map_err(std::io::Error::other)? } else { - crate::fs::cached(&blob_name, fetch, client, action).await? + crate::fs::cached(&blob_name, fetch, client, action) + .await + .map_err(std::io::Error::other)? }); }; Ok(data) diff --git a/src/icao_to_trace.rs b/src/icao_to_trace.rs index 561d405..3bb5824 100644 --- a/src/icao_to_trace.rs +++ b/src/icao_to_trace.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::error::Error; use std::sync::Arc; use futures::{StreamExt, TryStreamExt}; @@ -84,8 +83,8 @@ async fn globe_history(icao: &str, date: &time::Date) -> Result, std::io headers.insert("Sec-Fetch-Site", "same-origin".parse().unwrap()); headers.insert("TE", "trailers".parse().unwrap()); - // Retry up to 3 times with increasing intervals between attempts. - let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + // Retry up to 5 times with increasing intervals between attempts. + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5); let client = ClientBuilder::new(reqwest::Client::new()) .with(RetryTransientMiddleware::new_with_policy(retry_policy)) .build(); @@ -125,7 +124,7 @@ async fn globe_history_cached( icao: &str, date: &time::Date, client: Option<&fs_azure::ContainerClient>, -) -> Result, Box> { +) -> Result, std::io::Error> { let blob_name = cache_file_path(icao, date); let action = fs::CacheAction::from_date(&date); let fetch = globe_history(&icao, date); @@ -149,7 +148,7 @@ pub async fn trace_cached( icao: &str, date: &time::Date, client: Option<&fs_azure::ContainerClient>, -) -> Result, Box> { +) -> Result, std::io::Error> { let data = globe_history_cached(icao, date, client).await?; let mut value = serde_json::from_slice::(&data)?; @@ -171,7 +170,7 @@ pub async fn positions( icao_number: &str, date: time::Date, client: Option<&fs_azure::ContainerClient>, -) -> Result, Box> { +) -> Result, std::io::Error> { use time::ext::NumericalDuration; let icao: Arc = icao_number.to_string().into(); trace_cached(icao_number, &date, client) @@ -214,7 +213,7 @@ pub(crate) async fn cached_aircraft_positions( to: Date, icao_number: &str, client: Option<&super::fs_azure::ContainerClient>, -) -> Result>, Box> { +) -> Result>, std::io::Error> { let dates = super::DateIter { from, to, @@ -222,7 +221,7 @@ pub(crate) async fn cached_aircraft_positions( }; let tasks = dates.map(|date| async move { - Result::<_, Box>::Ok(( + Result::<_, std::io::Error>::Ok(( date.clone(), positions(icao_number, date, client) .await? @@ -237,4 +236,4 @@ pub(crate) async fn cached_aircraft_positions( .await } -pub use crate::trace_month::aircraft_positions; +pub use crate::trace_month::*; diff --git a/src/trace_month.rs b/src/trace_month.rs index 9a8c88e..453c15b 100644 --- a/src/trace_month.rs +++ b/src/trace_month.rs @@ -35,8 +35,8 @@ fn get_month(current: &time::Date) -> (time::Date, time::Date) { (first_of_month, first_of_next_month) } -async fn month_positions( - month: &time::Date, +pub async fn month_positions( + month: time::Date, icao_number: &str, client: Option<&super::fs_azure::ContainerClient>, ) -> Result>, Box> { @@ -49,9 +49,7 @@ async fn month_positions( // returns positions in the month, cached let fetch = async { - let positions = cached_aircraft_positions(from, to, icao_number, client) - .await - .unwrap(); + let positions = cached_aircraft_positions(from, to, icao_number, client).await?; let positions = positions .into_iter() @@ -94,7 +92,7 @@ pub async fn aircraft_positions( let tasks = months .into_iter() - .map(|month| async move { month_positions(&month, icao_number, client).await }); + .map(|month| async move { month_positions(month, icao_number, client).await }); let positions = futures::stream::iter(tasks) // limit concurrent tasks