From 24ba17bb37d3f29bfd48459a534bf74f12170457 Mon Sep 17 00:00:00 2001 From: jorgecardleitao <149073281+jorgecardleitao@users.noreply.github.com> Date: Mon, 4 Mar 2024 08:15:14 +0100 Subject: [PATCH] Split legs file by year, added airports, focus on main airports (#49) --- examples/clean_cache.rs | 57 ---------------------- src/airports.rs | 2 +- src/bin/etl_legs.rs | 103 ++++++++++++++++++++++++++------------- src/bin/etl_positions.rs | 2 +- 4 files changed, 72 insertions(+), 92 deletions(-) delete mode 100644 examples/clean_cache.rs diff --git a/examples/clean_cache.rs b/examples/clean_cache.rs deleted file mode 100644 index 9f2d384..0000000 --- a/examples/clean_cache.rs +++ /dev/null @@ -1,57 +0,0 @@ -use clap::Parser; - -use flights::{fs_s3::ContainerClient, BlobStorageProvider}; -use futures::StreamExt; -use simple_logger::SimpleLogger; - -async fn delete(client: &ContainerClient) -> Result<(), Box> { - let tasks = client.list("position/icao_number=3b9b60").await?; - - log::info!("{}", tasks.len()); - let tasks = tasks - .into_iter() - .map(|blob| async move { client.delete(&blob).await }); - - futures::stream::iter(tasks) - // limit concurrent tasks - .buffered(200) - // continue if error - .map(|r| { - if let Err(e) = r { - log::error!("{e}"); - } - }) - .collect::>() - .await; - - Ok(()) -} - -#[derive(Parser, Debug)] -#[command(author, version)] -struct Cli { - /// The token to the remote storage - #[arg(long)] - access_key: String, - /// The token to the remote storage - #[arg(long)] - secret_access_key: String, - /// Optional country to fetch from (in ISO 3166); defaults to whole world - #[arg(long)] - country: Option, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - SimpleLogger::new() - .with_level(log::LevelFilter::Info) - .init() - .unwrap(); - - let cli = Cli::parse(); - - let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; - - delete(&client).await?; - Ok(()) -} diff --git a/src/airports.rs b/src/airports.rs index c5b0d55..0054ec4 100644 --- a/src/airports.rs +++ b/src/airports.rs @@ -36,7 +36,7 @@ pub async fn airports_cached() -> Result, Box>(); Ok(data) diff --git a/src/bin/etl_legs.rs b/src/bin/etl_legs.rs index ea5861b..649cb12 100644 --- a/src/bin/etl_legs.rs +++ b/src/bin/etl_legs.rs @@ -5,7 +5,7 @@ use std::{ }; use clap::Parser; -use flights::{Aircraft, AircraftModels, BlobStorageProvider, Leg}; +use flights::{Aircraft, AircraftModels, Airport, BlobStorageProvider, Leg}; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use serde::{de::DeserializeOwned, Serialize}; @@ -24,8 +24,10 @@ struct LegOut { end: time::OffsetDateTime, from_lat: f64, from_lon: f64, + from_airport: String, to_lat: f64, to_lon: f64, + to_airport: String, distance: f64, duration: f64, commercial_emissions_kg: usize, @@ -36,6 +38,7 @@ struct LegOut { struct Metadata { icao_months_to_process: usize, icao_months_processed: usize, + url: String, } async fn write_json( @@ -68,6 +71,7 @@ fn transform<'a>( legs: Vec, private_jets: &'a HashMap, Aircraft>, models: &'a AircraftModels, + airports: &'a [Airport], ) -> impl Iterator + 'a { legs.into_iter().map(|leg| { let aircraft = private_jets.get(icao_number).expect(icao_number); @@ -78,8 +82,10 @@ fn transform<'a>( end: leg.to().datetime(), from_lat: leg.from().latitude(), from_lon: leg.from().longitude(), + from_airport: flights::closest(leg.from().pos(), airports).name, to_lat: leg.to().latitude(), to_lon: leg.to().longitude(), + to_airport: flights::closest(leg.to().pos(), airports).name, distance: leg.distance(), duration: leg.duration().as_seconds_f64() / 60.0 / 60.0, commercial_emissions_kg: flights::emissions( @@ -163,6 +169,7 @@ async fn etl_task( month: time::Date, private_jets: &HashMap, Aircraft>, models: &AircraftModels, + airports: &[Airport], client: Option<&flights::fs_s3::ContainerClient>, ) -> Result<(), Box> { // extract @@ -173,6 +180,7 @@ async fn etl_task( flights::legs(positions.into_iter()), &private_jets, &models, + &airports, ); // load write(&icao_number, month, legs, client.unwrap()).await @@ -180,47 +188,66 @@ async fn etl_task( async fn aggregate( private_jets: Vec, - required: usize, - client: &impl BlobStorageProvider, + models: &AircraftModels, + airports: &[Airport], + client: &flights::fs_s3::ContainerClient, ) -> Result<(), Box> { let private_jets = private_jets .into_iter() - .map(|a| a.icao_number) - .collect::>(); + .map(|a| (a.icao_number.clone(), a)) + .collect::>(); let completed = flights::existing(DATABASE, client) .await? .into_iter() - .filter(|(icao, _)| private_jets.contains(icao)) + .filter(|(icao, _)| private_jets.contains_key(icao)) .collect::>(); - let tasks = completed - .iter() - .map(|(icao, date)| async move { read::(icao, *date, client).await }); - - log::info!("Gettings all legs"); - let legs = futures::stream::iter(tasks) - .buffered(20) - .try_collect::>() - .await? + // group completed by year + let by_year = completed .into_iter() - .flatten(); - - log::info!("Writing all legs"); - let key = format!("{DATABASE_ROOT}all.csv"); - write_csv(legs, &key, client).await?; - log::info!("Written {key}"); + .fold(HashMap::>::new(), |mut acc, v| { + acc.entry(v.1.year()) + .and_modify(|entries| { + entries.insert(v.clone()); + }) + .or_insert(HashSet::from([v])); + acc + }); + + // run tasks by year + let private_jets = &private_jets; + let models = ⊧ + let mut metadata = HashMap::::new(); + for (year, completed) in by_year { + let tasks = completed.iter().map(|(icao_number, date)| async move { + read::(icao_number, *date, client).await + }); + + log::info!("Gettings all legs for year={year}"); + let legs = futures::stream::iter(tasks) + .buffered(100) + .try_collect::>() + .await? + .into_iter() + .flatten(); + + log::info!("Writing all legs for year={year}"); + let key = format!("{DATABASE_ROOT}all/year={year}/data.csv"); + write_csv(legs, &key, client).await?; + log::info!("Written {key}"); + metadata.insert( + year, + Metadata { + icao_months_to_process: private_jets.len() * 12, + icao_months_processed: completed.len(), + url: format!("https://private-jets.fra1.digitaloceanspaces.com/{key}"), + }, + ); + } let key = format!("{DATABASE_ROOT}status.json"); - write_json( - client, - Metadata { - icao_months_to_process: required, - icao_months_processed: completed.len(), - }, - &key, - ) - .await?; + write_json(client, metadata, &key).await?; log::info!("status written"); Ok(()) } @@ -237,8 +264,9 @@ async fn main() -> Result<(), Box> { let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; let models = flights::load_private_jet_models()?; + let airports = flights::airports_cached().await?; - let months = (2020..2024).cartesian_product(1..=12u8).count(); + let months = (2019..2024).cartesian_product(1..=12u8).count(); let private_jets = private_jets(Some(&client)).await?; let relevant_jets = private_jets .clone() @@ -276,9 +304,18 @@ async fn main() -> Result<(), Box> { let client = Some(&client); let relevant_jets = &relevant_jets; let models = ⊧ + let airports = &airports; let tasks = todo.into_iter().map(|(icao_number, month)| async move { - etl_task(icao_number, *month, &relevant_jets, &models, client).await + etl_task( + icao_number, + *month, + &relevant_jets, + &models, + &airports, + client, + ) + .await }); let _ = futures::stream::iter(tasks) @@ -286,5 +323,5 @@ async fn main() -> Result<(), Box> { .try_collect::>() .await?; - aggregate(private_jets, required, client.unwrap()).await + aggregate(private_jets, &models, &airports, client.unwrap()).await } diff --git a/src/bin/etl_positions.rs b/src/bin/etl_positions.rs index 1352f24..f27df7a 100644 --- a/src/bin/etl_positions.rs +++ b/src/bin/etl_positions.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; - let months = (2020..2024) + let months = (2019..2024) .rev() .cartesian_product(1..=12u8) .map(|(year, month)| {