From da6fdb6c7bc73faa2be3ef4d9e6c6e7b3e435fe4 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Tue, 21 Nov 2023 09:49:04 +0100 Subject: [PATCH] Added multi-threading and remote store --- Cargo.toml | 11 +++- examples/period.rs | 30 ++++++++--- examples/single_day.rs | 18 ++++--- src/aircraft_db.rs | 113 ++++++++++++++++++++++++++++++----------- src/airports.rs | 10 ++-- src/fs_azure.rs | 65 ++++++++++++++++++++++++ src/icao_to_trace.rs | 76 +++++++++++++++++++++------ src/lib.rs | 1 + tests/it/main.rs | 12 ++--- 9 files changed, 263 insertions(+), 73 deletions(-) create mode 100644 src/fs_azure.rs diff --git a/Cargo.toml b/Cargo.toml index 120d848..3696d4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", default_features = false } # perform requests to the internet -reqwest = {version="*", features = ["blocking", "gzip"]} +reqwest = {version="*", features = ["gzip"]} # create random string for cookies rand = {version="*", default_features = false, features = ["std", "std_rng", "getrandom"]} @@ -24,6 +24,15 @@ geoutils = {version="*", default_features = false} # read airport names csv = {version="*", default_features = false} +# azure integration +azure_storage = "*" +azure_storage_blobs = "*" +azure_core = "*" +futures = "0.3" +bytes = "1.5" +async-recursion = "1.0" + [dev-dependencies] tinytemplate = "1.1" clap = { version = "4.4.6", features = ["derive"] } +tokio = {version="1.0", features=["rt", "macros", "rt-multi-thread"]} diff --git a/examples/period.rs b/examples/period.rs index 4a97178..450a8b1 100644 --- a/examples/period.rs +++ b/examples/period.rs @@ -1,5 +1,7 @@ use std::error::Error; +use clap::Parser; + use flights::{ emissions, load_aircraft_owners, load_aircrafts, load_owners, Aircraft, Class, Company, Fact, }; @@ -34,10 +36,25 @@ fn render(context: &Context) -> Result<(), Box> { Ok(()) } -fn main() -> Result<(), Box> { +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// The Azure token + #[arg(short, long)] + azure_sas_token: Option, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + + let client = cli + .azure_sas_token + .map(|token| flights::fs_azure::initialize(&token, "privatejets", "data").unwrap()); + let owners = load_owners()?; let aircraft_owners = load_aircraft_owners()?; - let aircrafts = load_aircrafts()?; + let aircrafts = load_aircrafts(client.as_ref()).await?; let to = time::OffsetDateTime::now_utc().date() - time::Duration::days(1); let from = to - time::Duration::days(90); @@ -70,10 +87,11 @@ fn main() -> Result<(), Box> { increment: time::Duration::days(1), }; - let mut positions = vec![]; - for date in iter { - positions.extend(flights::positions(icao, &date, 1000.0)?); - } + let iter = iter.map(|date| flights::positions(icao, date, 1000.0, client.as_ref())); + + let positions = futures::future::try_join_all(iter).await?; + let mut positions = positions.into_iter().flatten().collect::>(); + positions.sort_unstable_by_key(|x| x.datetime()); let legs = flights::legs(positions.into_iter()); let legs = legs diff --git a/examples/single_day.rs b/examples/single_day.rs index 58bd95e..3eae28f 100644 --- a/examples/single_day.rs +++ b/examples/single_day.rs @@ -41,14 +41,14 @@ struct Cli { date: String, } -pub fn flight_date( +async fn flight_date( tail_number: &str, - date: &time::Date, + date: time::Date, owners: &Owners, aircraft_owners: &AircraftOwners, aircrafts: &Aircrafts, ) -> Result, Box> { - let airports = airports_cached()?; + let airports = airports_cached().await?; let aircraft_owner = aircraft_owners .get(tail_number) .ok_or_else(|| Into::>::into("Owner of tail number not found"))?; @@ -69,7 +69,7 @@ pub fn flight_date( let icao = &aircraft.icao_number; println!("ICAO number: {}", icao); - let positions = positions(icao, date, 1000.0)?; + let positions = positions(icao, date, 1000.0, None).await?; let legs = legs(positions); println!("Number of legs: {}", legs.len()); @@ -138,14 +138,15 @@ fn process_leg( Ok(()) } -pub fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { let cli = Cli::parse(); std::fs::create_dir_all("database")?; let owners = load_owners()?; let aircraft_owners = load_aircraft_owners()?; - let aircrafts = load_aircrafts()?; + let aircrafts = load_aircrafts(None).await?; let dane_emissions_kg = Fact { claim: 5100, @@ -160,11 +161,12 @@ pub fn main() -> Result<(), Box> { let mut events = flight_date( &cli.tail_number, - &date, + date, &owners, &aircraft_owners, &aircrafts, - )?; + ) + .await?; if events.len() == 2 && events[0].from_airport == events[1].to_airport { let mut event = events.remove(0); diff --git a/src/aircraft_db.rs b/src/aircraft_db.rs index 1830e0d..74644de 100644 --- a/src/aircraft_db.rs +++ b/src/aircraft_db.rs @@ -2,10 +2,13 @@ use std::collections::HashMap; use std::error::Error; +use async_recursion::async_recursion; use reqwest; use serde::{Deserialize, Serialize}; use serde_json; +use crate::fs_azure; + /// [`HashMap`] between tail number (e.g. "OY-TWM") and an [`Aircraft`] pub type Aircrafts = HashMap; @@ -26,48 +29,88 @@ static DIRECTORY: &'static str = "database"; fn cache_file_path(prefix: &str) -> String { format!("{DIRECTORY}/{DATABASE}/{prefix}.json") } + fn source(prefix: &str) -> String { format!("https://globe.adsbexchange.com/{DATABASE}/{prefix}.js") } -/// Returns a map between tail number (e.g. "OYTWM": "45D2ED") +/// Returns a map between tail number (e.g. "OYTWM": "45D2ED") /// Caches to disk the first time it is executed -fn aircrafts_prefixed( +async fn aircrafts_prefixed_azure( prefix: &str, -) -> Result>>, Box> { + client: &fs_azure::ContainerClient, +) -> Result, Box> { + let path = &cache_file_path(prefix); + + let data = if !fs_azure::exists(client, path).await? { + let source = &source(prefix); + let req = reqwest::get(source).await?; + let data = req.bytes().await?; + fs_azure::put(client, &path, data.clone()).await?; + data.into() + } else { + fs_azure::get(client, path).await? + }; + Ok(data) +} + +/// Returns a map between tail number (e.g. "OYTWM": "45D2ED") +/// Caches to disk the first time it is executed +async fn aircrafts_prefixed_fs(prefix: &str) -> Result, Box> { let path = &cache_file_path(prefix); if !std::path::Path::new(path).exists() { let source = &source(prefix); - let req = reqwest::blocking::get(source)?; - let data = req.text()?; + let req = reqwest::get(source).await?; + let data = req.text().await?; std::fs::create_dir_all(format!("{DIRECTORY}/{DATABASE}"))?; std::fs::write(path, data)?; } - let data = std::fs::read(path)?; - Ok(serde_json::from_slice(&data)?) + Ok(std::fs::read(path)?) } -fn children( +/// Returns a map between tail number (e.g. "OYTWM": "45D2ED") +/// Caches to disk the first time it is executed +async fn aircrafts_prefixed( + prefix: String, + client: Option<&fs_azure::ContainerClient>, +) -> Result<(String, HashMap>>), String> { + let data = match client { + Some(client) => aircrafts_prefixed_azure(&prefix, client).await, + None => aircrafts_prefixed_fs(&prefix).await, + } + .map_err(|e| e.to_string())?; + Ok(( + prefix, + serde_json::from_slice(&data).map_err(|e| e.to_string())?, + )) +} + +#[async_recursion] +async fn children<'a: 'async_recursion>( entries: &mut HashMap>>, -) -> Result>>)>, Box> { + client: Option<&'a fs_azure::ContainerClient>, +) -> Result>>)>, String> { let Some(entries) = entries.remove("children") else { return Ok(Default::default()); }; - let mut entries = entries - .into_iter() - .map(|x| x.unwrap()) - .map(|x| aircrafts_prefixed(&x).map(|r| (x, r))) - .collect::, _>>()?; + let mut entries = futures::future::try_join_all( + entries + .into_iter() + .map(|x| x.unwrap()) + .map(|x| aircrafts_prefixed(x, client)), + ) + .await + .map_err(|e| e.to_string())?; // recurse over all children - let children = entries - .iter_mut() - .map(|(_, ref mut r)| children(r)) - .collect::, _>>()?; + let mut _children = vec![]; + for entry in entries.iter_mut() { + _children.extend(children(&mut entry.1, client).await?) + } - entries.extend(children.into_iter().flatten()); + entries.extend(_children); Ok(entries) } @@ -75,20 +118,21 @@ fn children( /// It returns ~0.5m aircrafts /// # Implementation /// This function is idempotent but not pure: it caches every https request to disk to not penalize adsbexchange.com -pub fn load_aircrafts() -> Result> { +pub async fn load_aircrafts( + client: Option<&fs_azure::ContainerClient>, +) -> Result> { let prefixes = (b'A'..=b'F').chain(b'0'..b'9'); let prefixes = prefixes.map(|x| std::str::from_utf8(&[x]).unwrap().to_string()); - let mut entries = prefixes - .map(|x| aircrafts_prefixed(&x).map(|r| (x, r))) - .collect::, _>>()?; + let mut entries = + futures::future::try_join_all(prefixes.map(|x| aircrafts_prefixed(x, client))).await?; - let children = entries - .iter_mut() - .map(|(_, ref mut r)| children(r)) - .collect::, _>>()?; + let mut _children = vec![]; + for entry in entries.iter_mut() { + _children.extend(children(&mut entry.1, client).await?) + } - entries.extend(children.into_iter().flatten()); + entries.extend(_children); Ok(entries .into_iter() @@ -117,9 +161,16 @@ pub fn load_aircrafts() -> Result> { mod test { use super::*; - #[test] - fn work() { - assert_eq!(aircrafts_prefixed("A0").unwrap().len(), 24465); + #[tokio::test] + async fn work() { + assert_eq!( + aircrafts_prefixed("A0".to_string(), None) + .await + .unwrap() + .1 + .len(), + 24465 + ); // although important, this is an expensive call to run on every test => only run ad-hoc //assert_eq!(aircrafts().unwrap().len(), 463747); } diff --git a/src/airports.rs b/src/airports.rs index 44d8bfe..7d94921 100644 --- a/src/airports.rs +++ b/src/airports.rs @@ -9,21 +9,21 @@ pub struct Airport { pub type_: String, } -fn airports() -> Result> { +async fn airports() -> Result> { let url = "https://raw.githubusercontent.com/davidmegginson/ourairports-data/main/airports.csv"; - let client = reqwest::blocking::Client::builder() + let client = reqwest::Client::builder() .redirect(reqwest::redirect::Policy::none()) .build() .unwrap(); - Ok(client.get(url).send()?.text()?) + Ok(client.get(url).send().await?.text().await?) } /// Returns a list of airports -pub fn airports_cached() -> Result, Box> { +pub async fn airports_cached() -> Result, Box> { let file_path = "database/airports.csv"; if !std::path::Path::new(&file_path).exists() { - let data = airports()?; + let data = airports().await?; std::fs::write(&file_path, data)?; } diff --git a/src/fs_azure.rs b/src/fs_azure.rs new file mode 100644 index 0000000..7e869ac --- /dev/null +++ b/src/fs_azure.rs @@ -0,0 +1,65 @@ +use azure_storage::prelude::*; +use azure_storage_blobs::{ + blob::operations::PutBlockBlobResponse, container::operations::BlobItem, prelude::ClientBuilder, +}; +use futures::stream::StreamExt; + +pub use azure_storage_blobs::prelude::ContainerClient; + +/// Lists all blobs in container +pub async fn list(client: ContainerClient) -> Result, azure_storage::Error> { + let mut result = vec![]; + let mut blobs = client.list_blobs().into_stream(); + while let Some(response) = blobs.next().await { + result.extend( + response? + .blobs + .items + .into_iter() + .filter_map(|blob| match blob { + BlobItem::Blob(blob) => Some(blob.name), + BlobItem::BlobPrefix(_) => None, + }), + ); + } + Ok(result) +} + +/// Returns whether the blob exists in container +pub async fn exists( + client: &ContainerClient, + blob_name: &str, +) -> Result { + client.blob_client(blob_name).exists().await +} + +/// Puts a blob in container +pub async fn put( + client: &ContainerClient, + blob_name: &str, + content: impl Into, +) -> Result { + client + .blob_client(blob_name) + .put_block_blob(content) + .content_type("text/plain") + .await +} + +/// Gets a blob from container +pub async fn get( + client: &ContainerClient, + blob_name: &str, +) -> Result, azure_storage::Error> { + client.blob_client(blob_name).get_content().await +} + +/// Initialize write access to the storage +pub fn initialize( + token: &str, + account: &str, + container: &str, +) -> azure_core::Result { + StorageCredentials::sas_token(token) + .map(|credentials| ClientBuilder::new(account, credentials).container_client(container)) +} diff --git a/src/icao_to_trace.rs b/src/icao_to_trace.rs index aeda839..3699178 100644 --- a/src/icao_to_trace.rs +++ b/src/icao_to_trace.rs @@ -5,6 +5,8 @@ use reqwest::header; use reqwest::{self, StatusCode}; use time::PrimitiveDateTime; +use crate::fs_azure; + use super::Position; fn last_2(icao: &str) -> &str { @@ -36,7 +38,17 @@ fn adsbx_sid() -> String { format!("{time}_{random_chars}") } -fn globe_history(icao: &str, date: &time::Date) -> Result> { +static DIRECTORY: &'static str = "database"; +static DATABASE: &'static str = "globe_history"; + +fn cache_file_path(icao: &str, date: &time::Date) -> String { + format!("{DIRECTORY}/{DATABASE}/{date}/trace_full_{icao}.json") +} + +async fn globe_history( + icao: &str, + date: &time::Date, +) -> Result, Box> { let referer = format!("https://globe.adsbexchange.com/?icao={icao}&lat=54.448&lon=10.602&zoom=7.0"); let url = to_url(icao, date); @@ -65,14 +77,14 @@ fn globe_history(icao: &str, date: &time::Date) -> Result Result Result, Box> { - let file_path = format!("database/{icao}_{date}.json"); + let file_path = cache_file_path(icao, date); if !std::path::Path::new(&file_path).exists() { - let data = globe_history(&icao, date)?; - std::fs::write(&file_path, data)?; + let contents = globe_history(&icao, date).await?; + std::fs::create_dir_all(format!("{DIRECTORY}/{DATABASE}/{date}"))?; + std::fs::write(&file_path, &contents)?; + Ok(contents) + } else { + Ok(std::fs::read(file_path)?) + } +} + +async fn globe_history_cached_azure( + client: &fs_azure::ContainerClient, + icao: &str, + date: &time::Date, +) -> Result, Box> { + let blob_name = cache_file_path(icao, date); + if !fs_azure::exists(client, &blob_name).await? { + let data = globe_history(&icao, date).await?; + fs_azure::put(client, &blob_name, data.clone()).await?; + Ok(data) + } else { + Ok(fs_azure::get(client, &blob_name).await?) } +} - Ok(std::fs::read(file_path)?) +/// Returns a map between tail number (e.g. "OYTWM": "45D2ED") +/// Caches to disk the first time it is executed +async fn globe_history_cached( + icao: &str, + date: &time::Date, + client: Option<&fs_azure::ContainerClient>, +) -> Result, Box> { + match client { + Some(client) => globe_history_cached_azure(client, icao, date).await, + None => globe_history_cached_fs(icao, date).await, + } } /// Returns the trace of the icao number of a given day from https://adsbexchange.com. @@ -112,11 +155,12 @@ fn globe_history_cached( /// # Implementation /// Because these are historical values, this function caches them the first time it is used /// by the two arguments -pub fn trace_cached( +pub async fn trace_cached( icao: &str, date: &time::Date, + client: Option<&fs_azure::ContainerClient>, ) -> Result, Box> { - let data = globe_history_cached(icao, date)?; + let data = globe_history_cached(icao, date, client).await?; let mut value = serde_json::from_slice::(&data)?; let trace = value @@ -131,14 +175,14 @@ pub fn trace_cached( /// Returns an iterator of [`Position`] over the trace of `icao` on day `date` assuming that /// a flight below `threshold` feet is grounded. -pub fn positions( +pub async fn positions( icao: &str, - date: &time::Date, + date: time::Date, threshold: f64, + client: Option<&fs_azure::ContainerClient>, ) -> Result, Box> { use time::ext::NumericalDuration; - let date = date.clone(); - trace_cached(icao, &date).map(move |trace| { + trace_cached(icao, &date, client).await.map(move |trace| { trace.into_iter().map(move |entry| { let time_seconds = entry[0].as_f64().unwrap(); let time = time::Time::MIDNIGHT + time_seconds.seconds(); diff --git a/src/lib.rs b/src/lib.rs index d0690f7..dd0cc7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ mod aircraft_types; mod airports; mod csv; mod emissions; +pub mod fs_azure; mod icao_to_trace; mod legs; mod model; diff --git a/tests/it/main.rs b/tests/it/main.rs index a1ec3ae..55cb71e 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -5,9 +5,9 @@ use time::macros::date; /// Verifies that we compute the correct number of legs. /// The expected 2 was confirmed by manual inspection of /// https://globe.adsbexchange.com/?icao=45d2ed&lat=54.128&lon=9.185&zoom=5.0&showTrace=2023-10-13 -#[test] -fn acceptance_legs() -> Result<(), Box> { - let positions = flights::positions("45d2ed", &date!(2023 - 10 - 13), 1000.0)?; +#[tokio::test] +async fn acceptance_legs() -> Result<(), Box> { + let positions = flights::positions("45d2ed", date!(2023 - 10 - 13), 1000.0, None).await?; let legs = flights::legs(positions); assert_eq!(legs.len(), 2); @@ -45,9 +45,9 @@ fn acceptance_test_emissions() { assert!(abs_difference(emissions, expected) / expected < accepted_error); } -#[test] -fn legs_() -> Result<(), Box> { - let positions = flights::positions("459cd3", &date!(2023 - 11 - 17), 1000.0)?; +#[tokio::test] +async fn legs_() -> Result<(), Box> { + let positions = flights::positions("459cd3", date!(2023 - 11 - 17), 1000.0, None).await?; let legs = flights::legs(positions.into_iter()); let legs = legs .into_iter()