diff --git a/examples/etl_legs.rs b/examples/etl_legs.rs new file mode 100644 index 0000000..2e15edc --- /dev/null +++ b/examples/etl_legs.rs @@ -0,0 +1,248 @@ +use std::{ + collections::{HashMap, HashSet}, + error::Error, + sync::Arc, +}; + +use clap::Parser; +use flights::{fs_s3, Aircraft, AircraftModels, BlobStorageProvider, Leg}; +use futures::{StreamExt, TryStreamExt}; +use serde::Serialize; +use simple_logger::SimpleLogger; + +static DATABASE_ROOT: &'static str = "leg/v1/"; +static DATABASE: &'static str = "leg/v1/data/"; + +#[derive(serde::Serialize, serde::Deserialize)] +struct LegOut { + tail_number: String, + model: String, + start: String, + end: String, + from_lat: f64, + from_lon: f64, + to_lat: f64, + to_lon: f64, + distance: f64, + duration: f64, + commercial_emissions_kg: usize, + emissions_kg: usize, +} + +#[derive(serde::Serialize)] +struct Metadata { + icao_months_to_process: usize, + icao_months_processed: usize, +} + +async fn write_json( + client: &fs_s3::ContainerClient, + d: impl Serialize, + key: &str, +) -> Result<(), Box> { + let mut bytes: Vec = Vec::new(); + serde_json::to_writer(&mut bytes, &d).map_err(std::io::Error::other)?; + + Ok(client + .put(&format!("{DATABASE_ROOT}{key}.json"), bytes) + .await?) +} + +async fn write_csv( + items: impl Iterator, + key: &str, + client: &fs_s3::ContainerClient, +) -> Result<(), Box> { + let mut wtr = csv::Writer::from_writer(vec![]); + for leg in items { + wtr.serialize(leg).unwrap() + } + let data_csv = wtr.into_inner().unwrap(); + client.put(&key, data_csv).await?; + Ok(()) +} + +async fn write( + icao_number: &Arc, + month: time::Date, + legs: Vec, + private_jets: &HashMap, Aircraft>, + models: &AircraftModels, + client: &fs_s3::ContainerClient, +) -> Result<(), Box> { + let legs = legs.into_iter().map(|leg| { + let aircraft = private_jets.get(icao_number).expect(icao_number); + LegOut { + tail_number: aircraft.tail_number.to_string(), + model: aircraft.model.to_string(), + start: leg.from().datetime().to_string(), + end: leg.to().datetime().to_string(), + from_lat: leg.from().latitude(), + from_lon: leg.from().longitude(), + to_lat: leg.to().latitude(), + to_lon: leg.to().longitude(), + distance: leg.distance(), + duration: leg.duration().as_seconds_f64() / 60.0 / 60.0, + commercial_emissions_kg: flights::emissions( + leg.from().pos(), + leg.to().pos(), + flights::Class::First, + ) as usize, + emissions_kg: flights::leg_co2e_kg( + models.get(&aircraft.model).expect(&aircraft.model).gph as f64, + leg.duration(), + ) as usize, + } + }); + + let key = format!( + "{DATABASE}icao_number={icao_number}/month={}/data.csv", + flights::month_to_part(&month) + ); + + write_csv(legs, &key, client).await?; + log::info!("Written {} {}", icao_number, month); + log::info!("Written {} {}", icao_number, month); + Ok(()) +} + +async fn read( + icao_number: &Arc, + month: time::Date, + client: &fs_s3::ContainerClient, +) -> Result, Box> { + let key = format!( + "{DATABASE}icao_number={icao_number}/month={}/data.csv", + flights::month_to_part(&month) + ); + let content = client.maybe_get(&key).await?.expect("File to be present"); + + csv::Reader::from_reader(&content[..]) + .deserialize() + .map(|x| { + let record: LegOut = x?; + Ok(record) + }) + .collect() +} + +async fn existing( + client: &flights::fs_s3::ContainerClient, +) -> Result, time::Date)>, flights::fs_s3::Error> { + Ok(client + .list(DATABASE) + .await? + .into_iter() + .map(|blob| flights::blob_name_to_pk(DATABASE, &blob)) + .collect()) +} + +const ABOUT: &'static str = r#"Builds the database of all legs"#; + +#[derive(Parser, Debug)] +#[command(author, version, about = ABOUT)] +struct Cli { + /// The token to the remote storage + #[arg(long)] + access_key: String, + /// The token to the remote storage + #[arg(long)] + secret_access_key: String, +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<(), Box> { + SimpleLogger::new() + .with_level(log::LevelFilter::Info) + .init() + .unwrap(); + + let cli = Cli::parse(); + + let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; + + let aircrafts = flights::load_aircrafts(Some(&client)).await?; + let models = flights::load_private_jet_models()?; + + let private_jets = aircrafts + .into_iter() + // its primary use is to be a private jet + .filter_map(|(_, a)| models.contains_key(&a.model).then_some(a)) + .map(|a| (a.icao_number.clone(), a)) + .collect::>(); + let required = private_jets.len() * 1 * 12; + + let ready = flights::existing_months_positions(&client).await?; + let ready = ready + .into_iter() + .filter(|(icao, _)| private_jets.contains_key(icao)) + .collect::>(); + log::info!("ready : {}", ready.len()); + + let completed = existing(&client) + .await? + .into_iter() + .filter(|(icao, _)| private_jets.contains_key(icao)) + .collect::>(); + log::info!("completed: {}", completed.len()); + + let todo = ready.difference(&completed).collect::>(); + log::info!("todo : {}", todo.len()); + + let client = Some(&client); + let private_jets = &private_jets; + let models = ⊧ + + let tasks = todo.into_iter().map(|(icao_number, month)| async move { + let positions = flights::month_positions(*month, &icao_number, client).await?; + let legs = flights::legs(positions.into_iter()); + write( + &icao_number, + *month, + legs, + &private_jets, + &models, + client.as_ref().unwrap(), + ) + .await + }); + + let processed = futures::stream::iter(tasks) + .buffered(20) + .try_collect::>() + .await? + .len(); + + write_json( + client.unwrap(), + Metadata { + icao_months_to_process: required, + icao_months_processed: processed + completed.len(), + }, + "status", + ) + .await?; + + let client = client.unwrap(); + let completed = existing(&client) + .await? + .into_iter() + .filter(|(icao, _)| private_jets.contains_key(icao)) + .collect::>(); + + let tasks = completed + .into_iter() + .map(|(icao, date)| async move { read(&icao, date, client).await }); + + let legs = futures::stream::iter(tasks) + .buffered(20) + .try_collect::>() + .await? + .into_iter() + .flatten(); + + let key = format!("{DATABASE_ROOT}all.csv"); + write_csv(legs, &key, client).await?; + + Ok(()) +} diff --git a/src/fs.rs b/src/fs.rs index 0f6dcf2..41a0f93 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -6,6 +6,7 @@ pub trait BlobStorageProvider { type Error: std::error::Error + Send; async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error>; async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), Self::Error>; + async fn list(&self, prefix: &str) -> Result, Self::Error>; fn can_put(&self) -> bool; } @@ -35,6 +36,11 @@ impl BlobStorageProvider for LocalDisk { Ok(()) } + #[must_use] + async fn list(&self, _prefix: &str) -> Result, Self::Error> { + todo!() + } + fn can_put(&self) -> bool { true } diff --git a/src/fs_s3.rs b/src/fs_s3.rs index eed299d..7b2c836 100644 --- a/src/fs_s3.rs +++ b/src/fs_s3.rs @@ -76,6 +76,10 @@ async fn get(client: &ContainerClient, blob_name: &str) -> Result, Error async fn put(client: &ContainerClient, blob_name: &str, content: Vec) -> Result<(), Error> { let stream = ByteStream::from(content); + let content_type = blob_name + .ends_with(".json") + .then_some("application/json") + .unwrap_or("application/csv"); client .client @@ -84,6 +88,7 @@ async fn put(client: &ContainerClient, blob_name: &str, content: Vec) -> Res .key(blob_name) .acl(ObjectCannedAcl::PublicRead) .body(stream) + .content_type(content_type) .send() .await .map_err(|e| Error::from(format!("{e:?}"))) @@ -176,6 +181,30 @@ impl BlobStorageProvider for ContainerClient { put(&self, blob_name, contents).await } + #[must_use] + async fn list(&self, prefix: &str) -> Result, Self::Error> { + Ok(self + .client + .list_objects_v2() + .bucket(&self.bucket) + .prefix(prefix) + .into_paginator() + .send() + .try_collect() + .await + .map_err(|e| Error::from(e.to_string()))? + .into_iter() + .map(|response| { + response + .contents() + .iter() + .filter_map(|blob| blob.key().map(|x| x.to_string())) + .collect::>() + }) + .flatten() + .collect()) + } + fn can_put(&self) -> bool { self.can_put } diff --git a/src/trace_month.rs b/src/trace_month.rs index 90577f8..7f64efb 100644 --- a/src/trace_month.rs +++ b/src/trace_month.rs @@ -4,12 +4,12 @@ use futures::{StreamExt, TryStreamExt}; use time::Date; use super::Position; -use crate::{cached_aircraft_positions, fs, fs_s3}; +use crate::{cached_aircraft_positions, fs, fs_s3, BlobStorageProvider}; -static DATABASE: &'static str = "position"; +static DATABASE: &'static str = "position/"; -fn blob_name_to_pk(db: &str, blob: &str) -> (Arc, time::Date) { - let bla = &blob[db.len() + "/icao_number=".len()..]; +pub fn blob_name_to_pk(db: &str, blob: &str) -> (Arc, time::Date) { + let bla = &blob[db.len() + "icao_number=".len()..]; let end = bla.find("/").unwrap(); let icao = &bla[..end]; let date_start = end + "/month=".len(); @@ -29,11 +29,14 @@ fn blob_name_to_pk(db: &str, blob: &str) -> (Arc, time::Date) { ) } -fn pk_to_blob_name(icao: &str, date: &time::Date) -> String { +pub fn month_to_part(date: &time::Date) -> String { + format!("{}-{:02}", date.year(), date.month() as u8) +} + +pub fn pk_to_blob_name(db: &str, icao: &str, date: &time::Date) -> String { format!( - "{DATABASE}/icao_number={icao}/month={}-{:02}/data.json", - date.year(), - date.month() as u8 + "{db}icao_number={icao}/month={}/data.json", + month_to_part(date) ) } @@ -61,7 +64,7 @@ pub async fn month_positions( ) -> Result, std::io::Error> { log::info!("month_positions({month},{icao_number})"); assert_eq!(month.day(), 1); - let blob_name = pk_to_blob_name(&icao_number, &month); + let blob_name = pk_to_blob_name(DATABASE, &icao_number, &month); let (from, to) = get_month(&month); let action = fs::CacheAction::from_date(&to); @@ -128,25 +131,10 @@ pub async fn existing_months_positions( client: &fs_s3::ContainerClient, ) -> Result, time::Date)>, fs_s3::Error> { Ok(client - .client - .list_objects_v2() - .bucket(&client.bucket) - .prefix(format!("{DATABASE}/")) - .into_paginator() - .send() - .try_collect() - .await - .map_err(|e| fs_s3::Error::from(e.to_string()))? + .list(DATABASE) + .await? .into_iter() - .map(|response| { - response - .contents() - .iter() - .filter_map(|blob| blob.key()) - .map(|blob| blob_name_to_pk(DATABASE, &blob)) - .collect::>() - }) - .flatten() + .map(|blob| blob_name_to_pk(DATABASE, &blob)) .collect()) } @@ -161,7 +149,7 @@ mod test { let icao: Arc = "aa".into(); let month = date!(2022 - 02 - 01); assert_eq!( - blob_name_to_pk(DATABASE, &pk_to_blob_name(icao.as_ref(), &month)), + blob_name_to_pk(DATABASE, &pk_to_blob_name(DATABASE, icao.as_ref(), &month)), (icao, month) ) }