From dbe8f53a4553f08159bc82327103b766df7a903c Mon Sep 17 00:00:00 2001 From: Jorge Date: Mon, 4 Mar 2024 07:00:53 +0000 Subject: [PATCH] Added airports to legs --- src/airports.rs | 2 +- src/bin/etl_legs.rs | 49 ++++++++++++++++++++++++---------------- src/bin/etl_positions.rs | 2 +- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/airports.rs b/src/airports.rs index c5b0d55..0054ec4 100644 --- a/src/airports.rs +++ b/src/airports.rs @@ -36,7 +36,7 @@ pub async fn airports_cached() -> Result, Box>(); Ok(data) diff --git a/src/bin/etl_legs.rs b/src/bin/etl_legs.rs index b443168..06a2be4 100644 --- a/src/bin/etl_legs.rs +++ b/src/bin/etl_legs.rs @@ -5,7 +5,7 @@ use std::{ }; use clap::Parser; -use flights::{Aircraft, AircraftModels, BlobStorageProvider, Leg}; +use flights::{Aircraft, AircraftModels, Airport, BlobStorageProvider, Leg}; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use serde::{de::DeserializeOwned, Serialize}; @@ -24,8 +24,10 @@ struct LegOut { end: time::OffsetDateTime, from_lat: f64, from_lon: f64, + from_airport: String, to_lat: f64, to_lon: f64, + to_airport: String, distance: f64, duration: f64, commercial_emissions_kg: usize, @@ -69,6 +71,7 @@ fn transform<'a>( legs: Vec, private_jets: &'a HashMap, Aircraft>, models: &'a AircraftModels, + airports: &'a [Airport], ) -> impl Iterator + 'a { legs.into_iter().map(|leg| { let aircraft = private_jets.get(icao_number).expect(icao_number); @@ -79,8 +82,10 @@ fn transform<'a>( end: leg.to().datetime(), from_lat: leg.from().latitude(), from_lon: leg.from().longitude(), + from_airport: flights::closest(leg.from().pos(), airports).name, to_lat: leg.to().latitude(), to_lon: leg.to().longitude(), + to_airport: flights::closest(leg.to().pos(), airports).name, distance: leg.distance(), duration: leg.duration().as_seconds_f64() / 60.0 / 60.0, commercial_emissions_kg: flights::emissions( @@ -164,6 +169,7 @@ async fn etl_task( month: time::Date, private_jets: &HashMap, Aircraft>, models: &AircraftModels, + airports: &[Airport], client: Option<&flights::fs_s3::ContainerClient>, ) -> Result<(), Box> { // extract @@ -174,6 +180,7 @@ async fn etl_task( flights::legs(positions.into_iter()), &private_jets, &models, + &airports, ); // load write(&icao_number, month, legs, client.unwrap()).await @@ -181,10 +188,10 @@ async fn etl_task( async fn aggregate( private_jets: Vec, + models: &AircraftModels, + airports: &[Airport], client: &flights::fs_s3::ContainerClient, ) -> Result<(), Box> { - let models = flights::load_private_jet_models()?; - let private_jets = private_jets .into_iter() .map(|a| (a.icao_number.clone(), a)) @@ -214,13 +221,7 @@ async fn aggregate( let mut metadata = HashMap::::new(); for (year, completed) in by_year { let tasks = completed.iter().map(|(icao_number, date)| async move { - let r = read::(icao_number, *date, client).await; - if let Err(_) = r { - etl_task(icao_number, *date, private_jets, models, Some(client)).await?; - return read::(icao_number, *date, client).await; - } else { - r - } + read::(icao_number, *date, client).await }); log::info!("Gettings all legs for year={year}"); @@ -240,7 +241,7 @@ async fn aggregate( Metadata { icao_months_to_process: private_jets.len() * 12, icao_months_processed: completed.len(), - url: format!("https://fra1.digitaloceanspaces.com/{key}"), + url: format!("https://private-jets.fra1.digitaloceanspaces.com/{key}"), }, ); } @@ -263,8 +264,9 @@ async fn main() -> Result<(), Box> { let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; let models = flights::load_private_jet_models()?; + let airports = flights::airports_cached().await?; - let months = (2020..2024).cartesian_product(1..=12u8).count(); + let months = (2019..2024).cartesian_product(1..=12u8).count(); let private_jets = private_jets(Some(&client)).await?; let relevant_jets = private_jets .clone() @@ -288,11 +290,11 @@ async fn main() -> Result<(), Box> { .collect::>(); log::info!("ready : {}", ready.len()); - let completed = flights::existing(DATABASE, &client) - .await? - .into_iter() - .filter(|(icao, _)| relevant_jets.contains_key(icao)) - .collect::>(); + let completed = HashSet::new(); /*flights::existing(DATABASE, &client) + .await? + .into_iter() + .filter(|(icao, _)| relevant_jets.contains_key(icao)) + .collect::>();*/ log::info!("completed: {}", completed.len()); let mut todo = ready.difference(&completed).collect::>(); @@ -302,9 +304,18 @@ async fn main() -> Result<(), Box> { let client = Some(&client); let relevant_jets = &relevant_jets; let models = ⊧ + let airports = &airports; let tasks = todo.into_iter().map(|(icao_number, month)| async move { - etl_task(icao_number, *month, &relevant_jets, &models, client).await + etl_task( + icao_number, + *month, + &relevant_jets, + &models, + &airports, + client, + ) + .await }); let _ = futures::stream::iter(tasks) @@ -312,5 +323,5 @@ async fn main() -> Result<(), Box> { .try_collect::>() .await?; - aggregate(private_jets, client.unwrap()).await + aggregate(private_jets, &models, &airports, client.unwrap()).await } diff --git a/src/bin/etl_positions.rs b/src/bin/etl_positions.rs index 1352f24..f27df7a 100644 --- a/src/bin/etl_positions.rs +++ b/src/bin/etl_positions.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; - let months = (2020..2024) + let months = (2019..2024) .rev() .cartesian_product(1..=12u8) .map(|(year, month)| {