diff --git a/examples/dk_jets.rs b/examples/dk_jets.rs index de84cac..328a20c 100644 --- a/examples/dk_jets.rs +++ b/examples/dk_jets.rs @@ -72,7 +72,7 @@ async fn legs( aircraft: &Aircraft, client: Option<&flights::fs_azure::ContainerClient>, ) -> Result, Box> { - let positions = flights::aircraft_positions(from, to, aircraft, client).await?; + let positions = flights::cached_aircraft_positions(from, to, aircraft, client).await?; let mut positions = positions .into_iter() .map(|(_, p)| p) diff --git a/examples/period.rs b/examples/period.rs index 83489fb..8fb581f 100644 --- a/examples/period.rs +++ b/examples/period.rs @@ -123,7 +123,8 @@ async fn main() -> Result<(), Box> { let icao = &aircraft.icao_number; log::info!("ICAO number: {}", icao); - let positions = flights::aircraft_positions(from, to, &aircraft, client.as_ref()).await?; + let positions = + flights::cached_aircraft_positions(from, to, &aircraft, client.as_ref()).await?; let mut positions = positions .into_iter() .map(|(_, p)| p) diff --git a/src/fs.rs b/src/fs.rs index 49c4ac0..8ca6f32 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -6,6 +6,8 @@ pub trait BlobStorageProvider { type Error: std::error::Error + Send; async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error>; async fn put(&self, blob_name: &str, contents: Vec) -> Result, Self::Error>; + + fn can_put(&self) -> bool; } /// A [`BlobStorageProvider`] for local disk @@ -32,6 +34,10 @@ impl BlobStorageProvider for LocalDisk { std::fs::write(blob_name, &contents)?; Ok(contents) } + + fn can_put(&self) -> bool { + true + } } #[derive(Debug)] @@ -59,6 +65,15 @@ pub enum CacheAction { ReadFetch, } +impl CacheAction { + pub fn from_date(date: &time::Date) -> Self { + let now = time::OffsetDateTime::now_utc().date(); + (date >= &now) + .then_some(Self::ReadFetch) + .unwrap_or(Self::ReadFetchWrite) + } +} + /// Tries to retrive `blob_name` from `provider`. If it does not exist, /// it calls `fetch` and writes the result into `provider`. /// Returns the data in `blob_name` from `provider`. @@ -86,7 +101,7 @@ where } else { log::info!("{blob_name} - cache miss"); let contents = fetch.await.map_err(|e| Error::Fetch(e))?; - if action == CacheAction::ReadFetch { + if action == CacheAction::ReadFetch || !provider.can_put() { log::info!("{blob_name} - cache do not write"); return Ok(contents); }; diff --git a/src/fs_azure.rs b/src/fs_azure.rs index 1da4070..25c9093 100644 --- a/src/fs_azure.rs +++ b/src/fs_azure.rs @@ -1,33 +1,19 @@ -use azure_core::{error::HttpError, StatusCode}; use azure_storage::prelude::*; -pub use azure_storage_blobs::prelude::ContainerClient; +pub use azure_storage_blobs::prelude::ContainerClient as _ContainerClient; use azure_storage_blobs::{container::operations::BlobItem, prelude::ClientBuilder}; use futures::stream::StreamExt; use crate::fs::BlobStorageProvider; -#[derive(Debug)] -pub enum Error { - /// Unspecified error interacting with Azure blob storage - Error(azure_core::Error), - /// Unauthorized error when interacting with Azure blob storage - Unauthorized(azure_core::Error), -} - -impl std::error::Error for Error {} - -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Error::Unauthorized(e) | Error::Error(e) => e.fmt(f), - } - } +pub struct ContainerClient { + client: _ContainerClient, + can_put: bool, } /// Lists all blobs in container pub async fn list(client: ContainerClient) -> Result, azure_storage::Error> { let mut result = vec![]; - let mut blobs = client.list_blobs().into_stream(); + let mut blobs = client.client.list_blobs().into_stream(); while let Some(response) = blobs.next().await { result.extend( response? @@ -45,7 +31,7 @@ pub async fn list(client: ContainerClient) -> Result, azure_storage: /// Returns whether the blob exists in container async fn exists(client: &ContainerClient, blob_name: &str) -> Result { - client.blob_client(blob_name).exists().await + client.client.blob_client(blob_name).exists().await } /// Initialize a [`ContainerClient`] using SAS token @@ -56,31 +42,32 @@ pub fn initialize_sas( ) -> azure_core::Result { StorageCredentials::sas_token(token) .map(|credentials| ClientBuilder::new(account, credentials).container_client(container)) + .map(|client| ContainerClient { + client, + can_put: true, + }) } /// Initialize an anonymous [`ContainerClient`] pub fn initialize_anonymous(account: &str, container: &str) -> ContainerClient { - ClientBuilder::new(account, StorageCredentials::anonymous()).container_client(container) -} + let client = + ClientBuilder::new(account, StorageCredentials::anonymous()).container_client(container); -fn get_code(e: &azure_core::Error) -> Option { - let a = e.get_ref()?; - let a = a.downcast_ref::()?; - Some(a.status()) + ContainerClient { + client, + can_put: false, + } } #[async_trait::async_trait] impl BlobStorageProvider for ContainerClient { - type Error = Error; + type Error = azure_core::Error; #[must_use] async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error> { - if exists(self, blob_name).await.map_err(Error::Error)? { + if exists(self, blob_name).await? { Ok(Some( - self.blob_client(blob_name) - .get_content() - .await - .map_err(Error::Error)?, + self.client.blob_client(blob_name).get_content().await?, )) } else { Ok(None) @@ -89,46 +76,38 @@ impl BlobStorageProvider for ContainerClient { #[must_use] async fn put(&self, blob_name: &str, contents: Vec) -> Result, Self::Error> { - self.blob_client(blob_name) + self.client + .blob_client(blob_name) .put_block_blob(contents.clone()) .content_type("text/plain") - .await - .map_err(|e| { - if get_code(&e) == Some(StatusCode::Unauthorized) { - Error::Unauthorized(e) - } else { - Error::Error(e) - } - })?; + .await?; Ok(contents) } + + fn can_put(&self) -> bool { + self.can_put + } } -pub(crate) async fn cached_call< - F: Fn() -> (crate::fs::CacheAction, G), - G: futures::Future, std::io::Error>>, ->( +/// * read from azure +/// * if not found and can't write to azure => read disk and write to disk +/// * if not found and can write to azure => fetch and write +pub(crate) async fn cached_call, std::io::Error>>>( blob_name: &str, fetch: F, + action: crate::fs::CacheAction, client: Option<&ContainerClient>, ) -> Result, Box> { let Some(client) = client else { - let (action, fetch) = fetch(); return Ok(crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?); }; - let (action, f) = fetch(); - let result = crate::fs::cached(&blob_name, f, client, action).await; - if matches!( - result, - Err(crate::fs::Error::Backend( - crate::fs_azure::Error::Unauthorized(_) - )) - ) { - log::warn!("{blob_name} - Unauthorized - fall back to local disk"); - let (action, fetch) = fetch(); - Ok(crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?) - } else { - Ok(result?) - } + let Some(data) = client.maybe_get(blob_name).await? else { + return Ok(if !client.can_put() { + crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await? + } else { + crate::fs::cached(&blob_name, fetch, client, action).await? + }); + }; + Ok(data) } diff --git a/src/icao_to_trace.rs b/src/icao_to_trace.rs index 67372a4..6fbe850 100644 --- a/src/icao_to_trace.rs +++ b/src/icao_to_trace.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::error::Error; use std::sync::Arc; @@ -42,8 +43,8 @@ fn adsbx_sid() -> String { format!("{time}_{random_chars}") } -static DIRECTORY: &'static str = "database"; -static DATABASE: &'static str = "globe_history"; +pub(crate) static DIRECTORY: &'static str = "database"; +pub(crate) static DATABASE: &'static str = "globe_history"; fn cache_file_path(icao: &str, date: &time::Date) -> String { format!("{DIRECTORY}/{DATABASE}/{date}/trace_full_{icao}.json") @@ -125,16 +126,11 @@ async fn globe_history_cached( date: &time::Date, client: Option<&fs_azure::ContainerClient>, ) -> Result, Box> { - let now = time::OffsetDateTime::now_utc().date(); let blob_name = cache_file_path(icao, date); - let fetch = || { - let action = (date >= &now) - .then_some(fs::CacheAction::ReadFetch) - .unwrap_or(fs::CacheAction::ReadFetchWrite); - (action, globe_history(&icao, date)) - }; + let action = fs::CacheAction::from_date(&date); + let fetch = globe_history(&icao, date); - Ok(fs_azure::cached_call(&blob_name, fetch, client).await?) + Ok(fs_azure::cached_call(&blob_name, fetch, action, client).await?) } /// Returns the trace of the icao number of a given day from https://adsbexchange.com. @@ -234,7 +230,7 @@ pub async fn aircraft_positions( to: Date, aircraft: &Aircraft, client: Option<&super::fs_azure::ContainerClient>, -) -> Result)>, Box> { +) -> Result>, Box> { let dates = super::DateIter { from, to, @@ -253,6 +249,8 @@ pub async fn aircraft_positions( futures::stream::iter(tasks) // limit concurrent tasks .buffered(5) - .try_collect::>() + .try_collect() .await } + +pub use crate::trace_month::cached_aircraft_positions; diff --git a/src/lib.rs b/src/lib.rs index 645873e..d8747e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ mod icao_to_trace; mod legs; mod model; mod owners; +mod trace_month; use std::sync::Arc; diff --git a/src/owners.json b/src/owners.json index 7b05bfd..728764a 100644 --- a/src/owners.json +++ b/src/owners.json @@ -18,10 +18,5 @@ "claim": "claims to \"contribute to a sustainable and low-carbon future\".", "source": "https://eurowindenergy.com/about/sustainability", "date": "2023-10-20" - }, - "North Flying A/S": { - "claim": "", - "source": "https://eurowindenergy.com/about/sustainability", - "date": "2023-10-20" } } \ No newline at end of file