Skip to content

Commit

Permalink
Improved ETL
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao committed Feb 18, 2024
1 parent 3dda49f commit 17fb1dd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
38 changes: 30 additions & 8 deletions examples/etl_legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
use clap::Parser;
use flights::{fs_s3, Aircraft, AircraftModels, BlobStorageProvider, Leg};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::Serialize;
use simple_logger::SimpleLogger;

Expand Down Expand Up @@ -137,6 +138,26 @@ async fn existing(
.collect())
}

async fn private_jets(
client: Option<&flights::fs_s3::ContainerClient>,
country: Option<&str>,
) -> Result<Vec<Aircraft>, Box<dyn std::error::Error>> {
// load datasets to memory
let aircrafts = flights::load_aircrafts(client).await?;
let models = flights::load_private_jet_models()?;

Ok(aircrafts
.into_iter()
// its primary use is to be a private jet
.filter(|(_, a)| {
country
.map(|country| a.country.as_deref() == Some(country))
.unwrap_or(true)
})
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.collect())
}

const ABOUT: &'static str = r#"Builds the database of all legs"#;

#[derive(Parser, Debug)]
Expand All @@ -161,19 +182,19 @@ async fn main() -> Result<(), Box<dyn Error>> {

let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

let aircrafts = flights::load_aircrafts(Some(&client)).await?;
let models = flights::load_private_jet_models()?;

let private_jets = aircrafts
let months = (2023..2024).cartesian_product(1..=12u8).count();
let private_jets = private_jets(Some(&client), None)
.await?
.into_iter()
// its primary use is to be a private jet
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.map(|a| (a.icao_number.clone(), a))
.collect::<HashMap<_, _>>();
let required = private_jets.len() * 1 * 12;
let required = private_jets.len() * months;
log::info!("required : {}", required);

let ready = flights::existing_months_positions(&client).await?;
let ready = ready
let ready = flights::existing_months_positions(&client)
.await?
.into_iter()
.filter(|(icao, _)| private_jets.contains_key(icao))
.collect::<HashSet<_>>();
Expand All @@ -186,7 +207,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
.collect::<HashSet<_>>();
log::info!("completed: {}", completed.len());

let todo = ready.difference(&completed).collect::<Vec<_>>();
let mut todo = ready.difference(&completed).collect::<Vec<_>>();
todo.sort_unstable_by_key(|(icao, date)| (date, icao));
log::info!("todo : {}", todo.len());

let client = Some(&client);
Expand Down
15 changes: 13 additions & 2 deletions examples/export_legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ struct Cli {
/// The token to the remote storage
#[arg(long)]
secret_access_key: String,
/// Optional country to fetch from (in ISO 3166); defaults to whole world
#[arg(long)]
country: Option<String>,
}

async fn private_jets(
client: Option<&flights::fs_s3::ContainerClient>,
country: Option<&str>,
) -> Result<Vec<Aircraft>, Box<dyn std::error::Error>> {
// load datasets to memory
let aircrafts = flights::load_aircrafts(client).await?;
Expand All @@ -30,6 +34,11 @@ async fn private_jets(
Ok(aircrafts
.into_iter()
// its primary use is to be a private jet
.filter(|(_, a)| {
country
.map(|country| a.country.as_deref() == Some(country))
.unwrap_or(true)
})
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.collect())
}
Expand All @@ -46,12 +55,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

let months = (2023..2024)
.rev()
.cartesian_product(1..=12u8)
.map(|(year, month)| {
time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1)
.expect("day 1 never errors")
});
let private_jets = private_jets(Some(&client)).await?;
let private_jets = private_jets(Some(&client), cli.country.as_deref()).await?;
log::info!("jets : {}", private_jets.len());
let required = private_jets
.into_iter()
Expand All @@ -62,7 +72,8 @@ async fn main() -> Result<(), Box<dyn Error>> {

let completed = existing_months_positions(&client).await?;
log::info!("completed: {}", completed.len());
let todo = required.difference(&completed).collect::<HashSet<_>>();
let mut todo = required.difference(&completed).collect::<Vec<_>>();
todo.sort_unstable_by_key(|(icao, date)| (date, icao));
log::info!("todo : {}", todo.len());

let tasks = todo
Expand Down

0 comments on commit 17fb1dd

Please sign in to comment.