Skip to content

Commit

Permalink
Added multi-threading and remote store
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Nov 21, 2023
1 parent 148fbe9 commit da6fdb6
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 73 deletions.
11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", default_features = false }

# perform requests to the internet
reqwest = {version="*", features = ["blocking", "gzip"]}
reqwest = {version="*", features = ["gzip"]}

# create random string for cookies
rand = {version="*", default_features = false, features = ["std", "std_rng", "getrandom"]}
Expand All @@ -24,6 +24,15 @@ geoutils = {version="*", default_features = false}
# read airport names
csv = {version="*", default_features = false}

# azure integration
azure_storage = "*"
azure_storage_blobs = "*"
azure_core = "*"
futures = "0.3"
bytes = "1.5"
async-recursion = "1.0"

[dev-dependencies]
tinytemplate = "1.1"
clap = { version = "4.4.6", features = ["derive"] }
tokio = {version="1.0", features=["rt", "macros", "rt-multi-thread"]}
30 changes: 24 additions & 6 deletions examples/period.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::error::Error;

use clap::Parser;

use flights::{
emissions, load_aircraft_owners, load_aircrafts, load_owners, Aircraft, Class, Company, Fact,
};
Expand Down Expand Up @@ -34,10 +36,25 @@ fn render(context: &Context) -> Result<(), Box<dyn Error>> {
Ok(())
}

fn main() -> Result<(), Box<dyn Error>> {
// Command-line arguments of this example, parsed with `clap`'s derive API.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Cli {
/// The Azure token
// Optional: `main` only initializes an Azure container client when a token is
// given; otherwise it passes `None` to the loaders.
#[arg(short, long)]
azure_sas_token: Option<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cli = Cli::parse();

let client = cli
.azure_sas_token
.map(|token| flights::fs_azure::initialize(&token, "privatejets", "data").unwrap());

let owners = load_owners()?;
let aircraft_owners = load_aircraft_owners()?;
let aircrafts = load_aircrafts()?;
let aircrafts = load_aircrafts(client.as_ref()).await?;

let to = time::OffsetDateTime::now_utc().date() - time::Duration::days(1);
let from = to - time::Duration::days(90);
Expand Down Expand Up @@ -70,10 +87,11 @@ fn main() -> Result<(), Box<dyn Error>> {
increment: time::Duration::days(1),
};

let mut positions = vec![];
for date in iter {
positions.extend(flights::positions(icao, &date, 1000.0)?);
}
let iter = iter.map(|date| flights::positions(icao, date, 1000.0, client.as_ref()));

let positions = futures::future::try_join_all(iter).await?;
let mut positions = positions.into_iter().flatten().collect::<Vec<_>>();
positions.sort_unstable_by_key(|x| x.datetime());

let legs = flights::legs(positions.into_iter());
let legs = legs
Expand Down
18 changes: 10 additions & 8 deletions examples/single_day.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ struct Cli {
date: String,
}

pub fn flight_date(
async fn flight_date(
tail_number: &str,
date: &time::Date,
date: time::Date,
owners: &Owners,
aircraft_owners: &AircraftOwners,
aircrafts: &Aircrafts,
) -> Result<Vec<Event>, Box<dyn Error>> {
let airports = airports_cached()?;
let airports = airports_cached().await?;
let aircraft_owner = aircraft_owners
.get(tail_number)
.ok_or_else(|| Into::<Box<dyn Error>>::into("Owner of tail number not found"))?;
Expand All @@ -69,7 +69,7 @@ pub fn flight_date(
let icao = &aircraft.icao_number;
println!("ICAO number: {}", icao);

let positions = positions(icao, date, 1000.0)?;
let positions = positions(icao, date, 1000.0, None).await?;
let legs = legs(positions);

println!("Number of legs: {}", legs.len());
Expand Down Expand Up @@ -138,14 +138,15 @@ fn process_leg(
Ok(())
}

pub fn main() -> Result<(), Box<dyn Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cli = Cli::parse();

std::fs::create_dir_all("database")?;

let owners = load_owners()?;
let aircraft_owners = load_aircraft_owners()?;
let aircrafts = load_aircrafts()?;
let aircrafts = load_aircrafts(None).await?;

let dane_emissions_kg = Fact {
claim: 5100,
Expand All @@ -160,11 +161,12 @@ pub fn main() -> Result<(), Box<dyn Error>> {

let mut events = flight_date(
&cli.tail_number,
&date,
date,
&owners,
&aircraft_owners,
&aircrafts,
)?;
)
.await?;

if events.len() == 2 && events[0].from_airport == events[1].to_airport {
let mut event = events.remove(0);
Expand Down
113 changes: 82 additions & 31 deletions src/aircraft_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
use std::collections::HashMap;
use std::error::Error;

use async_recursion::async_recursion;
use reqwest;
use serde::{Deserialize, Serialize};
use serde_json;

use crate::fs_azure;

/// [`HashMap`] between tail number (e.g. "OY-TWM") and an [`Aircraft`]
pub type Aircrafts = HashMap<String, Aircraft>;

Expand All @@ -26,69 +29,110 @@ static DIRECTORY: &'static str = "database";
/// Builds the cache location of the database file for `prefix`,
/// i.e. `{DIRECTORY}/{DATABASE}/{prefix}.json`.
///
/// The same string is used as a local file path and as the path inside the Azure container.
fn cache_file_path(prefix: &str) -> String {
    format!("{}/{}/{}.json", DIRECTORY, DATABASE, prefix)
}

/// Builds the URL on `globe.adsbexchange.com` from which the database file
/// for `prefix` is downloaded.
fn source(prefix: &str) -> String {
    format!("https://globe.adsbexchange.com/{}/{}.js", DATABASE, prefix)
}

/// Returns a map between tail number (e.g. "OYTWM": "45D2ED")
/// Returns the raw (not yet parsed) content of the file for `prefix`, which maps tail numbers (e.g. "OYTWM": "45D2ED")
/// Caches the file in the Azure container the first time it is executed
fn aircrafts_prefixed(
async fn aircrafts_prefixed_azure(
prefix: &str,
) -> Result<HashMap<String, Vec<Option<String>>>, Box<dyn Error>> {
client: &fs_azure::ContainerClient,
) -> Result<Vec<u8>, Box<dyn Error>> {
let path = &cache_file_path(prefix);

let data = if !fs_azure::exists(client, path).await? {
let source = &source(prefix);
let req = reqwest::get(source).await?;
let data = req.bytes().await?;
fs_azure::put(client, &path, data.clone()).await?;
data.into()
} else {
fs_azure::get(client, path).await?
};
Ok(data)
}

/// Returns the raw (not yet parsed) content of the file for `prefix`, which maps tail numbers (e.g. "OYTWM": "45D2ED")
/// Caches to disk the first time it is executed
async fn aircrafts_prefixed_fs(prefix: &str) -> Result<Vec<u8>, Box<dyn Error>> {
let path = &cache_file_path(prefix);
if !std::path::Path::new(path).exists() {
let source = &source(prefix);
let req = reqwest::blocking::get(source)?;
let data = req.text()?;
let req = reqwest::get(source).await?;
let data = req.text().await?;
std::fs::create_dir_all(format!("{DIRECTORY}/{DATABASE}"))?;
std::fs::write(path, data)?;
}

let data = std::fs::read(path)?;
Ok(serde_json::from_slice(&data)?)
Ok(std::fs::read(path)?)
}

fn children(
/// Loads the database file for `prefix` and parses it as JSON.
///
/// The raw bytes come from [`aircrafts_prefixed_azure`] when `client` is `Some`,
/// and from [`aircrafts_prefixed_fs`] otherwise; both cache the file the first
/// time it is requested.
///
/// Returns `prefix` together with the parsed map. Errors from loading or from
/// parsing are converted to their `String` representation.
async fn aircrafts_prefixed(
    prefix: String,
    client: Option<&fs_azure::ContainerClient>,
) -> Result<(String, HashMap<String, Vec<Option<String>>>), String> {
    let bytes = if let Some(client) = client {
        aircrafts_prefixed_azure(&prefix, client).await
    } else {
        aircrafts_prefixed_fs(&prefix).await
    };
    let bytes = bytes.map_err(|e| e.to_string())?;

    let parsed = serde_json::from_slice(&bytes).map_err(|e| e.to_string())?;
    Ok((prefix, parsed))
}

#[async_recursion]
async fn children<'a: 'async_recursion>(
entries: &mut HashMap<String, Vec<Option<String>>>,
) -> Result<Vec<(String, HashMap<String, Vec<Option<String>>>)>, Box<dyn Error>> {
client: Option<&'a fs_azure::ContainerClient>,
) -> Result<Vec<(String, HashMap<String, Vec<Option<String>>>)>, String> {
let Some(entries) = entries.remove("children") else {
return Ok(Default::default());
};

let mut entries = entries
.into_iter()
.map(|x| x.unwrap())
.map(|x| aircrafts_prefixed(&x).map(|r| (x, r)))
.collect::<Result<Vec<_>, _>>()?;
let mut entries = futures::future::try_join_all(
entries
.into_iter()
.map(|x| x.unwrap())
.map(|x| aircrafts_prefixed(x, client)),
)
.await
.map_err(|e| e.to_string())?;

// recurse over all children
let children = entries
.iter_mut()
.map(|(_, ref mut r)| children(r))
.collect::<Result<Vec<_>, _>>()?;
let mut _children = vec![];
for entry in entries.iter_mut() {
_children.extend(children(&mut entry.1, client).await?)
}

entries.extend(children.into_iter().flatten());
entries.extend(_children);
Ok(entries)
}

/// Returns [`Aircrafts`] known in [ADS-B exchange](https://globe.adsbexchange.com) as of 2023-11-06.
/// It returns ~0.5m aircrafts
/// # Implementation
/// This function is idempotent but not pure: it caches every https request (to disk, or to the Azure container when `client` is given) to not penalize adsbexchange.com
pub fn load_aircrafts() -> Result<Aircrafts, Box<dyn Error>> {
pub async fn load_aircrafts(
client: Option<&fs_azure::ContainerClient>,
) -> Result<Aircrafts, Box<dyn Error>> {
let prefixes = (b'A'..=b'F').chain(b'0'..b'9');
let prefixes = prefixes.map(|x| std::str::from_utf8(&[x]).unwrap().to_string());

let mut entries = prefixes
.map(|x| aircrafts_prefixed(&x).map(|r| (x, r)))
.collect::<Result<Vec<_>, _>>()?;
let mut entries =
futures::future::try_join_all(prefixes.map(|x| aircrafts_prefixed(x, client))).await?;

let children = entries
.iter_mut()
.map(|(_, ref mut r)| children(r))
.collect::<Result<Vec<_>, _>>()?;
let mut _children = vec![];
for entry in entries.iter_mut() {
_children.extend(children(&mut entry.1, client).await?)
}

entries.extend(children.into_iter().flatten());
entries.extend(_children);

Ok(entries
.into_iter()
Expand Down Expand Up @@ -117,9 +161,16 @@ pub fn load_aircrafts() -> Result<Aircrafts, Box<dyn Error>> {
mod test {
use super::*;

#[test]
fn work() {
assert_eq!(aircrafts_prefixed("A0").unwrap().len(), 24465);
/// Checks the number of entries loaded for prefix "A0" via the local file-system cache.
#[tokio::test]
async fn work() {
    let (_, entries) = aircrafts_prefixed("A0".to_string(), None)
        .await
        .unwrap();
    assert_eq!(entries.len(), 24465);
    // although important, this is an expensive call to run on every test => only run ad-hoc
    //assert_eq!(aircrafts().unwrap().len(), 463747);
}
Expand Down
10 changes: 5 additions & 5 deletions src/airports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ pub struct Airport {
pub type_: String,
}

fn airports() -> Result<String, Box<dyn std::error::Error>> {
async fn airports() -> Result<String, Box<dyn std::error::Error>> {
let url = "https://raw.githubusercontent.com/davidmegginson/ourairports-data/main/airports.csv";

let client = reqwest::blocking::Client::builder()
let client = reqwest::Client::builder()
.redirect(reqwest::redirect::Policy::none())
.build()
.unwrap();
Ok(client.get(url).send()?.text()?)
Ok(client.get(url).send().await?.text().await?)
}

/// Returns a list of airports
pub fn airports_cached() -> Result<Vec<Airport>, Box<dyn std::error::Error>> {
pub async fn airports_cached() -> Result<Vec<Airport>, Box<dyn std::error::Error>> {
let file_path = "database/airports.csv";
if !std::path::Path::new(&file_path).exists() {
let data = airports()?;
let data = airports().await?;
std::fs::write(&file_path, data)?;
}

Expand Down
Loading

0 comments on commit da6fdb6

Please sign in to comment.