From 1d372d9033858158e61505d020f928abf8179457 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sun, 4 Feb 2024 23:27:18 +0100 Subject: [PATCH] Improved analysis --- analysis.py | 43 +++++++++++++++++---------- examples/etl_legs.rs | 70 ++++++++------------------------------------ 2 files changed, 40 insertions(+), 73 deletions(-) diff --git a/analysis.py b/analysis.py index 4982cd0..99c88f5 100644 --- a/analysis.py +++ b/analysis.py @@ -1,7 +1,7 @@ import duckdb import matplotlib.pyplot as plt -PATH = "results/legs/*/*/*.csv" +PERCENTAGE_DONE = "33.11" def set_size(width: float, fraction=1): @@ -46,15 +46,26 @@ def repartition(): duckdb.sql( f""" COPY ( - SELECT * FROM read_csv_auto('{PATH}', header = true) + SELECT * FROM read_csv_auto('results/legs/*/*/*.csv', header = true) ) TO 'results/legs.csv' (HEADER, DELIMITER ',') """ ) -# repartition() -PATH = "results/legs.csv" +repartition() +print("repartition completed") + +duckdb.sql( + f""" +CREATE TEMP TABLE "legs" AS ( +SELECT + * +FROM read_csv_auto('results/legs.csv', header = true) +WHERE date_part('year', "start") = 2023 +) +""" +) def aircraft(): @@ -102,7 +113,7 @@ def ranked_legs(): f""" SELECT tail_number,COUNT(*) - FROM '{PATH}' + FROM "legs" GROUP BY tail_number ORDER BY COUNT(*) DESC """ @@ -122,7 +133,7 @@ def ranked_hours(): f""" SELECT tail_number,SUM(epoch("end" - "start") / 60 / 60) AS "flying_time_hour" - FROM '{PATH}' + FROM "legs" GROUP BY tail_number ORDER BY "flying_time_hour" DESC """ @@ -144,7 +155,7 @@ def histogram_hours(): f""" SELECT epoch("end" - "start") / 60 / 60 AS "flying_time_hour" - FROM '{PATH}' + FROM "legs" """ ).fetchall() @@ -170,7 +181,7 @@ def histogram_emissions(): f""" SELECT emissions_kg / 1000 - FROM '{PATH}' + FROM "legs" """ ).fetchall() @@ -204,14 +215,14 @@ def other_stats(): f""" SELECT COUNT(DISTINCT tail_number), COUNT(*) - FROM '{PATH}' + FROM "legs" """ ).fetchall()[0] with open("results/state.tex", "w") as f: f.write( f"""\ 
-\\newcommand{{\\percentagedone}}[0]{{{25.63}}} +\\newcommand{{\\percentagedone}}[0]{{{PERCENTAGE_DONE}}} \\newcommand{{\\totalaircraft}}[0]{{{counts[0]}}} \\newcommand{{\\totallegs}}[0]{{{counts[1]}}} \\newcommand{{\\totaljets}}[0]{{{total_jets}}} @@ -226,7 +237,7 @@ def per_month(): "month" , SUM(epoch("end" - "start") / 60 / 60) / 1000 AS "flying_time_hour" , SUM("emissions_kg") / 1000 / 1000 / 1000 AS "emissions_mega_tons" - FROM '{PATH}' + FROM "legs" GROUP BY "month" ORDER BY "month" """ @@ -264,7 +275,7 @@ def hours_per_model(): "model" , SUM(epoch("end" - "start") / 60 / 60) / 1000 AS "flying_time_hour" , SUM("emissions_kg") / 1000 / 1000 / 1000 AS "emissions_mega_tons" - FROM '{PATH}' + FROM "legs" GROUP BY "model" ORDER BY "emissions_mega_tons" DESC """ @@ -321,7 +332,7 @@ def distribution(): SELECT epoch("end" - "start") / 60 / 60 AS "flying_time_k_hours" , ST_Distance(ST_Point(from_lat,from_lon),ST_Point(to_lat,to_lon)) -FROM '{PATH}' +FROM "legs" USING SAMPLE 10% """ ).fetchall() @@ -330,7 +341,7 @@ def distribution(): x = [x[0] for x in x] plt.figure(figsize=FIG_SIZE) - plt.plot(x, y, '.') + plt.plot(x, y, ".") plt.xlabel("Flying time (hours)") plt.ylabel("Distance (feet)") plt.grid(linestyle="dotted") @@ -338,6 +349,6 @@ def distribution(): plt.savefig("results/dist.png", dpi=300) -distribution() +# distribution() -# base_analysis() +base_analysis() diff --git a/examples/etl_legs.rs b/examples/etl_legs.rs index 2ad5cbd..dfa5953 100644 --- a/examples/etl_legs.rs +++ b/examples/etl_legs.rs @@ -2,7 +2,6 @@ use std::{collections::HashMap, error::Error, sync::Arc}; use flights::{Aircraft, AircraftModels, Leg, Position}; use futures::{StreamExt, TryStreamExt}; -use itertools::Itertools; use simple_logger::SimpleLogger; #[derive(serde::Serialize)] @@ -109,71 +108,28 @@ async fn main() -> Result<(), Box> { }) .collect::>(); - let months = (2023..2024) - .cartesian_product(1..=12u8) - .map(|(year, month)| { - time::Date::from_calendar_date(year, 
time::Month::try_from(month).unwrap(), 1) - .expect("day 1 never errors") - }); - - let streams = months.map(|month| flights::existing_months_positions_future(&month, &client)); + let existing = flights::existing_months_positions(&client).await?; + log::info!("existing: {}", existing.len()); - let client = Some(&client); let private_jets = &private_jets; let models = &models; - let tasks = streams.map(|stream| async { - let stream = stream.then(|entries| async { - let missing = entries?.into_iter().filter(|(icao_number, month)| { - if exists(icao_number, month) { - log::info!("Skipping {} {}", icao_number, month); - false - } else { - log::info!("Computing {} {}", icao_number, month); - true - } - }); - let tasks = missing - .map(|(icao_number, month)| async move { - let positions = flights::month_positions(month, &icao_number, client).await?; - let legs = compute_legs(&icao_number, month, positions)?; - write(&icao_number, month, legs, private_jets, models)?; - Ok::<_, Box>(()) - }) - .collect::>(); - Ok::<_, Box>(tasks) - }); - // Iter>> - - /* - let missing = all.into_iter().flatten().filter(|(icao_number, month)| { - if exists(icao_number, month) { - log::info!("Skipping {} {}", icao_number, month); - false - } else { - log::info!("Computing {} {}", icao_number, month); - true - } - }); - - let mut processed = vec![]; - for (icao_number, month) in missing { - let positions = flights::month_positions(month, &icao_number, client).await?; + let client = &client; + let tasks = existing + .into_iter() + .filter(|(icao, _)| private_jets.contains_key(icao)) + .filter(|(icao, month)| !exists(icao, month)) + .map(|(icao_number, month)| async move { + let positions = flights::month_positions(month, &icao_number, Some(&client)).await?; let legs = compute_legs(&icao_number, month, positions)?; write(&icao_number, month, legs, private_jets, models)?; - processed.push((icao_number, month)); - } - */ - Ok::<_, Box>(vec![1]) - }); + Ok::<_, Box>(()) + }); let processed = 
futures::stream::iter(tasks) .buffered(20) .try_collect::>() - .await? - .into_iter() - .flatten() - .collect::>(); - println!("{} processed: {:?}", processed.len(), processed); + .await?; + log::info!("new processed: {}", processed.len()); Ok(()) }