Skip to content

Commit

Permalink
Improved analysis
Browse files (browse the repository at this point in the history)
  • Loading branch information
jorgecarleitao committed Feb 9, 2024
1 parent 1eeb860 commit 1d372d9
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 73 deletions.
43 changes: 27 additions & 16 deletions analysis.py
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
import duckdb
import matplotlib.pyplot as plt

PATH = "results/legs/*/*/*.csv"
PERCENTAGE_DONE = "33.11"


def set_size(width: float, fraction=1):
Expand Down Expand Up @@ -46,15 +46,26 @@ def repartition():
duckdb.sql(
f"""
COPY (
SELECT * FROM read_csv_auto('{PATH}', header = true)
SELECT * FROM read_csv_auto('results/legs/*/*/*.csv', header = true)
)
TO 'results/legs.csv' (HEADER, DELIMITER ',')
"""
)


# repartition()
PATH = "results/legs.csv"
repartition()
print("repartition completed")

duckdb.sql(
f"""
CREATE TEMP TABLE "legs" AS (
SELECT
*
FROM read_csv_auto('results/legs.csv', header = true)
WHERE date_part('year', "start") = 2023
)
"""
)


def aircraft():
Expand Down Expand Up @@ -102,7 +113,7 @@ def ranked_legs():
f"""
SELECT
tail_number,COUNT(*)
FROM '{PATH}'
FROM "legs"
GROUP BY tail_number
ORDER BY COUNT(*) DESC
"""
Expand All @@ -122,7 +133,7 @@ def ranked_hours():
f"""
SELECT
tail_number,SUM(epoch("end" - "start") / 60 / 60) AS "flying_time_hour"
FROM '{PATH}'
FROM "legs"
GROUP BY tail_number
ORDER BY "flying_time_hour" DESC
"""
Expand All @@ -144,7 +155,7 @@ def histogram_hours():
f"""
SELECT
epoch("end" - "start") / 60 / 60 AS "flying_time_hour"
FROM '{PATH}'
FROM "legs"
"""
).fetchall()

Expand All @@ -170,7 +181,7 @@ def histogram_emissions():
f"""
SELECT
emissions_kg / 1000
FROM '{PATH}'
FROM "legs"
"""
).fetchall()

Expand Down Expand Up @@ -204,14 +215,14 @@ def other_stats():
f"""
SELECT
COUNT(DISTINCT tail_number), COUNT(*)
FROM '{PATH}'
FROM "legs"
"""
).fetchall()[0]

with open("results/state.tex", "w") as f:
f.write(
f"""\
\\newcommand{{\\percentagedone}}[0]{{{25.63}}}
\\newcommand{{\\percentagedone}}[0]{{{PERCENTAGE_DONE}}}
\\newcommand{{\\totalaircraft}}[0]{{{counts[0]}}}
\\newcommand{{\\totallegs}}[0]{{{counts[1]}}}
\\newcommand{{\\totaljets}}[0]{{{total_jets}}}
Expand All @@ -226,7 +237,7 @@ def per_month():
"month"
, SUM(epoch("end" - "start") / 60 / 60) / 1000 AS "flying_time_hour"
, SUM("emissions_kg") / 1000 / 1000 / 1000 AS "emissions_mega_tons"
FROM '{PATH}'
FROM "legs"
GROUP BY "month"
ORDER BY "month"
"""
Expand Down Expand Up @@ -264,7 +275,7 @@ def hours_per_model():
"model"
, SUM(epoch("end" - "start") / 60 / 60) / 1000 AS "flying_time_hour"
, SUM("emissions_kg") / 1000 / 1000 / 1000 AS "emissions_mega_tons"
FROM '{PATH}'
FROM "legs"
GROUP BY "model"
ORDER BY "emissions_mega_tons" DESC
"""
Expand Down Expand Up @@ -321,7 +332,7 @@ def distribution():
SELECT
epoch("end" - "start") / 60 / 60 AS "flying_time_k_hours"
, ST_Distance(ST_Point(from_lat,from_lon),ST_Point(to_lat,to_lon))
FROM '{PATH}'
FROM "legs"
USING SAMPLE 10%
"""
).fetchall()
Expand All @@ -330,14 +341,14 @@ def distribution():
x = [x[0] for x in x]

plt.figure(figsize=FIG_SIZE)
plt.plot(x, y, '.')
plt.plot(x, y, ".")
plt.xlabel("Flying time (hours)")
plt.ylabel("Distance (feet)")
plt.grid(linestyle="dotted")
plt.tight_layout()
plt.savefig("results/dist.png", dpi=300)


distribution()
# distribution()

# base_analysis()
base_analysis()
70 changes: 13 additions & 57 deletions examples/etl_legs.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, error::Error, sync::Arc};

use flights::{Aircraft, AircraftModels, Leg, Position};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use simple_logger::SimpleLogger;

#[derive(serde::Serialize)]
Expand Down Expand Up @@ -109,71 +108,28 @@ async fn main() -> Result<(), Box<dyn Error>> {
})
.collect::<HashMap<_, _>>();

let months = (2023..2024)
.cartesian_product(1..=12u8)
.map(|(year, month)| {
time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1)
.expect("day 1 never errors")
});

let streams = months.map(|month| flights::existing_months_positions_future(&month, &client));
let existing = flights::existing_months_positions(&client).await?;
log::info!("existing: {}", existing.len());

let client = Some(&client);
let private_jets = &private_jets;
let models = &models;
let tasks = streams.map(|stream| async {
let stream = stream.then(|entries| async {
let missing = entries?.into_iter().filter(|(icao_number, month)| {
if exists(icao_number, month) {
log::info!("Skipping {} {}", icao_number, month);
false
} else {
log::info!("Computing {} {}", icao_number, month);
true
}
});
let tasks = missing
.map(|(icao_number, month)| async move {
let positions = flights::month_positions(month, &icao_number, client).await?;
let legs = compute_legs(&icao_number, month, positions)?;
write(&icao_number, month, legs, private_jets, models)?;
Ok::<_, Box<dyn Error>>(())
})
.collect::<Vec<_>>();
Ok::<_, Box<dyn Error>>(tasks)
});
// Iter<Stream<Result<Vec<Task>>>

/*
let missing = all.into_iter().flatten().filter(|(icao_number, month)| {
if exists(icao_number, month) {
log::info!("Skipping {} {}", icao_number, month);
false
} else {
log::info!("Computing {} {}", icao_number, month);
true
}
});
let mut processed = vec![];
for (icao_number, month) in missing {
let positions = flights::month_positions(month, &icao_number, client).await?;
let client = &client;
let tasks = existing
.into_iter()
.filter(|(icao, _)| private_jets.contains_key(icao))
.filter(|(icao, month)| !exists(icao, month))
.map(|(icao_number, month)| async move {
let positions = flights::month_positions(month, &icao_number, Some(&client)).await?;
let legs = compute_legs(&icao_number, month, positions)?;
write(&icao_number, month, legs, private_jets, models)?;
processed.push((icao_number, month));
}
*/
Ok::<_, Box<dyn Error>>(vec![1])
});
Ok::<_, Box<dyn Error>>(())
});

let processed = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten()
.collect::<Vec<_>>();
println!("{} processed: {:?}", processed.len(), processed);
.await?;
log::info!("new processed: {}", processed.len());

Ok(())
}

0 comments on commit 1d372d9

Please sign in to comment.