From eca048569d1177a3fac4bddeb927a10b34d3a54a Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Tue, 21 Nov 2023 14:39:19 +0100 Subject: [PATCH] Improved --- README.md | 42 ++++++++++++++++++++++++++++++----- examples/period.rs | 6 ++--- examples/single_day.rs | 15 ++++++++----- src/aircraft_db.rs | 13 ++++++----- src/airports.rs | 2 +- src/fs.rs | 44 ++++++++++++++++++++++++++++++------- src/fs_azure.rs | 45 +++++++++++++++++++++++++++++++++---- src/icao_to_trace.rs | 50 ++++++++++++++++++++++++++++++++---------- 8 files changed, 174 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index d1be5af..7b49370 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,28 @@ # Danish private flights This repository contains a CLI application that generates a text based summary of -private jet's flight information targetted to a Danish audience. +private jet's flight information targeted to a Danish audience. + +It is supported by an Azure Blob storage container for caching data, thereby +reducing its impact to [https://adsbexchange.com/](https://adsbexchange.com/). + +## Risk and impact + +This code performs API calls to [https://adsbexchange.com/](https://adsbexchange.com/), +a production website of a company. + +**Use critical thinking** when using this code and how it impacts them. + +We strongly recommend that if you plan to perform large scale analysis (e.g. in time or aircrafts), +that you reach out via an issue _before_, so that we can work together +to cache all hits to [https://adsbexchange.com/](https://adsbexchange.com/) +on an horizontally scaled remote storage and therefore remove its impact to adsbexchange.com +of future calls. + +All data cached is available on Azure blob storage: +* account: `privatejets` +* container: `data` + +and has anonymous and public read permissions. ## How to use @@ -9,11 +31,21 @@ private jet's flight information targetted to a Danish audience. 3. open `OY-GFS_2023-10-20_0.md` Step 2. has an optional argument, `--azure-sas-token`, specifying an Azure storage container SAS -token for account `privatejets`, container `data`. -When used, caching of data is done on the remote container, as opposed to local disk. +for account `privatejets`, container `data`. +When used, cache is written to the remote container, as opposed to disk. + +Finally, setting `--backend disk` ignores the Azure's remote storage altogether and +only uses disk for caching (resulting in higher cache misses and thus more +interactions with ADS-B exchange). + +In general: +* Use the default parameters when creating ad-hoc stories +* Use `--azure-sas-token` when improving the database with new data. +* Use `--backend disk` when testing the caching system -Furthermore, setting `--backend azure` without `azure-sas-token` provides read-access -to the remote container. +As of today, the flag `--azure-sas-token` is only available when the code is executed +from `main`, as writing to the blob storage must be done through a controlled code base +that preserves data integrity. ## Assumptions diff --git a/examples/period.rs b/examples/period.rs index 643f57d..0d718ef 100644 --- a/examples/period.rs +++ b/examples/period.rs @@ -39,7 +39,7 @@ fn render(context: &Context) -> Result<(), Box> { #[derive(clap::ValueEnum, Debug, Clone)] enum Backend { - LocalDisk, + Disk, Azure, } @@ -49,7 +49,7 @@ struct Cli { /// The Azure token #[arg(short, long)] azure_sas_token: Option, - #[arg(short, long, value_enum, default_value_t=Backend::LocalDisk)] + #[arg(short, long, value_enum, default_value_t=Backend::Azure)] backend: Backend, } @@ -64,7 +64,7 @@ async fn main() -> Result<(), Box> { // optionally initialize Azure client let client = match (cli.backend, cli.azure_sas_token) { - (Backend::LocalDisk, None) => None, + (Backend::Disk, None) => None, (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous( "privatejets", "data", diff --git a/examples/single_day.rs b/examples/single_day.rs index 30115f9..667c4cd 100644 --- a/examples/single_day.rs +++ b/examples/single_day.rs @@ -1,11 +1,11 @@ use std::error::Error; +use clap::Parser; +use simple_logger::SimpleLogger; use tinytemplate::TinyTemplate; use flights::*; -use clap::Parser; - static TEMPLATE_NAME: &'static str = "t"; #[derive(serde::Serialize, serde::Deserialize, Debug)] @@ -31,7 +31,7 @@ pub struct Context { #[derive(clap::ValueEnum, Debug, Clone)] enum Backend { - LocalDisk, + Disk, Azure, } @@ -48,7 +48,7 @@ struct Cli { /// The Azure token #[arg(short, long)] azure_sas_token: Option, - #[arg(short, long, value_enum, default_value_t=Backend::LocalDisk)] + #[arg(short, long, value_enum, default_value_t=Backend::Azure)] backend: Backend, } @@ -159,11 +159,16 @@ fn process_leg( #[tokio::main] async fn main() -> Result<(), Box> { + SimpleLogger::new() + .with_level(log::LevelFilter::Info) + .init() + .unwrap(); + let cli = Cli::parse(); // optionally initialize Azure client let client = match (cli.backend, cli.azure_sas_token) { - (Backend::LocalDisk, None) => None, + (Backend::Disk, None) => None, (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous( "privatejets", "data", diff --git a/src/aircraft_db.rs b/src/aircraft_db.rs index a34af78..5c0b5d1 100644 --- a/src/aircraft_db.rs +++ b/src/aircraft_db.rs @@ -34,7 +34,7 @@ fn url(prefix: &str) -> String { format!("https://globe.adsbexchange.com/{DATABASE}/{prefix}.js") } -async fn aircrafts(prefix: &str) -> Result, Box> { +async fn aircrafts(prefix: &str) -> Result, reqwest::Error> { Ok(reqwest::get(url(prefix)) .await? .bytes() @@ -52,10 +52,13 @@ async fn aircrafts_prefixed( let fetch = aircrafts(&prefix); let data = match client { - Some(client) => crate::fs::cached(&blob_name, fetch, client).await, - None => crate::fs::cached(&blob_name, fetch, &fs::LocalDisk).await, - } - .map_err(|e| e.to_string())?; + Some(client) => crate::fs::cached(&blob_name, fetch, client) + .await + .map_err(|e| e.to_string())?, + None => crate::fs::cached(&blob_name, fetch, &fs::LocalDisk) + .await + .map_err(|e| e.to_string())?, + }; Ok(( prefix, diff --git a/src/airports.rs b/src/airports.rs index 2c37107..fb15f47 100644 --- a/src/airports.rs +++ b/src/airports.rs @@ -9,7 +9,7 @@ pub struct Airport { pub type_: String, } -async fn airports() -> Result, Box> { +async fn airports() -> Result, reqwest::Error> { let url = "https://raw.githubusercontent.com/davidmegginson/ourairports-data/main/airports.csv"; Ok(reqwest::get(url).await?.bytes().await.map(|x| x.into())?) } diff --git a/src/fs.rs b/src/fs.rs index 00731ce..03c14b6 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; /// An object that can be used to get and put blobs. #[async_trait] pub trait BlobStorageProvider { - type Error: std::error::Error; + type Error: std::error::Error + Send; async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error>; async fn put(&self, blob_name: &str, contents: Vec) -> Result, Self::Error>; } @@ -34,28 +34,56 @@ impl BlobStorageProvider for LocalDisk { } } +#[derive(Debug)] +pub enum Error { + /// An error originating from trying to read from source + Fetch(F), + /// An error originating from trying to read or write data from/to backend + Backend(E), +} + +impl std::error::Error for Error {} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Fetch(e) => std::fmt::Display::fmt(&e, f), + Self::Backend(e) => std::fmt::Display::fmt(&e, f), + } + } +} + /// Tries to retrive `blob_name` from `provider`. If it does not exist, /// it calls `fetch` and writes the result into `provider`. /// Returns the data in `blob_name` from `provider`. /// # Implementation /// This function is idempotent but not pure. -pub async fn cached<'a, P, F>( +pub async fn cached( blob_name: &str, fetch: F, provider: &P, -) -> Result, Box> +) -> Result, Error> where - F: futures::Future, Box>>, + E: std::error::Error + Send, + F: futures::Future, E>>, P: BlobStorageProvider, - P::Error: 'a, { log::info!("Fetch {blob_name}"); - if let Some(data) = provider.maybe_get(blob_name).await? { + if let Some(data) = provider + .maybe_get(blob_name) + .await + .map_err(|e| Error::Backend(e))? + { log::info!("{blob_name} - cache hit"); Ok(data) } else { log::info!("{blob_name} - cache miss"); - let contents = fetch.await?; - Ok(provider.put(blob_name, contents).await?) + let contents = fetch.await.map_err(|e| Error::Fetch(e))?; + let data = provider + .put(blob_name, contents) + .await + .map_err(|e| Error::Backend(e))?; + log::info!("{blob_name} - cache write"); + Ok(data) } } diff --git a/src/fs_azure.rs b/src/fs_azure.rs index 8f160d6..b289656 100644 --- a/src/fs_azure.rs +++ b/src/fs_azure.rs @@ -1,3 +1,4 @@ +use azure_core::{error::HttpError, StatusCode}; use azure_storage::prelude::*; pub use azure_storage_blobs::prelude::ContainerClient; use azure_storage_blobs::{container::operations::BlobItem, prelude::ClientBuilder}; @@ -5,6 +6,24 @@ use futures::stream::StreamExt; use crate::fs::BlobStorageProvider; +#[derive(Debug)] +pub enum Error { + /// Unspecified error interacting with Azure blob storage + Error(azure_core::Error), + /// Unauthorized error when interacting with Azure blob storage + Unauthorized(azure_core::Error), +} + +impl std::error::Error for Error {} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Error::Unauthorized(e) | Error::Error(e) => e.fmt(f), + } + } +} + /// Lists all blobs in container pub async fn list(client: ContainerClient) -> Result, azure_storage::Error> { let mut result = vec![]; @@ -44,16 +63,27 @@ pub fn initialize_anonymous(account: &str, container: &str) -> ContainerClient { ClientBuilder::new(account, StorageCredentials::anonymous()).container_client(container) } +fn get_code(e: &azure_core::Error) -> Option { + let a = e.get_ref()?; + let a = a.downcast_ref::()?; + Some(a.status()) +} + pub struct AzureContainer<'a>(pub &'a ContainerClient); #[async_trait::async_trait] impl BlobStorageProvider for ContainerClient { - type Error = azure_core::Error; + type Error = Error; #[must_use] async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error> { - if exists(self, blob_name).await? { - Ok(Some(self.blob_client(blob_name).get_content().await?)) + if exists(self, blob_name).await.map_err(Error::Error)? { + Ok(Some( + self.blob_client(blob_name) + .get_content() + .await + .map_err(Error::Error)?, + )) } else { Ok(None) } @@ -64,7 +94,14 @@ impl BlobStorageProvider for ContainerClient { self.blob_client(blob_name) .put_block_blob(contents.clone()) .content_type("text/plain") - .await?; + .await + .map_err(|e| { + if get_code(&e) == Some(StatusCode::Unauthorized) { + Error::Unauthorized(e) + } else { + Error::Error(e) + } + })?; Ok(contents) } } diff --git a/src/icao_to_trace.rs b/src/icao_to_trace.rs index 6950cf7..fedcf77 100644 --- a/src/icao_to_trace.rs +++ b/src/icao_to_trace.rs @@ -45,10 +45,11 @@ fn cache_file_path(icao: &str, date: &time::Date) -> String { format!("{DIRECTORY}/{DATABASE}/{date}/trace_full_{icao}.json") } -async fn globe_history( - icao: &str, - date: &time::Date, -) -> Result, Box> { +fn to_io_err(error: reqwest::Error) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, error) +} + +async fn globe_history(icao: &str, date: &time::Date) -> Result, std::io::Error> { log::info!("globe_history({icao},{date})"); let referer = format!("https://globe.adsbexchange.com/?icao={icao}&lat=54.448&lon=10.602&zoom=7.0"); @@ -83,9 +84,14 @@ async fn globe_history( .build() .unwrap(); - let response = client.get(url).headers(headers).send().await?; + let response = client + .get(url) + .headers(headers) + .send() + .await + .map_err(to_io_err)?; if response.status() == StatusCode::OK { - Ok(response.bytes().await?.to_vec()) + Ok(response.bytes().await.map_err(to_io_err)?.to_vec()) } else if response.status() == StatusCode::NOT_FOUND { Ok(format!( r#"{{ @@ -97,11 +103,15 @@ async fn globe_history( ) .into_bytes()) } else { - Err("could not retrieve data from globe.adsbexchange.com".into()) + Err(std::io::Error::new::( + std::io::ErrorKind::Other, + "could not retrieve data from globe.adsbexchange.com".into(), + ) + .into()) } } -/// Returns a map between tail number (e.g. "OYTWM": "45D2ED") +/// Returns a map between tail number (e.g. "OYTWM": "45D2ED") /// Caches to disk the first time it is executed async fn globe_history_cached( icao: &str, @@ -111,10 +121,26 @@ async fn globe_history_cached( let blob_name = cache_file_path(icao, date); let fetch = globe_history(&icao, date); - match client { - Some(client) => crate::fs::cached(&blob_name, fetch, client).await, - None => crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk).await, - } + Ok(match client { + Some(client) => { + let result = crate::fs::cached(&blob_name, fetch, client).await; + if matches!( + result, + Err(crate::fs::Error::Backend(fs_azure::Error::Unauthorized(_))) + ) { + log::warn!("{blob_name} - Unauthorized - fall back to local disk"); + crate::fs::cached( + &blob_name, + globe_history(&icao, date), + &crate::fs::LocalDisk, + ) + .await? + } else { + result? + } + } + None => crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk).await?, + }) } /// Returns the trace of the icao number of a given day from https://adsbexchange.com.