Skip to content

Commit

Permalink
Migrate to a simpler in-memory format (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao authored Feb 8, 2024
1 parent 62460f2 commit 80aece9
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 156 deletions.
28 changes: 13 additions & 15 deletions examples/cache_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,31 @@ async fn private_jets(

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = flights::fs_s3::anonymous_client().await;

let months = (2023..2024)
.cartesian_product(1..=12u8)
.map(|(year, month)| {
time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1)
.expect("day 1 never errors")
})
.collect::<Vec<_>>();

let client = flights::fs_s3::anonymous_client().await;

let existing = flights::existing_months_positions(&client).await?;

});
let private_jets = private_jets(Some(&client)).await?;
println!("jets : {}", private_jets.len());
println!("jets : {}", private_jets.len());
let required = private_jets
.into_iter()
.map(|a| a.icao_number.clone())
.cartesian_product(months.into_iter())
.map(|a| a.icao_number)
.cartesian_product(months)
.collect::<HashSet<_>>();
println!("required : {}", required.len());

let computed = required.intersection(&existing).count();
println!("required: {}", required.len());
println!("finished: {}", computed);
let completed = flights::existing_months_positions(&client).await?;
println!("completed: {}", completed.len());
println!(
"progress: {:.2}%",
(computed as f64) / (required.len() as f64) * 100.0
"progress : {:.2}%",
(completed.len() as f64) / (required.len() as f64) * 100.0
);
let todo = required.intersection(&completed).collect::<HashSet<_>>();
println!("todo : {}", todo.len());

Ok(())
}
6 changes: 0 additions & 6 deletions examples/country.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,6 @@ async fn legs(
client: Option<&flights::fs_s3::ContainerClient>,
) -> Result<Vec<Leg>, Box<dyn Error>> {
let positions = flights::aircraft_positions(from, to, icao_number, client).await?;
let mut positions = positions
.into_iter()
.map(|(_, p)| p)
.flatten()
.collect::<Vec<_>>();
positions.sort_unstable_by_key(|p| p.datetime());

log::info!("Computing legs {}", icao_number);
let legs = flights::legs(positions.into_iter());
Expand Down
51 changes: 31 additions & 20 deletions examples/export_legs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::error::Error;
use std::{collections::HashSet, error::Error};

use clap::Parser;
use futures::StreamExt;
use itertools::Itertools;
use simple_logger::SimpleLogger;

use flights::{existing_months_positions, load_aircrafts, load_private_jet_models};
use flights::{existing_months_positions, Aircraft};

const ABOUT: &'static str = r#"Builds the database of all private jet positions from 2023"#;

Expand All @@ -20,43 +20,54 @@ struct Cli {
secret_access_key: String,
}

async fn private_jets(
client: Option<&flights::fs_s3::ContainerClient>,
) -> Result<Vec<Aircraft>, Box<dyn std::error::Error>> {
// load datasets to memory
let aircrafts = flights::load_aircrafts(client).await?;
let models = flights::load_private_jet_models()?;

Ok(aircrafts
.into_iter()
// its primary use is to be a private jet
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.collect())
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
SimpleLogger::new()
.with_level(log::LevelFilter::Warn)
.with_level(log::LevelFilter::Info)
.init()
.unwrap();

let cli = Cli::parse();

let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await;

// load datasets to memory
let aircrafts = load_aircrafts(Some(&client)).await?;
let models = load_private_jet_models()?;

let completed = existing_months_positions(&client).await?;
log::info!("already computed: {}", completed.len());

let private_jets = aircrafts
.values()
// its primary use is to be a private jet
.filter(|a| models.contains_key(&a.model));

let months = (2023..2024)
.cartesian_product(1..=12u8)
.map(|(year, month)| {
time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1)
.expect("day 1 never errors")
});

let private_jets = private_jets(Some(&client)).await?;
log::info!("jets : {}", private_jets.len());
let required = private_jets
.into_iter()
.map(|a| a.icao_number)
.cartesian_product(months)
.filter(|(a, date)| !completed.contains(&(a.icao_number.clone(), *date)));
.collect::<HashSet<_>>();
log::info!("required : {}", required.len());

let completed = existing_months_positions(&client).await?;
log::info!("completed: {}", completed.len());
let todo = required.intersection(&completed).collect::<HashSet<_>>();
log::info!("todo : {}", todo.len());

let tasks = required.map(|(aircraft, month)| {
flights::month_positions(month, &aircraft.icao_number, Some(&client))
});
let tasks = todo
.into_iter()
.map(|(icao_number, month)| flights::month_positions(*month, icao_number, Some(&client)));

futures::stream::iter(tasks)
// limit concurrent tasks
Expand Down
6 changes: 0 additions & 6 deletions examples/period.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
log::info!("ICAO number: {}", icao);

let positions = flights::aircraft_positions(from, to, icao, client.as_ref()).await?;
let mut positions = positions
.into_iter()
.map(|(_, p)| p)
.flatten()
.collect::<Vec<_>>();
positions.sort_unstable_by_key(|p| p.datetime());

let legs = flights::legs(positions.into_iter());
log::info!("number_of_legs: {}", legs.len());
Expand Down
3 changes: 2 additions & 1 deletion src/fs_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::Display;
use aws_credential_types::provider::ProvideCredentials;
use aws_sdk_s3::{
config::Credentials, error::SdkError, operation::head_object::HeadObjectError,
primitives::ByteStream,
primitives::ByteStream, types::ObjectCannedAcl,
};

use crate::fs::BlobStorageProvider;
Expand Down Expand Up @@ -82,6 +82,7 @@ async fn put(client: &ContainerClient, blob_name: &str, content: Vec<u8>) -> Res
.put_object()
.bucket(&client.bucket)
.key(blob_name)
.acl(ObjectCannedAcl::PublicRead)
.body(stream)
.send()
.await
Expand Down
24 changes: 9 additions & 15 deletions src/icao_to_trace.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use rand::Rng;
use reqwest::header;
Expand Down Expand Up @@ -171,12 +168,10 @@ pub async fn positions(
client: Option<&fs_s3::ContainerClient>,
) -> Result<impl Iterator<Item = Position>, std::io::Error> {
use time::ext::NumericalDuration;
let icao: Arc<str> = icao_number.to_string().into();
trace_cached(icao_number, &date, client)
.await
.map(move |trace| {
trace.into_iter().filter_map(move |entry| {
let icao = icao.clone();
let time_seconds = entry[0].as_f64().unwrap();
let time = time::Time::MIDNIGHT + time_seconds.seconds();
let datetime = PrimitiveDateTime::new(date.clone(), time);
Expand All @@ -185,21 +180,20 @@ pub async fn positions(
entry[3]
.as_str()
.and_then(|x| {
(x == "ground").then_some(Position::Grounded {
icao: icao.clone(),
(x == "ground").then_some(Position {
datetime,
latitude,
longitude,
altitude: None,
})
})
.or_else(|| {
entry[3].as_f64().and_then(|altitude| {
Some(Position::Flying {
icao: icao.clone(),
Some(Position {
datetime,
latitude,
longitude,
altitude,
altitude: Some(altitude),
})
})
})
Expand All @@ -212,27 +206,27 @@ pub(crate) async fn cached_aircraft_positions(
to: Date,
icao_number: &str,
client: Option<&fs_s3::ContainerClient>,
) -> Result<HashMap<Date, Vec<Position>>, std::io::Error> {
) -> Result<Vec<Position>, std::io::Error> {
let dates = super::DateIter {
from,
to,
increment: time::Duration::days(1),
};

let tasks = dates.map(|date| async move {
Result::<_, std::io::Error>::Ok((
date.clone(),
Result::<_, std::io::Error>::Ok(
positions(icao_number, date, client)
.await?
.collect::<Vec<_>>(),
))
)
});

futures::stream::iter(tasks)
// limit concurrent tasks
.buffered(5)
.try_collect()
.try_collect::<Vec<_>>()
.await
.map(|x| x.into_iter().flatten().collect())
}

pub use crate::trace_month::*;
17 changes: 3 additions & 14 deletions src/legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ impl Leg {
}

fn grounded_heuristic(prev_position: &Position, position: &Position) -> bool {
let is_flying = matches!(
(&prev_position, &position),
(Position::Flying { .. }, Position::Flying { .. })
| (Position::Flying { .. }, Position::Grounded { .. })
| (Position::Grounded { .. }, Position::Flying { .. })
);
let is_flying = prev_position.flying() || position.flying();
let lost_close_to_ground = position.datetime() - prev_position.datetime()
> time::Duration::minutes(5)
&& (position.altitude() < 10000.0 || prev_position.altitude() < 10000.0);
Expand All @@ -61,17 +56,11 @@ fn grounded_heuristic(prev_position: &Position, position: &Position) -> bool {

/// Implementation of the definition of landed in [M-4](../methodology.md).
fn landed(prev_position: &Position, position: &Position) -> bool {
matches!(
(&prev_position, &position),
(Position::Flying { .. }, Position::Grounded { .. })
) || grounded_heuristic(prev_position, position)
(prev_position.flying() && position.grounded()) || grounded_heuristic(prev_position, position)
}

fn is_grounded(prev_position: &Position, position: &Position) -> bool {
matches!(
(&prev_position, &position),
(Position::Grounded { .. }, Position::Grounded { .. })
) || grounded_heuristic(prev_position, position)
(prev_position.grounded() && position.grounded()) || grounded_heuristic(prev_position, position)
}

/// Returns a set of [`Leg`]s from a sequence of [`Position`]s.
Expand Down
54 changes: 18 additions & 36 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ mod owners;
mod private_emissions;
mod trace_month;

use std::sync::Arc;

pub use aircraft_db::*;
pub use aircraft_models::*;
pub use aircraft_owners::*;
Expand All @@ -30,59 +28,43 @@ pub use private_emissions::*;

/// A position of an aircraft
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Position {
/// Aircraft transponder declares the aircraft is grounded
Grounded {
icao: Arc<str>,
datetime: time::PrimitiveDateTime,
latitude: f64,
longitude: f64,
},
/// Aircraft transponder declares the aircraft is flying at a given altitude
Flying {
icao: Arc<str>,
datetime: time::PrimitiveDateTime,
latitude: f64,
longitude: f64,
altitude: f64,
},
pub struct Position {
datetime: time::PrimitiveDateTime,
latitude: f64,
longitude: f64,
/// None means on the ground
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
altitude: Option<f64>,
}

impl Position {
pub fn icao(&self) -> &Arc<str> {
match self {
Position::Flying { icao, .. } | Position::Grounded { icao, .. } => icao,
}
pub fn flying(&self) -> bool {
self.altitude.is_some()
}

pub fn grounded(&self) -> bool {
self.altitude.is_none()
}

pub fn latitude(&self) -> f64 {
match *self {
Position::Flying { latitude, .. } | Position::Grounded { latitude, .. } => latitude,
}
self.latitude
}

pub fn longitude(&self) -> f64 {
match *self {
Position::Flying { longitude, .. } | Position::Grounded { longitude, .. } => longitude,
}
self.longitude
}

pub fn pos(&self) -> (f64, f64) {
(self.latitude(), self.longitude())
}

pub fn altitude(&self) -> f64 {
match *self {
Position::Flying { altitude, .. } => altitude,
Position::Grounded { .. } => 0.0,
}
self.altitude.unwrap_or(0.0)
}

pub fn datetime(&self) -> time::PrimitiveDateTime {
match *self {
Position::Flying { datetime, .. } => datetime,
Position::Grounded { datetime, .. } => datetime,
}
self.datetime
}

/// Returns the distance to another [`Position`] in km
Expand Down
Loading

0 comments on commit 80aece9

Please sign in to comment.