From 4a6d4ef2148a8096182098b1bde8819e93bb3c76 Mon Sep 17 00:00:00 2001 From: Jorge Date: Thu, 8 Feb 2024 16:51:23 +0000 Subject: [PATCH] Finished migration --- examples/cache_state.rs | 28 +++++++++++----------- examples/export_legs.rs | 51 ++++++++++++++++++++++++++++++----------- src/trace_month.rs | 11 ++++----- 3 files changed, 56 insertions(+), 34 deletions(-) diff --git a/examples/cache_state.rs b/examples/cache_state.rs index 72ae31d..6b88d6d 100644 --- a/examples/cache_state.rs +++ b/examples/cache_state.rs @@ -21,33 +21,31 @@ async fn private_jets( #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { + let client = flights::fs_s3::anonymous_client().await; + let months = (2023..2024) .cartesian_product(1..=12u8) .map(|(year, month)| { time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1) .expect("day 1 never errors") - }) - .collect::>(); - - let client = flights::fs_s3::anonymous_client().await; - - let existing = flights::existing_months_positions(&client).await?; - + }); let private_jets = private_jets(Some(&client)).await?; - println!("jets : {}", private_jets.len()); + println!("jets : {}", private_jets.len()); let required = private_jets .into_iter() - .map(|a| a.icao_number.clone()) - .cartesian_product(months.into_iter()) + .map(|a| a.icao_number) + .cartesian_product(months) .collect::>(); + println!("required : {}", required.len()); - let computed = required.intersection(&existing).count(); - println!("required: {}", required.len()); - println!("finished: {}", computed); + let completed = flights::existing_months_positions(&client).await?; + println!("completed: {}", completed.len()); println!( - "progress: {:.2}%", - (computed as f64) / (required.len() as f64) * 100.0 + "progress : {:.2}%", + (completed.len() as f64) / (required.len() as f64) * 100.0 ); + let todo = required.intersection(&completed).collect::>(); + println!("todo : {}", todo.len()); Ok(()) } diff --git a/examples/export_legs.rs b/examples/export_legs.rs index a8dffdd..9bcd159 100644 --- a/examples/export_legs.rs +++ b/examples/export_legs.rs @@ -1,13 +1,11 @@ -use std::error::Error; +use std::{collections::HashSet, error::Error}; use clap::Parser; use futures::StreamExt; use itertools::Itertools; use simple_logger::SimpleLogger; -use flights::{ - existing_months_positions, load_aircrafts, load_private_jet_models, month_positions, -}; +use flights::{existing_months_positions, Aircraft}; const ABOUT: &'static str = r#"Builds the database of all private jet positions from 2023"#; @@ -22,6 +20,20 @@ struct Cli { secret_access_key: String, } +async fn private_jets( + client: Option<&flights::fs_s3::ContainerClient>, +) -> Result, Box> { + // load datasets to memory + let aircrafts = flights::load_aircrafts(client).await?; + let models = flights::load_private_jet_models()?; + + Ok(aircrafts + .into_iter() + // its primary use is to be a private jet + .filter_map(|(_, a)| models.contains_key(&a.model).then_some(a)) + .collect()) +} + #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { SimpleLogger::new() @@ -30,23 +42,36 @@ async fn main() -> Result<(), Box> { .unwrap(); let cli = Cli::parse(); + let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; - // load datasets to memory - //let aircrafts = load_aircrafts(Some(&client)).await?; - //let models = load_private_jet_models()?; + let months = (2023..2024) + .cartesian_product(1..=12u8) + .map(|(year, month)| { + time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1) + .expect("day 1 never errors") + }); + let private_jets = private_jets(Some(&client)).await?; + log::info!("jets : {}", private_jets.len()); + let required = private_jets + .into_iter() + .map(|a| a.icao_number) + .cartesian_product(months) + .collect::>(); + log::info!("required : {}", required.len()); let completed = existing_months_positions(&client).await?; - log::info!("already computed: {}", completed.len()); + log::info!("completed: {}", completed.len()); + let todo = required.intersection(&completed).collect::>(); + log::info!("todo : {}", todo.len()); - let a = Some(&client); - let tasks = completed + let tasks = todo .into_iter() - .map(|(icao, date)| async move { month_positions(date.clone(), &icao, a).await }); + .map(|(icao_number, month)| flights::month_positions(*month, icao_number, Some(&client))); futures::stream::iter(tasks) // limit concurrent tasks - .buffered(100) + .buffered(10) // continue if error .map(|r| { if let Err(e) = r { @@ -55,5 +80,5 @@ async fn main() -> Result<(), Box> { }) .collect::>() .await; - return Ok(()); + Ok(()) } diff --git a/src/trace_month.rs b/src/trace_month.rs index 4169154..2eb2a1d 100644 --- a/src/trace_month.rs +++ b/src/trace_month.rs @@ -7,10 +7,9 @@ use super::Position; use crate::{cached_aircraft_positions, fs, fs_s3}; static DATABASE: &'static str = "position"; -static OLD_DATABASE: &'static str = "trace"; -fn blob_name_to_pk(blob: &str) -> (Arc, time::Date) { - let bla = &blob[OLD_DATABASE.len() + "/icao_number=".len()..]; +fn blob_name_to_pk(db: &str, blob: &str) -> (Arc, time::Date) { + let bla = &blob[db.len() + "/icao_number=".len()..]; let end = bla.find("/").unwrap(); let icao = &bla[..end]; let date_start = end + "/month=".len(); @@ -128,7 +127,7 @@ pub async fn existing_months_positions( .client .list_objects_v2() .bucket(&client.bucket) - .prefix(format!("{OLD_DATABASE}/")) + .prefix(format!("{DATABASE}/")) .into_paginator() .send() .try_collect() @@ -140,7 +139,7 @@ pub async fn existing_months_positions( .contents() .iter() .filter_map(|blob| blob.key()) - .map(|blob| blob_name_to_pk(&blob)) + .map(|blob| blob_name_to_pk(DATABASE, &blob)) .collect::>() }) .flatten() @@ -158,7 +157,7 @@ mod test { let icao: Arc = "aa".into(); let month = date!(2022 - 02 - 01); assert_eq!( - blob_name_to_pk(&pk_to_blob_name(icao.as_ref(), &month)), + blob_name_to_pk(DATABASE, &pk_to_blob_name(icao.as_ref(), &month)), (icao, month) ) }