Skip to content

Commit

Permalink
Finished migration
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao committed Feb 8, 2024
1 parent 5bd1003 commit 4a6d4ef
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 34 deletions.
28 changes: 13 additions & 15 deletions examples/cache_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,31 @@ async fn private_jets(

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = flights::fs_s3::anonymous_client().await;

let months = (2023..2024)
.cartesian_product(1..=12u8)
.map(|(year, month)| {
time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1)
.expect("day 1 never errors")
})
.collect::<Vec<_>>();

let client = flights::fs_s3::anonymous_client().await;

let existing = flights::existing_months_positions(&client).await?;

});
let private_jets = private_jets(Some(&client)).await?;
println!("jets : {}", private_jets.len());
println!("jets : {}", private_jets.len());
let required = private_jets
.into_iter()
.map(|a| a.icao_number.clone())
.cartesian_product(months.into_iter())
.map(|a| a.icao_number)
.cartesian_product(months)
.collect::<HashSet<_>>();
println!("required : {}", required.len());

let computed = required.intersection(&existing).count();
println!("required: {}", required.len());
println!("finished: {}", computed);
let completed = flights::existing_months_positions(&client).await?;
println!("completed: {}", completed.len());
println!(
"progress: {:.2}%",
(computed as f64) / (required.len() as f64) * 100.0
"progress : {:.2}%",
(completed.len() as f64) / (required.len() as f64) * 100.0
);
let todo = required.intersection(&completed).collect::<HashSet<_>>();
println!("todo : {}", todo.len());

Ok(())
}
51 changes: 38 additions & 13 deletions examples/export_legs.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::error::Error;
use std::{collections::HashSet, error::Error};

use clap::Parser;
use futures::StreamExt;
use itertools::Itertools;
use simple_logger::SimpleLogger;

use flights::{
existing_months_positions, load_aircrafts, load_private_jet_models, month_positions,
};
use flights::{existing_months_positions, Aircraft};

const ABOUT: &'static str = r#"Builds the database of all private jet positions from 2023"#;

Expand All @@ -22,6 +20,20 @@ struct Cli {
secret_access_key: String,
}

async fn private_jets(
client: Option<&flights::fs_s3::ContainerClient>,
) -> Result<Vec<Aircraft>, Box<dyn std::error::Error>> {
// load datasets to memory
let aircrafts = flights::load_aircrafts(client).await?;
let models = flights::load_private_jet_models()?;

Ok(aircrafts
.into_iter()
// its primary use is to be a private jet
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.collect())
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
SimpleLogger::new()
Expand All @@ -30,23 +42,36 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap();

let cli = Cli::parse();

let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

// load datasets to memory
//let aircrafts = load_aircrafts(Some(&client)).await?;
//let models = load_private_jet_models()?;
let months = (2023..2024)
.cartesian_product(1..=12u8)
.map(|(year, month)| {
time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1)
.expect("day 1 never errors")
});
let private_jets = private_jets(Some(&client)).await?;
log::info!("jets : {}", private_jets.len());
let required = private_jets
.into_iter()
.map(|a| a.icao_number)
.cartesian_product(months)
.collect::<HashSet<_>>();
log::info!("required : {}", required.len());

let completed = existing_months_positions(&client).await?;
log::info!("already computed: {}", completed.len());
log::info!("completed: {}", completed.len());
let todo = required.intersection(&completed).collect::<HashSet<_>>();
log::info!("todo : {}", todo.len());

let a = Some(&client);
let tasks = completed
let tasks = todo
.into_iter()
.map(|(icao, date)| async move { month_positions(date.clone(), &icao, a).await });
.map(|(icao_number, month)| flights::month_positions(*month, icao_number, Some(&client)));

futures::stream::iter(tasks)
// limit concurrent tasks
.buffered(100)
.buffered(10)
// continue if error
.map(|r| {
if let Err(e) = r {
Expand All @@ -55,5 +80,5 @@ async fn main() -> Result<(), Box<dyn Error>> {
})
.collect::<Vec<_>>()
.await;
return Ok(());
Ok(())
}
11 changes: 5 additions & 6 deletions src/trace_month.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use super::Position;
use crate::{cached_aircraft_positions, fs, fs_s3};

static DATABASE: &'static str = "position";
static OLD_DATABASE: &'static str = "trace";

fn blob_name_to_pk(blob: &str) -> (Arc<str>, time::Date) {
let bla = &blob[OLD_DATABASE.len() + "/icao_number=".len()..];
fn blob_name_to_pk(db: &str, blob: &str) -> (Arc<str>, time::Date) {
let bla = &blob[db.len() + "/icao_number=".len()..];
let end = bla.find("/").unwrap();
let icao = &bla[..end];
let date_start = end + "/month=".len();
Expand Down Expand Up @@ -128,7 +127,7 @@ pub async fn existing_months_positions(
.client
.list_objects_v2()
.bucket(&client.bucket)
.prefix(format!("{OLD_DATABASE}/"))
.prefix(format!("{DATABASE}/"))
.into_paginator()
.send()
.try_collect()
Expand All @@ -140,7 +139,7 @@ pub async fn existing_months_positions(
.contents()
.iter()
.filter_map(|blob| blob.key())
.map(|blob| blob_name_to_pk(&blob))
.map(|blob| blob_name_to_pk(DATABASE, &blob))
.collect::<Vec<_>>()
})
.flatten()
Expand All @@ -158,7 +157,7 @@ mod test {
let icao: Arc<str> = "aa".into();
let month = date!(2022 - 02 - 01);
assert_eq!(
blob_name_to_pk(&pk_to_blob_name(icao.as_ref(), &month)),
blob_name_to_pk(DATABASE, &pk_to_blob_name(icao.as_ref(), &month)),
(icao, month)
)
}
Expand Down

0 comments on commit 4a6d4ef

Please sign in to comment.