Skip to content

Commit

Permalink
Improved aircraft db
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao committed Mar 12, 2024
1 parent 8cbc109 commit 682e893
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 116 deletions.
140 changes: 73 additions & 67 deletions src/aircraft_db.rs → src/aircraft.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
//! Contains the implementation to extract the database of all aircrafts available in ADS-B exchange
//! The database contains "current" status.
use std::error::Error;
/// Contains the implementation to extract the database of all aircrafts available in ADS-B exchange
use std::{collections::HashMap, sync::Arc};

use async_recursion::async_recursion;
use reqwest;
use serde::{Deserialize, Serialize};
use serde_json;
use time::Date;

use crate::csv;
use crate::fs::BlobStorageProvider;
use crate::{fs, fs_s3, CountryIcaoRanges};

/// [`HashMap`] between tail number (e.g. "OY-TWM") and an [`Aircraft`]
Expand All @@ -27,17 +31,17 @@ pub struct Aircraft {
pub country: Option<Arc<str>>,
}

static DATABASE: &'static str = "db-20231106";

fn cache_file_path(prefix: &str) -> String {
format!("{DATABASE}/{prefix}.json")
fn file_path(date: Date) -> String {
format!("aircraft/db/date={date}/data.csv")
}

fn url(prefix: &str) -> String {
format!("https://globe.adsbexchange.com/{DATABASE}/{prefix}.js")
format!("https://globe.adsbexchange.com/db-current/{prefix}.js")
}

async fn aircrafts(prefix: &str) -> Result<Vec<u8>, reqwest::Error> {
/// Returns the current aircrafts from adsbexchange.com
/// on a specific prefix of ICAO.
async fn get_db_current(prefix: &str) -> Result<Vec<u8>, reqwest::Error> {
Ok(reqwest::get(url(prefix))
.await?
.bytes()
Expand All @@ -46,27 +50,10 @@ async fn aircrafts(prefix: &str) -> Result<Vec<u8>, reqwest::Error> {
}

/// Returns a map between tail number (e.g. "OYTWM": "45D2ED")
/// Caches to disk or remote storage the first time it is executed
async fn aircrafts_prefixed(
async fn db_current(
prefix: String,
client: Option<&fs_s3::ContainerClient>,
) -> Result<(String, HashMap<String, Vec<Option<String>>>), String> {
let blob_name = cache_file_path(&prefix);
let fetch = aircrafts(&prefix);

let data = match client {
Some(client) => fs::cached(&blob_name, fetch, client, fs::CacheAction::ReadFetchWrite)
.await
.map_err(|e| e.to_string())?,
None => fs::cached(
&blob_name,
fetch,
&fs::LocalDisk,
fs::CacheAction::ReadFetchWrite,
)
.await
.map_err(|e| e.to_string())?,
};
let data = get_db_current(&prefix).await.map_err(|e| e.to_string())?;

Ok((
prefix,
Expand All @@ -77,7 +64,6 @@ async fn aircrafts_prefixed(
#[async_recursion]
async fn children<'a: 'async_recursion>(
entries: &mut HashMap<String, Vec<Option<String>>>,
client: Option<&'a fs_s3::ContainerClient>,
) -> Result<Vec<(String, HashMap<String, Vec<Option<String>>>)>, String> {
let Some(entries) = entries.remove("children") else {
return Ok(Default::default());
Expand All @@ -87,51 +73,42 @@ async fn children<'a: 'async_recursion>(
entries
.into_iter()
.map(|x| x.unwrap())
.map(|x| aircrafts_prefixed(x, client)),
.map(|x| db_current(x)),
)
.await
.map_err(|e| e.to_string())?;

// recurse over all children
let mut _children = futures::future::try_join_all(
entries
.iter_mut()
.map(|entry| children(&mut entry.1, client)),
)
.await?;
let mut _children =
futures::future::try_join_all(entries.iter_mut().map(|entry| children(&mut entry.1)))
.await?;

entries.extend(_children.into_iter().flatten());
Ok(entries)
}

/// Returns [`Aircrafts`] known in [ADS-B exchange](https://globe.adsbexchange.com) as of 2023-11-06.
/// Returns [`Aircrafts`] known in [ADS-B exchange](https://globe.adsbexchange.com) as of now.
/// It returns ~0.5m aircrafts
/// # Implementation
/// This function is idempotent but not pure: it caches every https request either to disk or remote storage
/// to not penalize adsbexchange.com
pub async fn load_aircrafts(
client: Option<&fs_s3::ContainerClient>,
) -> Result<Aircrafts, Box<dyn Error>> {
async fn extract_aircrafts() -> Result<Vec<Aircraft>, Box<dyn Error>> {
let country_ranges = CountryIcaoRanges::new();

let prefixes = (b'A'..=b'F').chain(b'0'..b'9');
let prefixes = prefixes.map(|x| std::str::from_utf8(&[x]).unwrap().to_string());

let mut entries =
futures::future::try_join_all(prefixes.map(|x| aircrafts_prefixed(x, client))).await?;
let mut entries = futures::future::try_join_all(prefixes.map(|x| db_current(x))).await?;

let mut _children = futures::future::try_join_all(
entries
.iter_mut()
.map(|entry| children(&mut entry.1, client)),
)
.await?;
let mut _children =
futures::future::try_join_all(entries.iter_mut().map(|entry| children(&mut entry.1)))
.await?;

entries.extend(_children.into_iter().flatten());

Ok(entries
.into_iter()
.fold(HashMap::default(), |mut acc, (prefix, values)| {
.fold(vec![], |mut acc, (prefix, values)| {
let items = values
.into_iter()
.map(|(k, v)| (format!("{prefix}{k}"), v))
Expand All @@ -141,37 +118,66 @@ pub async fn load_aircrafts(
let model = std::mem::take(&mut data[3])?;
let country = country_ranges.country(&icao_number).unwrap();

Some((
tail_number.clone(),
Aircraft {
icao_number: icao_number.to_ascii_lowercase().into(),
tail_number,
type_designator,
model,
country: country.cloned(),
},
))
Some(Aircraft {
icao_number: icao_number.to_ascii_lowercase().into(),
tail_number,
type_designator,
model,
country: country.cloned(),
})
});
acc.extend(items);
acc
}))
}

/// Serializes `aircraft` via [`csv::serialize`] and writes the result to the blob `blob_name`.
///
/// The blob is written through `client` when one is provided and through
/// [`fs::LocalDisk`] otherwise.
///
/// # Errors
/// Returns (boxed) the error of the underlying `put` call.
async fn load(
    aircraft: Vec<Aircraft>,
    blob_name: &str,
    client: Option<&fs_s3::ContainerClient>,
) -> Result<(), Box<dyn Error>> {
    let payload = csv::serialize(aircraft.into_iter());

    if let Some(remote) = client {
        remote.put(blob_name, payload).await?;
    } else {
        fs::LocalDisk.put(blob_name, payload).await?;
    }

    Ok(())
}

/// Extracts the aircraft currently listed by ADS-B exchange and stores them
/// in the blob whose path is derived from today's date (UTC).
///
/// The blob goes to `client` when one is provided, else to local disk (see `load`).
///
/// # Errors
/// Returns an error when either the extraction or the write fails.
pub async fn etl_aircrafts(client: Option<&fs_s3::ContainerClient>) -> Result<(), Box<dyn Error>> {
    // The destination is resolved before the extraction starts, so the blob is
    // named after the date on which the run began.
    let destination = file_path(time::OffsetDateTime::now_utc().date());

    load(extract_aircrafts().await?, &destination, client).await
}

/// Reads the aircraft stored for `date` and returns them keyed by tail number.
///
/// The blob is fetched through `client` when one is provided and through
/// [`fs::LocalDisk`] otherwise.
///
/// # Errors
/// Returns the stringified storage error when the fetch fails, and
/// `"File {path} does not exist"` when there is no blob for `date`.
pub async fn read(
    date: Date,
    client: Option<&fs_s3::ContainerClient>,
) -> Result<Aircrafts, String> {
    let path = file_path(date);

    let fetched = if let Some(remote) = client {
        remote.maybe_get(&path).await.map_err(|e| e.to_string())
    } else {
        fs::LocalDisk
            .maybe_get(&path)
            .await
            .map_err(|e| e.to_string())
    }?;

    let Some(data) = fetched else {
        return Err(format!("File {path} does not exist"));
    };

    let by_tail_number = super::csv::deserialize(&data)
        .map(|entry: Aircraft| (entry.tail_number.clone(), entry))
        .collect();
    Ok(by_tail_number)
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn work() {
assert_eq!(
aircrafts_prefixed("A0".to_string(), None)
.await
.unwrap()
.1
.len(),
24465
);
// although important, this is an expensive call to run on every test => only run ad-hoc
//assert_eq!(aircrafts().unwrap().len(), 463747);
assert!(db_current("A0".to_string()).await.unwrap().1.len() > 20000);

//assert!(extract_aircrafts().await.unwrap().len() > 400000);
}
}
14 changes: 7 additions & 7 deletions src/bin/etl_legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use std::{
};

use clap::Parser;
use flights::{Aircraft, AircraftModels, Airport, BlobStorageProvider, Leg};
use flights::{aircraft, AircraftModels, Airport, BlobStorageProvider, Leg};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::{de::DeserializeOwned, Serialize};
use simple_logger::SimpleLogger;
use time::macros::date;

static DATABASE_ROOT: &'static str = "leg/v1/";
static DATABASE: &'static str = "leg/v1/data/";
Expand Down Expand Up @@ -69,7 +70,7 @@ async fn write_csv<B: BlobStorageProvider>(
fn transform<'a>(
icao_number: &'a Arc<str>,
legs: Vec<Leg>,
private_jets: &'a HashMap<Arc<str>, Aircraft>,
private_jets: &'a HashMap<Arc<str>, aircraft::Aircraft>,
models: &'a AircraftModels,
airports: &'a [Airport],
) -> impl Iterator<Item = LegOut> + 'a {
Expand Down Expand Up @@ -136,14 +137,13 @@ async fn read<D: DeserializeOwned>(

async fn private_jets(
client: Option<&flights::fs_s3::ContainerClient>,
) -> Result<Vec<Aircraft>, Box<dyn std::error::Error>> {
) -> Result<Vec<aircraft::Aircraft>, Box<dyn std::error::Error>> {
// load datasets to memory
let aircrafts = flights::load_aircrafts(client).await?;
let aircrafts = aircraft::read(date!(2023 - 11 - 06), client).await?;
let models = flights::load_private_jet_models()?;

Ok(aircrafts
.into_iter()
// its primary use is to be a private jet
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.collect())
}
Expand All @@ -167,7 +167,7 @@ struct Cli {
async fn etl_task(
icao_number: &Arc<str>,
month: time::Date,
private_jets: &HashMap<Arc<str>, Aircraft>,
private_jets: &HashMap<Arc<str>, aircraft::Aircraft>,
models: &AircraftModels,
airports: &[Airport],
client: Option<&flights::fs_s3::ContainerClient>,
Expand All @@ -187,7 +187,7 @@ async fn etl_task(
}

async fn aggregate(
private_jets: Vec<Aircraft>,
private_jets: Vec<aircraft::Aircraft>,
models: &AircraftModels,
airports: &[Airport],
client: &flights::fs_s3::ContainerClient,
Expand Down
9 changes: 5 additions & 4 deletions src/bin/etl_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use clap::Parser;
use futures::StreamExt;
use itertools::Itertools;
use simple_logger::SimpleLogger;
use time::macros::date;

use flights::{existing_months_positions, Aircraft};
use flights::{aircraft, existing_months_positions};

const ABOUT: &'static str = r#"Builds the database of all private jet positions from 2023"#;

Expand All @@ -26,20 +27,20 @@ struct Cli {
async fn private_jets(
client: Option<&flights::fs_s3::ContainerClient>,
country: Option<&str>,
) -> Result<Vec<Aircraft>, Box<dyn std::error::Error>> {
) -> Result<Vec<aircraft::Aircraft>, Box<dyn std::error::Error>> {
// load datasets to memory
let aircrafts = flights::load_aircrafts(client).await?;
let aircrafts = aircraft::read(date!(2023 - 11 - 06), client).await?;
let models = flights::load_private_jet_models()?;

Ok(aircrafts
.into_iter()
// its primary use is to be a private jet
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.filter(|(_, a)| {
country
.map(|country| a.country.as_deref() == Some(country))
.unwrap_or(true)
})
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.collect())
}

Expand Down
Loading

0 comments on commit 682e893

Please sign in to comment.