Skip to content

Commit

Permalink
Split legs file by year, added airports, focus on main airports (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao authored Mar 4, 2024
1 parent 0da3d64 commit 24ba17b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 92 deletions.
57 changes: 0 additions & 57 deletions examples/clean_cache.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/airports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn airports_cached() -> Result<Vec<Airport>, Box<dyn std::error::Error
let record: Airport = r.unwrap();
record
})
.filter(|airport| airport.type_ != "heliport")
.filter(|airport| airport.type_ == "medium_airport" || airport.type_ == "large_airport")
.collect::<Vec<_>>();

Ok(data)
Expand Down
103 changes: 70 additions & 33 deletions src/bin/etl_legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use clap::Parser;
use flights::{Aircraft, AircraftModels, BlobStorageProvider, Leg};
use flights::{Aircraft, AircraftModels, Airport, BlobStorageProvider, Leg};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -24,8 +24,10 @@ struct LegOut {
end: time::OffsetDateTime,
from_lat: f64,
from_lon: f64,
from_airport: String,
to_lat: f64,
to_lon: f64,
to_airport: String,
distance: f64,
duration: f64,
commercial_emissions_kg: usize,
Expand All @@ -36,6 +38,7 @@ struct LegOut {
struct Metadata {
icao_months_to_process: usize,
icao_months_processed: usize,
url: String,
}

async fn write_json(
Expand Down Expand Up @@ -68,6 +71,7 @@ fn transform<'a>(
legs: Vec<Leg>,
private_jets: &'a HashMap<Arc<str>, Aircraft>,
models: &'a AircraftModels,
airports: &'a [Airport],
) -> impl Iterator<Item = LegOut> + 'a {
legs.into_iter().map(|leg| {
let aircraft = private_jets.get(icao_number).expect(icao_number);
Expand All @@ -78,8 +82,10 @@ fn transform<'a>(
end: leg.to().datetime(),
from_lat: leg.from().latitude(),
from_lon: leg.from().longitude(),
from_airport: flights::closest(leg.from().pos(), airports).name,
to_lat: leg.to().latitude(),
to_lon: leg.to().longitude(),
to_airport: flights::closest(leg.to().pos(), airports).name,
distance: leg.distance(),
duration: leg.duration().as_seconds_f64() / 60.0 / 60.0,
commercial_emissions_kg: flights::emissions(
Expand Down Expand Up @@ -163,6 +169,7 @@ async fn etl_task(
month: time::Date,
private_jets: &HashMap<Arc<str>, Aircraft>,
models: &AircraftModels,
airports: &[Airport],
client: Option<&flights::fs_s3::ContainerClient>,
) -> Result<(), Box<dyn Error>> {
// extract
Expand All @@ -173,54 +180,74 @@ async fn etl_task(
flights::legs(positions.into_iter()),
&private_jets,
&models,
&airports,
);
// load
write(&icao_number, month, legs, client.unwrap()).await
}

async fn aggregate(
private_jets: Vec<Aircraft>,
required: usize,
client: &impl BlobStorageProvider,
models: &AircraftModels,
airports: &[Airport],
client: &flights::fs_s3::ContainerClient,
) -> Result<(), Box<dyn Error>> {
let private_jets = private_jets
.into_iter()
.map(|a| a.icao_number)
.collect::<HashSet<_>>();
.map(|a| (a.icao_number.clone(), a))
.collect::<HashMap<_, _>>();

let completed = flights::existing(DATABASE, client)
.await?
.into_iter()
.filter(|(icao, _)| private_jets.contains(icao))
.filter(|(icao, _)| private_jets.contains_key(icao))
.collect::<HashSet<_>>();

let tasks = completed
.iter()
.map(|(icao, date)| async move { read::<LegOut>(icao, *date, client).await });

log::info!("Gettings all legs");
let legs = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?
// group completed by year
let by_year = completed
.into_iter()
.flatten();

log::info!("Writing all legs");
let key = format!("{DATABASE_ROOT}all.csv");
write_csv(legs, &key, client).await?;
log::info!("Written {key}");
.fold(HashMap::<i32, HashSet<_>>::new(), |mut acc, v| {
acc.entry(v.1.year())
.and_modify(|entries| {
entries.insert(v.clone());
})
.or_insert(HashSet::from([v]));
acc
});

// run tasks by year
let private_jets = &private_jets;
let models = &models;
let mut metadata = HashMap::<i32, Metadata>::new();
for (year, completed) in by_year {
let tasks = completed.iter().map(|(icao_number, date)| async move {
read::<LegOut>(icao_number, *date, client).await
});

log::info!("Gettings all legs for year={year}");
let legs = futures::stream::iter(tasks)
.buffered(100)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten();

log::info!("Writing all legs for year={year}");
let key = format!("{DATABASE_ROOT}all/year={year}/data.csv");
write_csv(legs, &key, client).await?;
log::info!("Written {key}");
metadata.insert(
year,
Metadata {
icao_months_to_process: private_jets.len() * 12,
icao_months_processed: completed.len(),
url: format!("https://private-jets.fra1.digitaloceanspaces.com/{key}"),
},
);
}

let key = format!("{DATABASE_ROOT}status.json");
write_json(
client,
Metadata {
icao_months_to_process: required,
icao_months_processed: completed.len(),
},
&key,
)
.await?;
write_json(client, metadata, &key).await?;
log::info!("status written");
Ok(())
}
Expand All @@ -237,8 +264,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

let models = flights::load_private_jet_models()?;
let airports = flights::airports_cached().await?;

let months = (2020..2024).cartesian_product(1..=12u8).count();
let months = (2019..2024).cartesian_product(1..=12u8).count();
let private_jets = private_jets(Some(&client)).await?;
let relevant_jets = private_jets
.clone()
Expand Down Expand Up @@ -276,15 +304,24 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client = Some(&client);
let relevant_jets = &relevant_jets;
let models = &models;
let airports = &airports;

let tasks = todo.into_iter().map(|(icao_number, month)| async move {
etl_task(icao_number, *month, &relevant_jets, &models, client).await
etl_task(
icao_number,
*month,
&relevant_jets,
&models,
&airports,
client,
)
.await
});

let _ = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?;

aggregate(private_jets, required, client.unwrap()).await
aggregate(private_jets, &models, &airports, client.unwrap()).await
}
2 changes: 1 addition & 1 deletion src/bin/etl_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

let months = (2020..2024)
let months = (2019..2024)
.rev()
.cartesian_product(1..=12u8)
.map(|(year, month)| {
Expand Down

0 comments on commit 24ba17b

Please sign in to comment.