diff --git a/README.md b/README.md index 9af0d1e..f97a6a2 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,16 @@ cargo run --example country -- --from=2024-01-13 --to=2024-01-21 --country=portu # Story about German private jets that flew in 2023, where secret is on a file cargo run --example country -- --from=2023-01-01 --to=2024-01-01 --country=germany --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt) + +# Build database of positions `[2020, 2023]` +cargo run --release --example etl_positions -- --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt) +# they are available at +# https://private-jets.fra1.digitaloceanspaces.com/position/icao_number={icao}/month={year}-{month}/data.json + +# Build database of legs `[2020, 2023]` (over existing positions computed by `etl_positions`) +cargo run --release --example etl_legs -- --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt) +# they are available at +# https://private-jets.fra1.digitaloceanspaces.com/leg/v1/data/icao_number={icao}/month={year}-{month}/data.csv ``` ## Methodology diff --git a/examples/etl_legs.rs b/examples/etl_legs.rs index e0a47af..78572f6 100644 --- a/examples/etl_legs.rs +++ b/examples/etl_legs.rs @@ -8,7 +8,7 @@ use clap::Parser; use flights::{Aircraft, AircraftModels, BlobStorageProvider, Leg}; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; -use serde::Serialize; +use serde::{de::DeserializeOwned, Serialize}; use simple_logger::SimpleLogger; static DATABASE_ROOT: &'static str = "leg/v1/"; @@ -16,6 +16,7 @@ static DATABASE: &'static str = "leg/v1/data/"; #[derive(serde::Serialize, serde::Deserialize)] struct LegOut { + icao_number: String, tail_number: String, model: String, #[serde(with = "time::serde::rfc3339")] @@ -46,9 +47,7 @@ async fn write_json( let mut bytes: Vec = Vec::new(); serde_json::to_writer(&mut bytes, &d).map_err(std::io::Error::other)?; - Ok(client - .put(&format!("{DATABASE_ROOT}{key}.json"), 
bytes) - .await?) + Ok(client.put(key, bytes).await?) } async fn write_csv( @@ -65,17 +64,16 @@ async fn write_csv( Ok(()) } -async fn write( - icao_number: &Arc, - month: time::Date, +fn transform<'a>( + icao_number: &'a Arc, legs: Vec, - private_jets: &HashMap, Aircraft>, - models: &AircraftModels, - client: &impl BlobStorageProvider, -) -> Result<(), Box> { - let legs = legs.into_iter().map(|leg| { + private_jets: &'a HashMap, Aircraft>, + models: &'a AircraftModels, +) -> impl Iterator + 'a { + legs.into_iter().map(|leg| { let aircraft = private_jets.get(icao_number).expect(icao_number); LegOut { + icao_number: icao_number.to_string(), tail_number: aircraft.tail_number.to_string(), model: aircraft.model.to_string(), start: leg.from().datetime(), @@ -96,8 +94,15 @@ async fn write( leg.duration(), ) as usize, } - }); + }) +} +async fn write( + icao_number: &Arc, + month: time::Date, + legs: impl Iterator, + client: &impl BlobStorageProvider, +) -> Result<(), Box> { let key = format!( "{DATABASE}icao_number={icao_number}/month={}/data.csv", flights::month_to_part(&month) @@ -108,11 +113,11 @@ async fn write( Ok(()) } -async fn read( +async fn read( icao_number: &Arc, month: time::Date, client: &impl BlobStorageProvider, -) -> Result, Box> { +) -> Result, Box> { let key = format!( "{DATABASE}icao_number={icao_number}/month={}/data.csv", flights::month_to_part(&month) @@ -120,17 +125,13 @@ async fn read( let content = client.maybe_get(&key).await?.expect("File to be present"); csv::Reader::from_reader(&content[..]) - .deserialize() - .map(|x| { - let record: LegOut = x?; - Ok(record) - }) + .deserialize::() + .map(|x| Ok(x?)) .collect() } async fn private_jets( client: Option<&flights::fs_s3::ContainerClient>, - country: Option<&str>, ) -> Result, Box> { // load datasets to memory let aircrafts = flights::load_aircrafts(client).await?; @@ -139,11 +140,6 @@ async fn private_jets( Ok(aircrafts .into_iter() // its primary use is to be a private jet - .filter(|(_, a)| { 
- country - .map(|country| a.country.as_deref() == Some(country)) - .unwrap_or(true) - }) .filter_map(|(_, a)| models.contains_key(&a.model).then_some(a)) .collect()) } @@ -164,6 +160,73 @@ struct Cli { country: Option, } +async fn etl_task( + icao_number: &Arc, + month: time::Date, + private_jets: &HashMap, Aircraft>, + models: &AircraftModels, + client: Option<&flights::fs_s3::ContainerClient>, +) -> Result<(), Box> { + // extract + let positions = flights::month_positions(month, &icao_number, client).await?; + // transform + let legs = transform( + &icao_number, + flights::legs(positions.into_iter()), + &private_jets, + &models, + ); + // load + write(&icao_number, month, legs, client.unwrap()).await +} + +async fn aggregate( + private_jets: Vec, + required: usize, + client: &impl BlobStorageProvider, +) -> Result<(), Box> { + let private_jets = private_jets + .into_iter() + .map(|a| a.icao_number) + .collect::>(); + + let completed = flights::existing(DATABASE, client) + .await? + .into_iter() + .filter(|(icao, _)| private_jets.contains(icao)) + .collect::>(); + + let tasks = completed + .iter() + .map(|(icao, date)| async move { read::(icao, *date, client).await }); + + log::info!("Gettings all legs"); + let legs = futures::stream::iter(tasks) + .buffered(20) + .try_collect::>() + .await? 
+ .into_iter() + .flatten(); + + log::info!("Writing all legs"); + let key = format!("{DATABASE_ROOT}all.csv"); + write_csv(legs, &key, client).await?; + log::info!("Written {key}"); + + let key = format!("{DATABASE_ROOT}status.json"); + write_json( + client, + Metadata { + icao_months_to_process: required, + icao_months_processed: completed.len(), + }, + &key, + ) + .await?; + log::info!("status written"); + Ok(()) +} + #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { SimpleLogger::new() @@ -178,9 +241,17 @@ async fn main() -> Result<(), Box> { let models = flights::load_private_jet_models()?; let months = (2020..2024).cartesian_product(1..=12u8).count(); - let relevant_jets = private_jets(Some(&client), cli.country.as_deref()) - .await? + let private_jets = private_jets(Some(&client)).await?; + let relevant_jets = private_jets + .clone() .into_iter() + // in the country + .filter(|a| { + cli.country + .as_deref() + .map(|country| a.country.as_deref() == Some(country)) + .unwrap_or(true) + }) .map(|a| (a.icao_number.clone(), a)) .collect::>(); let required = relevant_jets.len() * months; @@ -209,17 +280,7 @@ async fn main() -> Result<(), Box> { let models = ⊧ let tasks = todo.into_iter().map(|(icao_number, month)| async move { - let positions = flights::month_positions(*month, &icao_number, client).await?; - let legs = flights::legs(positions.into_iter()); - write( - &icao_number, - *month, - legs, - &relevant_jets, - &models, - client.unwrap(), - ) - .await + etl_task(icao_number, *month, &relevant_jets, &models, client).await }); let _ = futures::stream::iter(tasks) @@ -227,45 +288,5 @@ async fn main() -> Result<(), Box> { .try_collect::>() .await?; - let private_jets = private_jets(client, None) - .await? - .into_iter() - .map(|a| (a.icao_number.clone(), a)) - .collect::>(); - - let client = client.unwrap(); - let completed = flights::existing(DATABASE, client) - .await? 
- .into_iter() - .filter(|(icao, _)| private_jets.contains_key(icao)) - .collect::>(); - - let tasks = completed - .iter() - .map(|(icao, date)| async move { read(icao, *date, client).await }); - - log::info!("Gettings all legs"); - let legs = futures::stream::iter(tasks) - .buffered(20) - .try_collect::>() - .await? - .into_iter() - .flatten(); - - let key = format!("{DATABASE_ROOT}all.csv"); - write_csv(legs, &key, client).await?; - log::info!("Written {key}"); - - write_json( - client, - Metadata { - icao_months_to_process: required, - icao_months_processed: completed.len(), - }, - "status", - ) - .await?; - log::info!("status written"); - - Ok(()) + aggregate(private_jets, required, client.unwrap()).await } diff --git a/examples/export_legs.rs b/examples/etl_positions.rs similarity index 100% rename from examples/export_legs.rs rename to examples/etl_positions.rs diff --git a/methodology.md b/methodology.md index d61cf75..7d81738 100644 --- a/methodology.md +++ b/methodology.md @@ -28,19 +28,19 @@ in extracting the database of all aircrafts in https://globe.adsbexchange.com. Details are available in the source code, [src/aircraft_db.rs](./src/aircraft_db.rs). -### M-2: Identify aircraft types whose primary use is to be a private flying +### M-2: Identify aircraft types whose primary use is private use This was performed by a human, and consisted in going through different aircraft manufacturers' websites and identifying the aircrafts that were advertised as used -for private flying. +for private use. For example, `Dassault Falcon 2000` is advertised as a -private jet on https://www.dassaultfalcon.com/aircraft/overview-of-the-fleet/. +private-use aircraft on https://www.dassaultfalcon.com/aircraft/overview-of-the-fleet/. This is stored in [`./src/models.csv`](./src/models.csv). **NOTE**: not all uses of a model whose primary use is to be a private jet is -private jet. For example, models are sometimes used for emergency services. +for private use.
For example, models are sometimes used for emergency services. ### M-3: Identify ICAO number's route in a day @@ -55,8 +55,8 @@ Source code is available at [src/icao_to_trace.rs](./src/icao_to_trace.rs). ### M-4: Identify legs of a route -This is performed automatically by the computer program. A leg is defined in this methodology -has a continuous sequence of ADS-B positions in time where the aircraft is flying. +This is performed automatically by the solution. A leg is defined in this methodology +as a continuous sequence of ADS-B positions in time where the aircraft is flying. The aircraft at a given segment between two ADS-B positions is considered grounded (not flying) when any of: 1. both positions are on the ground @@ -132,7 +132,7 @@ This was performed by a human, and consisted in extracting the ownership of the tail number from website https://www.danishaircraft.dk. For example `OY-CKK` results in 3 records, whose most recent, `OY-CKK(3)`, is registered -to owned by `Kirkbi Invest A/S`. +to be owned by `Kirkbi Invest A/S`. This is stored in [`./src/owners.csv`](./src/owners.csv).