diff --git a/Cargo.toml b/Cargo.toml index 17a18ff..60d9fe2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ reqwest-middleware = "*" rand = {version="*", default_features = false, features = ["std", "std_rng", "getrandom"]} # to perform time-based calculations -time = {version="*", default_features = false, features = ["formatting", "parsing", "macros"]} +time = {version="*", default_features = false, features = ["formatting", "parsing", "macros", "serde"]} # compute distances between geo-points geoutils = {version="*", default_features = false} @@ -33,11 +33,10 @@ futures = "0.3" # logging log = "*" -# azure integration -azure_storage = "*" -azure_storage_blobs = "*" -azure_core = "*" -bytes = "1.5" +# S3 integration +aws-config = { version = "1.1.4", features = ["behavior-version-latest"] } +aws-sdk-s3 = "*" +aws-credential-types = "*" [dev-dependencies] tinytemplate = "1.1" diff --git a/README.md b/README.md index 1d44585..9af0d1e 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This repository contains a CLI application to analyze flights of private jets. -It is supported by an Azure Blob storage container for caching data, thereby +It is supported by an S3 Blob storage container for caching data, thereby reducing its impact to [https://adsbexchange.com/](https://adsbexchange.com/). ## Risk and impact @@ -20,9 +20,9 @@ to cache all hits to [https://adsbexchange.com/](https://adsbexchange.com/) on an horizontally scaled remote storage and therefore remove its impact to adsbexchange.com of future calls. -All data cached is available on Azure blob storage: -* account: `privatejets` -* container: `data` +All cached data is available on S3 blob storage at endpoint + +> `https://private-jets.fra1.digitaloceanspaces.com` and has anonymous and public read permissions. @@ -35,20 +35,19 @@ to perform actual calculations. To use one of such examples: 2. run `cargo run --example single_day -- --tail-number "OY-GFS" --date "2023-10-20"` 3. 
open `OY-GFS_2023-10-20_0.md` -Step 2. has an optional argument, `--azure-sas-token`, specifying an Azure storage SAS -token. -When used, cache is written to the remote container, as opposed to disk. +Step 2. has optional arguments, `--access-key`, `--secret-access-key`, specifying +credentials to write to the remote storage, as opposed to disk. -Finally, setting `--backend disk` ignores the Azure's remote storage altogether and +Finally, setting `--backend disk` ignores the remote storage altogether and only uses disk for caching (resulting in higher cache misses and thus more interactions with ADS-B exchange). In general: * Use the default parameters when creating ad-hoc stories -* Use `--azure-sas-token` when improving the database with new data. +* Use `--access-key` when improving the database with new data. * Use `--backend disk` when testing the caching system -As of today, the flag `--azure-sas-token` is only available when the code is executed +As of today, the flag `--access-key` is only available when the code is executed from `main`, as writing to the blob storage must be done through a controlled code base that preserves data integrity. 
@@ -62,9 +61,8 @@ cargo run --example country -- --from=2024-01-13 --to=2024-01-21 --country=denma # Story about Portuguese private jets that flew between two dates cargo run --example country -- --from=2024-01-13 --to=2024-01-21 --country=portugal -# Story about German private jets that flew between in 2023, where the azure-sas-token -# is on the file token.txt -cargo run --example country -- --from=2023-01-01 --to=2024-01-01 --country=germany --azure-sas-token=$(cat token.txt) +# Story about German private jets that flew in 2023, where secret is on a file +cargo run --example country -- --from=2023-01-01 --to=2024-01-01 --country=germany --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt) ``` ## Methodology @@ -75,5 +73,5 @@ The methodology used to extract information is available at [`methodology.md`](. ### Set of worldwide aicrafts whose primary use is to be a private jet: -* [Data](https://privatejets.blob.core.windows.net/data/database/private_jets/2023/11/06/data.csv) -* [Description](https://privatejets.blob.core.windows.net/data/database/private_jets/2023/11/06/description.md) +* [Data](https://private-jets.fra1.digitaloceanspaces.com/private_jets/2023/11/06/data.csv) +* [Description](https://private-jets.fra1.digitaloceanspaces.com/private_jets/2023/11/06/description.md) diff --git a/examples/cache_state.rs b/examples/cache_state.rs index 9ee0e8c..72ae31d 100644 --- a/examples/cache_state.rs +++ b/examples/cache_state.rs @@ -6,7 +6,7 @@ use itertools::Itertools; use flights::Aircraft; async fn private_jets( - client: Option<&flights::fs_azure::ContainerClient>, + client: Option<&flights::fs_s3::ContainerClient>, ) -> Result, Box> { // load datasets to memory let aircrafts = flights::load_aircrafts(client).await?; @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box> { }) .collect::>(); - let client = flights::fs_azure::initialize_anonymous("privatejets", "data"); + let client = flights::fs_s3::anonymous_client().await; let existing = 
flights::existing_months_positions(&client).await?; diff --git a/examples/country.rs b/examples/country.rs index 571c26d..503c1b5 100644 --- a/examples/country.rs +++ b/examples/country.rs @@ -70,7 +70,7 @@ pub struct Context { #[derive(clap::ValueEnum, Debug, Clone)] enum Backend { Disk, - Azure, + Remote, } fn parse_date(arg: &str) -> Result { @@ -181,10 +181,13 @@ impl Country { #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Cli { - /// The Azure token + /// The token to the remote storage #[arg(long)] - azure_sas_token: Option, - #[arg(long, value_enum, default_value_t=Backend::Azure)] + access_key: Option, + /// The token to the remote storage + #[arg(long)] + secret_access_key: Option, + #[arg(long, value_enum, default_value_t=Backend::Remote)] backend: Backend, /// Name of the country to compute on @@ -213,7 +216,7 @@ async fn legs( to: Date, icao_number: &str, location: Option, - client: Option<&flights::fs_azure::ContainerClient>, + client: Option<&flights::fs_s3::ContainerClient>, ) -> Result, Box> { let positions = flights::aircraft_positions(from, to, icao_number, client).await?; let mut positions = positions @@ -278,18 +281,13 @@ async fn main() -> Result<(), Box> { let cli = Cli::parse(); - // optionally initialize Azure client - let client = match (cli.backend, cli.azure_sas_token) { - (Backend::Disk, None) => None, - (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous( - "privatejets", - "data", - )), - (_, Some(token)) => Some(flights::fs_azure::initialize_sas( - &token, - "privatejets", - "data", - )?), + // initialize client + let client = match (cli.backend, cli.access_key, cli.secret_access_key) { + (Backend::Disk, _, _) => None, + (_, Some(access_key), Some(secret_access_key)) => { + Some(flights::fs_s3::client(access_key, secret_access_key).await) + } + (Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await), }; // load datasets to memory diff --git 
a/examples/export_legs.rs b/examples/export_legs.rs index f068baf..3211a03 100644 --- a/examples/export_legs.rs +++ b/examples/export_legs.rs @@ -12,9 +12,12 @@ const ABOUT: &'static str = r#"Builds the database of all private jet positions #[derive(Parser, Debug)] #[command(author, version, about = ABOUT)] struct Cli { - /// The Azure token - #[arg(short, long)] - azure_sas_token: Option, + /// The token to the remote storage + #[arg(long)] + access_key: String, + /// The token to the remote storage + #[arg(long)] + secret_access_key: String, } #[tokio::main(flavor = "multi_thread")] @@ -26,11 +29,7 @@ async fn main() -> Result<(), Box> { let cli = Cli::parse(); - // optionally initialize Azure client - let client = match cli.azure_sas_token.clone() { - None => flights::fs_azure::initialize_anonymous("privatejets", "data"), - Some(token) => flights::fs_azure::initialize_sas(&token, "privatejets", "data")?, - }; + let client = flights::fs_s3::client(cli.access_key, cli.secret_access_key).await; // load datasets to memory let aircrafts = load_aircrafts(Some(&client)).await?; diff --git a/examples/export_private_jets.rs b/examples/export_private_jets.rs index c830618..1e06ebc 100644 --- a/examples/export_private_jets.rs +++ b/examples/export_private_jets.rs @@ -9,21 +9,24 @@ use flights::{load_aircrafts, load_private_jet_models}; #[derive(clap::ValueEnum, Debug, Clone)] enum Backend { Disk, - Azure, + Remote, } const ABOUT: &'static str = r#"Exports the database of all worldwide aircrafts whose primary use is to be a private jet to "data.csv" and its description at `description.md` (in disk). -If `azure_sas_token` is provided, data is written to the public blob storage instead. +If `access_key` and `secret_access_key` is provided, data is written to the public blob storage instead. 
"#; #[derive(Parser, Debug)] #[command(author, version, about = ABOUT)] struct Cli { - /// The Azure token - #[arg(short, long)] - azure_sas_token: Option, - #[arg(short, long, value_enum, default_value_t=Backend::Azure)] + /// The token to the remote storage + #[arg(long)] + access_key: Option, + /// The token to the remote storage + #[arg(long)] + secret_access_key: Option, + #[arg(short, long, value_enum, default_value_t=Backend::Remote)] backend: Backend, } @@ -36,18 +39,13 @@ async fn main() -> Result<(), Box> { let cli = Cli::parse(); - // optionally initialize Azure client - let client = match (cli.backend, cli.azure_sas_token.clone()) { - (Backend::Disk, None) => None, - (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous( - "privatejets", - "data", - )), - (_, Some(token)) => Some(flights::fs_azure::initialize_sas( - &token, - "privatejets", - "data", - )?), + // initialize client + let client = match (cli.backend, cli.access_key, cli.secret_access_key) { + (Backend::Disk, _, _) => None, + (_, Some(access_key), Some(secret_access_key)) => { + Some(flights::fs_s3::client(access_key, secret_access_key).await) + } + (Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await), }; // load datasets to memory @@ -77,7 +75,7 @@ It contains 3 columns: Both `icao_number` and `tail_number` are unique keys (independently). 
"#; - if cli.azure_sas_token.is_some() { + if client.as_ref().map(|c| c.can_put()).unwrap_or(false) { let client = client.unwrap(); client .put("database/private_jets/2023/11/06/data.csv", data_csv) diff --git a/examples/period.rs b/examples/period.rs index 75b985c..fd3256a 100644 --- a/examples/period.rs +++ b/examples/period.rs @@ -40,7 +40,7 @@ fn render(context: &Context) -> Result<(), Box> { #[derive(clap::ValueEnum, Debug, Clone)] enum Backend { Disk, - Azure, + Remote, } fn parse_date(arg: &str) -> Result { @@ -53,10 +53,13 @@ fn parse_date(arg: &str) -> Result { #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Cli { - /// The Azure token + /// The token to the remote storage #[arg(long)] - azure_sas_token: Option, - #[arg(long, value_enum, default_value_t=Backend::Azure)] + access_key: Option, + /// The token to the remote storage + #[arg(long)] + secret_access_key: Option, + #[arg(long, value_enum, default_value_t=Backend::Remote)] backend: Backend, /// The tail number @@ -79,18 +82,13 @@ async fn main() -> Result<(), Box> { let cli = Cli::parse(); - // optionally initialize Azure client - let client = match (cli.backend, cli.azure_sas_token) { - (Backend::Disk, None) => None, - (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous( - "privatejets", - "data", - )), - (_, Some(token)) => Some(flights::fs_azure::initialize_sas( - &token, - "privatejets", - "data", - )?), + // initialize client + let client = match (cli.backend, cli.access_key, cli.secret_access_key) { + (Backend::Disk, _, _) => None, + (_, Some(access_key), Some(secret_access_key)) => { + Some(flights::fs_s3::client(access_key, secret_access_key).await) + } + (Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await), }; // load datasets to memory diff --git a/examples/single_day.rs b/examples/single_day.rs index 73b2485..754925e 100644 --- a/examples/single_day.rs +++ b/examples/single_day.rs @@ -32,7 +32,7 @@ pub struct 
Context { #[derive(clap::ValueEnum, Debug, Clone)] enum Backend { Disk, - Azure, + Remote, } const ABOUT: &'static str = r#"Writes a markdown file per leg (named `{tail-number}_{date}_{leg}.md`) on disk with a description of: @@ -54,11 +54,14 @@ struct Cli { /// The date in format `yyyy-mm-dd` #[arg(short, long, value_parser = parse_date)] date: time::Date, - /// Optional azure token to write any new data to the blob storage - #[arg(short, long)] - azure_sas_token: Option, + /// The token to the remote storage + #[arg(long)] + access_key: Option, + /// The token to the remote storage + #[arg(long)] + secret_access_key: Option, /// The backend to read cached data from. - #[arg(short, long, value_enum, default_value_t=Backend::Azure)] + #[arg(short, long, value_enum, default_value_t=Backend::Remote)] backend: Backend, } @@ -75,7 +78,7 @@ async fn flight_date( owners: &Owners, aircraft_owners: &AircraftOwners, aircrafts: &Aircrafts, - client: Option<&fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result, Box> { let models = load_private_jet_models()?; let airports = airports_cached().await?; @@ -177,18 +180,13 @@ async fn main() -> Result<(), Box> { let cli = Cli::parse(); - // optionally initialize Azure client - let client = match (cli.backend, cli.azure_sas_token) { - (Backend::Disk, None) => None, - (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous( - "privatejets", - "data", - )), - (_, Some(token)) => Some(flights::fs_azure::initialize_sas( - &token, - "privatejets", - "data", - )?), + // initialize client + let client = match (cli.backend, cli.access_key, cli.secret_access_key) { + (Backend::Disk, _, _) => None, + (_, Some(access_key), Some(secret_access_key)) => { + Some(flights::fs_s3::client(access_key, secret_access_key).await) + } + (Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await), }; let owners = load_owners()?; diff --git a/methodology.md b/methodology.md index b731529..8849d18 
100644 --- a/methodology.md +++ b/methodology.md @@ -61,7 +61,7 @@ has a continuous sequence of ADS-B positions in time where the aircraft is flyin The aircraft at a given segment between two ADS-B positions is considered grounded (not flying) when any of: 1. both positions are on the ground 2. the time between these positions is > 5m and any of the positions is below 10.000 feet -3. the time between these positions is > 10h +3. the time between these positions is > 4h Condition 1. is the normal case where ADS-B signal was received when the aircraft landed. Condition 2. is used to mitigate the risk that ADS-B receivers sometimes diff --git a/src/aircraft_db.rs b/src/aircraft_db.rs index 91a7bf7..7ae7fe7 100644 --- a/src/aircraft_db.rs +++ b/src/aircraft_db.rs @@ -7,7 +7,7 @@ use reqwest; use serde::{Deserialize, Serialize}; use serde_json; -use crate::{fs, fs_azure}; +use crate::{fs, fs_s3}; /// [`HashMap`] between tail number (e.g. "OY-TWM") and an [`Aircraft`] pub type Aircrafts = HashMap; @@ -26,10 +26,9 @@ pub struct Aircraft { } static DATABASE: &'static str = "db-20231106"; -static DIRECTORY: &'static str = "database"; fn cache_file_path(prefix: &str) -> String { - format!("{DIRECTORY}/{DATABASE}/{prefix}.json") + format!("{DATABASE}/{prefix}.json") } fn url(prefix: &str) -> String { @@ -48,7 +47,7 @@ async fn aircrafts(prefix: &str) -> Result, reqwest::Error> { /// Caches to disk or remote storage the first time it is executed async fn aircrafts_prefixed( prefix: String, - client: Option<&fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result<(String, HashMap>>), String> { let blob_name = cache_file_path(&prefix); let fetch = aircrafts(&prefix); @@ -76,7 +75,7 @@ async fn aircrafts_prefixed( #[async_recursion] async fn children<'a: 'async_recursion>( entries: &mut HashMap>>, - client: Option<&'a fs_azure::ContainerClient>, + client: Option<&'a fs_s3::ContainerClient>, ) -> Result>>)>, String> { let Some(entries) = 
entries.remove("children") else { return Ok(Default::default()); @@ -109,7 +108,7 @@ async fn children<'a: 'async_recursion>( /// This function is idempotent but not pure: it caches every https request either to disk or remote storage /// to not penalize adsbexchange.com pub async fn load_aircrafts( - client: Option<&fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result> { let prefixes = (b'A'..=b'F').chain(b'0'..b'9'); let prefixes = prefixes.map(|x| std::str::from_utf8(&[x]).unwrap().to_string()); diff --git a/src/fs.rs b/src/fs.rs index 8ca6f32..0f6dcf2 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; pub trait BlobStorageProvider { type Error: std::error::Error + Send; async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error>; - async fn put(&self, blob_name: &str, contents: Vec) -> Result, Self::Error>; + async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), Self::Error>; fn can_put(&self) -> bool; } @@ -27,12 +27,12 @@ impl BlobStorageProvider for LocalDisk { } #[must_use] - async fn put(&self, blob_name: &str, contents: Vec) -> Result, Self::Error> { + async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), Self::Error> { let mut dir: std::path::PathBuf = blob_name.into(); dir.pop(); std::fs::create_dir_all(dir)?; std::fs::write(blob_name, &contents)?; - Ok(contents) + Ok(()) } fn can_put(&self) -> bool { @@ -105,11 +105,11 @@ where log::info!("{blob_name} - cache do not write"); return Ok(contents); }; - let data = provider - .put(blob_name, contents) + provider + .put(blob_name, contents.clone()) .await .map_err(|e| Error::Backend(e))?; log::info!("{blob_name} - cache write"); - Ok(data) + Ok(contents) } } diff --git a/src/fs_azure.rs b/src/fs_azure.rs deleted file mode 100644 index e45c235..0000000 --- a/src/fs_azure.rs +++ /dev/null @@ -1,105 +0,0 @@ -use azure_storage::prelude::*; -use azure_storage_blobs::prelude::ClientBuilder; -pub use 
azure_storage_blobs::prelude::ContainerClient as _ContainerClient; - -use crate::fs::BlobStorageProvider; - -pub struct ContainerClient { - pub client: _ContainerClient, - can_put: bool, -} - -/// Returns whether the blob exists in container -async fn exists(client: &ContainerClient, blob_name: &str) -> Result { - client.client.blob_client(blob_name).exists().await -} - -/// Initialize a [`ContainerClient`] using SAS token -pub fn initialize_sas( - token: &str, - account: &str, - container: &str, -) -> azure_core::Result { - StorageCredentials::sas_token(token) - .map(|credentials| ClientBuilder::new(account, credentials).container_client(container)) - .map(|client| ContainerClient { - client, - can_put: true, - }) -} - -/// Initialize an anonymous [`ContainerClient`] -pub fn initialize_anonymous(account: &str, container: &str) -> ContainerClient { - let client = - ClientBuilder::new(account, StorageCredentials::anonymous()).container_client(container); - - ContainerClient { - client, - can_put: false, - } -} - -#[async_trait::async_trait] -impl BlobStorageProvider for ContainerClient { - type Error = azure_core::Error; - - #[must_use] - async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error> { - if exists(self, blob_name).await? 
{ - Ok(Some( - self.client.blob_client(blob_name).get_content().await?, - )) - } else { - Ok(None) - } - } - - #[must_use] - async fn put(&self, blob_name: &str, contents: Vec) -> Result, Self::Error> { - self.client - .blob_client(blob_name) - .put_block_blob(contents.clone()) - .content_type("text/plain") - .await?; - Ok(contents) - } - - fn can_put(&self) -> bool { - self.can_put - } -} - -/// * read from azure -/// * if not found and can't write to azure => read disk and write to disk -/// * if not found and can write to azure => fetch and write -pub(crate) async fn cached_call, std::io::Error>>>( - blob_name: &str, - fetch: F, - action: crate::fs::CacheAction, - client: Option<&ContainerClient>, -) -> Result, std::io::Error> { - let Some(client) = client else { - return Ok( - crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) - .await - .map_err(std::io::Error::other)?, - ); - }; - - let Some(data) = client - .maybe_get(blob_name) - .await - .map_err(std::io::Error::other)? - else { - return Ok(if !client.can_put() { - crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) - .await - .map_err(std::io::Error::other)? - } else { - crate::fs::cached(&blob_name, fetch, client, action) - .await - .map_err(std::io::Error::other)? 
- }); - }; - Ok(data) -} diff --git a/src/fs_s3.rs b/src/fs_s3.rs new file mode 100644 index 0000000..ae5ee75 --- /dev/null +++ b/src/fs_s3.rs @@ -0,0 +1,216 @@ +use std::fmt::Display; + +use aws_credential_types::provider::ProvideCredentials; +use aws_sdk_s3::{ + config::Credentials, error::SdkError, operation::head_object::HeadObjectError, + primitives::ByteStream, +}; + +use crate::fs::BlobStorageProvider; + +#[derive(Clone, Debug)] +pub struct Error(String); + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +impl std::error::Error for Error {} + +impl From for Error { + fn from(value: String) -> Self { + Error(value) + } +} + +pub struct ContainerClient { + pub client: aws_sdk_s3::Client, + pub bucket: String, + can_put: bool, +} + +/// Returns whether the blob exists +async fn exists(client: &ContainerClient, blob_name: &str) -> Result { + let head_object_output = client + .client + .head_object() + .bucket(&client.bucket) + .key(blob_name) + .send() + .await; + + match head_object_output { + Ok(_) => Ok(true), + Err(err) => match &err { + SdkError::ServiceError(e) => { + if matches!(e.err(), HeadObjectError::NotFound(_)) { + Ok(false) + } else { + Err(format!("{err:?}").into()) + } + } + _ => Err(format!("{err:?}").into()), + }, + } +} + +async fn get(client: &ContainerClient, blob_name: &str) -> Result, Error> { + let object = client + .client + .get_object() + .bucket(&client.bucket) + .key(blob_name) + .send() + .await + .map_err(|e| Error::from(format!("{e:?}")))?; + + object + .body + .collect() + .await + .map(|x| x.into_bytes().to_vec()) + .map_err(|e| format!("{e:?}").into()) +} + +async fn put(client: &ContainerClient, blob_name: &str, content: Vec) -> Result<(), Error> { + let stream = ByteStream::from(content); + + client + .client + .put_object() + .bucket(&client.bucket) + .key(blob_name) + .body(stream) + .send() + .await + .map_err(|e| 
Error::from(format!("{e:?}"))) + .map(|_| ()) +} + +#[derive(Debug)] +struct Provider { + access_key: String, + secret_access_key: String, +} + +impl ProvideCredentials for Provider { + fn provide_credentials<'a>( + &'a self, + ) -> aws_credential_types::provider::future::ProvideCredentials<'a> + where + Self: 'a, + { + let access_key = self.access_key.clone(); + let secret_access_key = self.secret_access_key.clone(); + aws_credential_types::provider::future::ProvideCredentials::new(async { + Ok(Credentials::new( + access_key, + secret_access_key, + None, + None, + "example", + )) + }) + } +} + +/// Initialize a [`ContainerClient`] access key and secret access key +pub async fn client(access_key: String, secret_access_key: String) -> ContainerClient { + let provider = Provider { + access_key, + secret_access_key, + }; + + let config = aws_config::ConfigLoader::default() + .behavior_version(aws_config::BehaviorVersion::latest()) + .region("fra1") + .endpoint_url("https://fra1.digitaloceanspaces.com") + .credentials_provider(provider) + .load() + .await; + let client = aws_sdk_s3::Client::new(&config); + + ContainerClient { + client, + bucket: "private-jets".to_string(), + can_put: true, + } +} + +/// Initialize an anonymous [`ContainerClient`] +pub async fn anonymous_client() -> ContainerClient { + let config = aws_config::ConfigLoader::default() + .behavior_version(aws_config::BehaviorVersion::latest()) + .region("fra1") + .endpoint_url("https://fra1.digitaloceanspaces.com") + .no_credentials() + .load() + .await; + let client = aws_sdk_s3::Client::new(&config); + + ContainerClient { + client, + bucket: "private-jets".to_string(), + can_put: false, + } +} + +#[async_trait::async_trait] +impl BlobStorageProvider for ContainerClient { + type Error = Error; + + #[must_use] + async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error> { + if exists(self, blob_name).await? 
{ + Ok(Some(get(&self, blob_name).await?)) + } else { + Ok(None) + } + } + + #[must_use] + async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), Self::Error> { + put(&self, blob_name, contents).await + } + + fn can_put(&self) -> bool { + self.can_put + } +} + +/// * read from remote +/// * if not found and can't write to remote => read disk and write to disk +/// * if not found and can write to remote => fetch and write +pub(crate) async fn cached_call, std::io::Error>>>( + blob_name: &str, + fetch: F, + action: crate::fs::CacheAction, + client: Option<&ContainerClient>, +) -> Result, std::io::Error> { + let Some(client) = client else { + return Ok( + crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) + .await + .map_err(std::io::Error::other)?, + ); + }; + + let Some(data) = client + .maybe_get(blob_name) + .await + .map_err(std::io::Error::other)? + else { + return Ok(if !client.can_put() { + crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) + .await + .map_err(std::io::Error::other)? + } else { + crate::fs::cached(&blob_name, fetch, client, action) + .await + .map_err(std::io::Error::other)? 
+ }); + }; + Ok(data) +} diff --git a/src/icao_to_trace.rs b/src/icao_to_trace.rs index 3bb5824..89c9a41 100644 --- a/src/icao_to_trace.rs +++ b/src/icao_to_trace.rs @@ -11,7 +11,7 @@ use time::Date; use time::PrimitiveDateTime; use super::Position; -use crate::{fs, fs_azure}; +use crate::{fs, fs_s3}; fn last_2(icao: &str) -> &str { let bytes = icao.as_bytes(); @@ -42,11 +42,10 @@ fn adsbx_sid() -> String { format!("{time}_{random_chars}") } -pub(crate) static DIRECTORY: &'static str = "database"; pub(crate) static DATABASE: &'static str = "globe_history"; fn cache_file_path(icao: &str, date: &time::Date) -> String { - format!("{DIRECTORY}/{DATABASE}/{date}/trace_full_{icao}.json") + format!("{DATABASE}/{date}/trace_full_{icao}.json") } fn to_io_err(error: reqwest::Error) -> std::io::Error { @@ -123,13 +122,13 @@ async fn globe_history(icao: &str, date: &time::Date) -> Result, std::io async fn globe_history_cached( icao: &str, date: &time::Date, - client: Option<&fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result, std::io::Error> { let blob_name = cache_file_path(icao, date); let action = fs::CacheAction::from_date(&date); let fetch = globe_history(&icao, date); - Ok(fs_azure::cached_call(&blob_name, fetch, action, client).await?) + Ok(fs_s3::cached_call(&blob_name, fetch, action, client).await?) } /// Returns the trace of the icao number of a given day from https://adsbexchange.com. 
@@ -147,7 +146,7 @@ async fn globe_history_cached( pub async fn trace_cached( icao: &str, date: &time::Date, - client: Option<&fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result, std::io::Error> { let data = globe_history_cached(icao, date, client).await?; @@ -169,7 +168,7 @@ pub async fn trace_cached( pub async fn positions( icao_number: &str, date: time::Date, - client: Option<&fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result, std::io::Error> { use time::ext::NumericalDuration; let icao: Arc = icao_number.to_string().into(); @@ -212,7 +211,7 @@ pub(crate) async fn cached_aircraft_positions( from: Date, to: Date, icao_number: &str, - client: Option<&super::fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result>, std::io::Error> { let dates = super::DateIter { from, diff --git a/src/lib.rs b/src/lib.rs index f3db614..85574b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ mod airports; mod csv; mod emissions; pub(crate) mod fs; -pub mod fs_azure; +pub mod fs_s3; mod icao_to_trace; mod legs; mod model; diff --git a/src/trace_month.rs b/src/trace_month.rs index f4696a6..5326ef3 100644 --- a/src/trace_month.rs +++ b/src/trace_month.rs @@ -4,18 +4,16 @@ use std::{ sync::Arc, }; -use azure_storage_blobs::container::operations::BlobItem; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use time::Date; use super::Position; -use crate::{cached_aircraft_positions, fs, fs_azure}; +use crate::{cached_aircraft_positions, fs, fs_s3}; -static DIRECTORY: &'static str = "database"; static DATABASE: &'static str = "trace"; fn blob_name_to_pk(blob: &str) -> (Arc, time::Date) { - let bla = &blob["database/trace/icao_number=".len()..]; + let bla = &blob["trace/icao_number=".len()..]; let end = bla.find("/").unwrap(); let icao = &bla[..end]; let date_start = end + "/month=".len(); @@ -37,7 +35,7 @@ fn blob_name_to_pk(blob: &str) -> (Arc, 
time::Date) { fn pk_to_blob_name(icao: &str, date: &time::Date) -> String { format!( - "{DIRECTORY}/{DATABASE}/icao_number={icao}/month={}-{:02}/data.json", + "{DATABASE}/icao_number={icao}/month={}-{:02}/data.json", date.year(), date.month() as u8 ) @@ -63,7 +61,7 @@ fn get_month(current: &time::Date) -> (time::Date, time::Date) { pub async fn month_positions( month: time::Date, icao_number: &str, - client: Option<&super::fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result>, Box> { log::info!("month_positions({month},{icao_number})"); assert_eq!(month.day(), 1); @@ -86,7 +84,7 @@ pub async fn month_positions( Ok(bytes) }; - let r = fs_azure::cached_call(&blob_name, fetch, action, client).await?; + let r = fs_s3::cached_call(&blob_name, fetch, action, client).await?; Ok(serde_json::from_slice(&r)?) } @@ -94,13 +92,13 @@ pub async fn month_positions( /// # Implementation /// This function is idempotent but not pure: /// * the data is retrieved from `https://globe.adsbexchange.com` -/// * the call is cached on local disk or Azure Blob (depending on `client` configuration) +/// * the call is cached on local disk or Remote Blob (depending on `client` configuration) /// * the data is retrieved in batches of months and cached, to reduce IO pub async fn aircraft_positions( from: Date, to: Date, icao_number: &str, - client: Option<&super::fs_azure::ContainerClient>, + client: Option<&fs_s3::ContainerClient>, ) -> Result>, Box> { let dates = super::DateIter { from, @@ -142,39 +140,30 @@ pub async fn aircraft_positions( } /// Returns the set of (icao, month) that exists in the db -pub fn existing_months_positions_stream( - client: &fs_azure::ContainerClient, -) -> impl Stream, time::Date)>, azure_storage::Error>> { - client +pub async fn existing_months_positions( + client: &fs_s3::ContainerClient, +) -> Result, time::Date)>, fs_s3::Error> { + Ok(client .client - .list_blobs() - .prefix(format!("{DIRECTORY}/{DATABASE}/")) - .into_stream() + 
.list_objects_v2() + .bucket(&client.bucket) + .prefix(format!("{DATABASE}/")) + .into_paginator() + .send() + .try_collect() + .await + .map_err(|e| fs_s3::Error::from(e.to_string()))? + .into_iter() .map(|response| { - let blobs = response?.blobs; - Ok::<_, azure_core::Error>( - blobs - .items - .into_iter() - .filter_map(|blob| match blob { - BlobItem::Blob(blob) => Some(blob.name), - BlobItem::BlobPrefix(_) => None, - }) - .map(|blob| blob_name_to_pk(&blob)) - .collect::>(), - ) + response + .contents() + .iter() + .filter_map(|blob| blob.key()) + .map(|blob| blob_name_to_pk(&blob)) + .collect::>() }) -} - -/// Returns the set of (icao, month) that exists in the db -pub async fn existing_months_positions( - client: &fs_azure::ContainerClient, -) -> Result, time::Date)>, Box> { - let r = existing_months_positions_stream(client) - .try_collect::>() - .await?; - - Ok(r.into_iter().flatten().collect()) + .flatten() + .collect()) } #[cfg(test)] diff --git a/tests/it/main.rs b/tests/it/main.rs index 3a547a2..c0cae83 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -71,7 +71,7 @@ async fn legs( from: Date, to: Date, icao_number: &str, - client: Option<&flights::fs_azure::ContainerClient>, + client: Option<&flights::fs_s3::ContainerClient>, ) -> Result, Box> { let positions = flights::aircraft_positions(from, to, icao_number, client).await?; let mut positions = positions @@ -127,8 +127,8 @@ async fn case_45c824_2023_12_12() -> Result<(), Box> { } #[tokio::test] -async fn fs_azure() -> Result<(), Box> { - let client = flights::fs_azure::initialize_anonymous("privatejets", "data"); +async fn fs_s3() -> Result<(), Box> { + let client = flights::fs_s3::anonymous_client().await; let _ = flights::positions("459cd3", date!(2020 - 01 - 01), Some(&client)).await?; Ok(())