Skip to content

Commit

Permalink
Added airports to legs
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao committed Mar 4, 2024
1 parent 409256f commit dbe8f53
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/airports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn airports_cached() -> Result<Vec<Airport>, Box<dyn std::error::Error
let record: Airport = r.unwrap();
record
})
.filter(|airport| airport.type_ != "heliport")
.filter(|airport| airport.type_ == "medium_airport" || airport.type_ == "large_airport")
.collect::<Vec<_>>();

Ok(data)
Expand Down
49 changes: 30 additions & 19 deletions src/bin/etl_legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use clap::Parser;
use flights::{Aircraft, AircraftModels, BlobStorageProvider, Leg};
use flights::{Aircraft, AircraftModels, Airport, BlobStorageProvider, Leg};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -24,8 +24,10 @@ struct LegOut {
end: time::OffsetDateTime,
from_lat: f64,
from_lon: f64,
from_airport: String,
to_lat: f64,
to_lon: f64,
to_airport: String,
distance: f64,
duration: f64,
commercial_emissions_kg: usize,
Expand Down Expand Up @@ -69,6 +71,7 @@ fn transform<'a>(
legs: Vec<Leg>,
private_jets: &'a HashMap<Arc<str>, Aircraft>,
models: &'a AircraftModels,
airports: &'a [Airport],
) -> impl Iterator<Item = LegOut> + 'a {
legs.into_iter().map(|leg| {
let aircraft = private_jets.get(icao_number).expect(icao_number);
Expand All @@ -79,8 +82,10 @@ fn transform<'a>(
end: leg.to().datetime(),
from_lat: leg.from().latitude(),
from_lon: leg.from().longitude(),
from_airport: flights::closest(leg.from().pos(), airports).name,
to_lat: leg.to().latitude(),
to_lon: leg.to().longitude(),
to_airport: flights::closest(leg.to().pos(), airports).name,
distance: leg.distance(),
duration: leg.duration().as_seconds_f64() / 60.0 / 60.0,
commercial_emissions_kg: flights::emissions(
Expand Down Expand Up @@ -164,6 +169,7 @@ async fn etl_task(
month: time::Date,
private_jets: &HashMap<Arc<str>, Aircraft>,
models: &AircraftModels,
airports: &[Airport],
client: Option<&flights::fs_s3::ContainerClient>,
) -> Result<(), Box<dyn Error>> {
// extract
Expand All @@ -174,17 +180,18 @@ async fn etl_task(
flights::legs(positions.into_iter()),
&private_jets,
&models,
&airports,
);
// load
write(&icao_number, month, legs, client.unwrap()).await
}

async fn aggregate(
private_jets: Vec<Aircraft>,
models: &AircraftModels,
airports: &[Airport],
client: &flights::fs_s3::ContainerClient,
) -> Result<(), Box<dyn Error>> {
let models = flights::load_private_jet_models()?;

let private_jets = private_jets
.into_iter()
.map(|a| (a.icao_number.clone(), a))
Expand Down Expand Up @@ -214,13 +221,7 @@ async fn aggregate(
let mut metadata = HashMap::<i32, Metadata>::new();
for (year, completed) in by_year {
let tasks = completed.iter().map(|(icao_number, date)| async move {
let r = read::<LegOut>(icao_number, *date, client).await;
if let Err(_) = r {
etl_task(icao_number, *date, private_jets, models, Some(client)).await?;
return read::<LegOut>(icao_number, *date, client).await;
} else {
r
}
read::<LegOut>(icao_number, *date, client).await
});

log::info!("Gettings all legs for year={year}");
Expand All @@ -240,7 +241,7 @@ async fn aggregate(
Metadata {
icao_months_to_process: private_jets.len() * 12,
icao_months_processed: completed.len(),
url: format!("https://fra1.digitaloceanspaces.com/{key}"),
url: format!("https://private-jets.fra1.digitaloceanspaces.com/{key}"),
},
);
}
Expand All @@ -263,8 +264,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

let models = flights::load_private_jet_models()?;
let airports = flights::airports_cached().await?;

let months = (2020..2024).cartesian_product(1..=12u8).count();
let months = (2019..2024).cartesian_product(1..=12u8).count();
let private_jets = private_jets(Some(&client)).await?;
let relevant_jets = private_jets
.clone()
Expand All @@ -288,11 +290,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
.collect::<HashSet<_>>();
log::info!("ready : {}", ready.len());

let completed = flights::existing(DATABASE, &client)
.await?
.into_iter()
.filter(|(icao, _)| relevant_jets.contains_key(icao))
.collect::<HashSet<_>>();
let completed = HashSet::new(); /*flights::existing(DATABASE, &client)
.await?
.into_iter()
.filter(|(icao, _)| relevant_jets.contains_key(icao))
.collect::<HashSet<_>>();*/
log::info!("completed: {}", completed.len());

let mut todo = ready.difference(&completed).collect::<Vec<_>>();
Expand All @@ -302,15 +304,24 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client = Some(&client);
let relevant_jets = &relevant_jets;
let models = &models;
let airports = &airports;

let tasks = todo.into_iter().map(|(icao_number, month)| async move {
etl_task(icao_number, *month, &relevant_jets, &models, client).await
etl_task(
icao_number,
*month,
&relevant_jets,
&models,
&airports,
client,
)
.await
});

let _ = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?;

aggregate(private_jets, client.unwrap()).await
aggregate(private_jets, &models, &airports, client.unwrap()).await
}
2 changes: 1 addition & 1 deletion src/bin/etl_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

let months = (2020..2024)
let months = (2019..2024)
.rev()
.cartesian_product(1..=12u8)
.map(|(year, month)| {
Expand Down

0 comments on commit dbe8f53

Please sign in to comment.