Skip to content

Commit

Permalink
Improved caching
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 22, 2024
1 parent 8acaf0f commit d17321b
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 80 deletions.
2 changes: 1 addition & 1 deletion examples/dk_jets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn legs(
aircraft: &Aircraft,
client: Option<&flights::fs_azure::ContainerClient>,
) -> Result<Vec<Leg>, Box<dyn Error>> {
let positions = flights::aircraft_positions(from, to, aircraft, client).await?;
let positions = flights::cached_aircraft_positions(from, to, aircraft, client).await?;
let mut positions = positions
.into_iter()
.map(|(_, p)| p)
Expand Down
3 changes: 2 additions & 1 deletion examples/period.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
let icao = &aircraft.icao_number;
log::info!("ICAO number: {}", icao);

let positions = flights::aircraft_positions(from, to, &aircraft, client.as_ref()).await?;
let positions =
flights::cached_aircraft_positions(from, to, &aircraft, client.as_ref()).await?;
let mut positions = positions
.into_iter()
.map(|(_, p)| p)
Expand Down
17 changes: 16 additions & 1 deletion src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub trait BlobStorageProvider {
type Error: std::error::Error + Send;
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, Self::Error>;
async fn put(&self, blob_name: &str, contents: Vec<u8>) -> Result<Vec<u8>, Self::Error>;

fn can_put(&self) -> bool;
}

/// A [`BlobStorageProvider`] for local disk
Expand All @@ -32,6 +34,10 @@ impl BlobStorageProvider for LocalDisk {
std::fs::write(blob_name, &contents)?;
Ok(contents)
}

/// Writing to the local disk is always considered possible.
fn can_put(&self) -> bool {
true
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -59,6 +65,15 @@ pub enum CacheAction {
ReadFetch,
}

impl CacheAction {
    /// Chooses the cache action for data belonging to `date`.
    ///
    /// `date` is compared against the current UTC date:
    /// * today or later => [`CacheAction::ReadFetch`] (a fetched result is not written to the cache)
    /// * earlier than today => [`CacheAction::ReadFetchWrite`]
    ///
    /// NOTE(review): presumably data for a day that has not ended yet may still
    /// change and therefore must not be persisted - confirm.
    pub fn from_date(date: &time::Date) -> Self {
        let today = time::OffsetDateTime::now_utc().date();
        // A plain if/else instead of `bool::then_some(..).unwrap_or(..)`: same result, easier to read.
        if *date >= today {
            Self::ReadFetch
        } else {
            Self::ReadFetchWrite
        }
    }
}

/// Tries to retrieve `blob_name` from `provider`. If it does not exist,
/// it calls `fetch` and writes the result into `provider`.
/// Returns the data in `blob_name` from `provider`.
Expand Down Expand Up @@ -86,7 +101,7 @@ where
} else {
log::info!("{blob_name} - cache miss");
let contents = fetch.await.map_err(|e| Error::Fetch(e))?;
if action == CacheAction::ReadFetch {
if action == CacheAction::ReadFetch || !provider.can_put() {
log::info!("{blob_name} - cache do not write");
return Ok(contents);
};
Expand Down
99 changes: 39 additions & 60 deletions src/fs_azure.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,19 @@
use azure_core::{error::HttpError, StatusCode};
use azure_storage::prelude::*;
pub use azure_storage_blobs::prelude::ContainerClient;
pub use azure_storage_blobs::prelude::ContainerClient as _ContainerClient;
use azure_storage_blobs::{container::operations::BlobItem, prelude::ClientBuilder};
use futures::stream::StreamExt;

use crate::fs::BlobStorageProvider;

#[derive(Debug)]
pub enum Error {
/// Unspecified error interacting with Azure blob storage
Error(azure_core::Error),
/// Unauthorized error when interacting with Azure blob storage
Unauthorized(azure_core::Error),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Unauthorized(e) | Error::Error(e) => e.fmt(f),
}
}
pub struct ContainerClient {
client: _ContainerClient,
can_put: bool,
}

/// Lists all blobs in container
pub async fn list(client: ContainerClient) -> Result<Vec<String>, azure_storage::Error> {
let mut result = vec![];
let mut blobs = client.list_blobs().into_stream();
let mut blobs = client.client.list_blobs().into_stream();
while let Some(response) = blobs.next().await {
result.extend(
response?
Expand All @@ -45,7 +31,7 @@ pub async fn list(client: ContainerClient) -> Result<Vec<String>, azure_storage:

/// Returns whether the blob exists in container
async fn exists(client: &ContainerClient, blob_name: &str) -> Result<bool, azure_storage::Error> {
client.blob_client(blob_name).exists().await
client.client.blob_client(blob_name).exists().await
}

/// Initialize a [`ContainerClient`] using SAS token
Expand All @@ -56,31 +42,32 @@ pub fn initialize_sas(
) -> azure_core::Result<ContainerClient> {
StorageCredentials::sas_token(token)
.map(|credentials| ClientBuilder::new(account, credentials).container_client(container))
.map(|client| ContainerClient {
client,
can_put: true,
})
}

/// Initialize an anonymous [`ContainerClient`]
pub fn initialize_anonymous(account: &str, container: &str) -> ContainerClient {
ClientBuilder::new(account, StorageCredentials::anonymous()).container_client(container)
}
let client =
ClientBuilder::new(account, StorageCredentials::anonymous()).container_client(container);

fn get_code(e: &azure_core::Error) -> Option<StatusCode> {
let a = e.get_ref()?;
let a = a.downcast_ref::<HttpError>()?;
Some(a.status())
ContainerClient {
client,
can_put: false,
}
}

#[async_trait::async_trait]
impl BlobStorageProvider for ContainerClient {
type Error = Error;
type Error = azure_core::Error;

#[must_use]
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, Self::Error> {
if exists(self, blob_name).await.map_err(Error::Error)? {
if exists(self, blob_name).await? {
Ok(Some(
self.blob_client(blob_name)
.get_content()
.await
.map_err(Error::Error)?,
self.client.blob_client(blob_name).get_content().await?,
))
} else {
Ok(None)
Expand All @@ -89,46 +76,38 @@ impl BlobStorageProvider for ContainerClient {

#[must_use]
async fn put(&self, blob_name: &str, contents: Vec<u8>) -> Result<Vec<u8>, Self::Error> {
self.blob_client(blob_name)
self.client
.blob_client(blob_name)
.put_block_blob(contents.clone())
.content_type("text/plain")
.await
.map_err(|e| {
if get_code(&e) == Some(StatusCode::Unauthorized) {
Error::Unauthorized(e)
} else {
Error::Error(e)
}
})?;
.await?;
Ok(contents)
}

/// Returns the `can_put` flag this client was constructed with
/// (`true` from `initialize_sas`, `false` from `initialize_anonymous`).
fn can_put(&self) -> bool {
self.can_put
}
}

pub(crate) async fn cached_call<
F: Fn() -> (crate::fs::CacheAction, G),
G: futures::Future<Output = Result<Vec<u8>, std::io::Error>>,
>(
/// * read from azure
/// * if not found and can't write to azure => read disk and write to disk
/// * if not found and can write to azure => fetch and write
pub(crate) async fn cached_call<F: futures::Future<Output = Result<Vec<u8>, std::io::Error>>>(
blob_name: &str,
fetch: F,
action: crate::fs::CacheAction,
client: Option<&ContainerClient>,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let Some(client) = client else {
let (action, fetch) = fetch();
return Ok(crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?);
};

let (action, f) = fetch();
let result = crate::fs::cached(&blob_name, f, client, action).await;
if matches!(
result,
Err(crate::fs::Error::Backend(
crate::fs_azure::Error::Unauthorized(_)
))
) {
log::warn!("{blob_name} - Unauthorized - fall back to local disk");
let (action, fetch) = fetch();
Ok(crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?)
} else {
Ok(result?)
}
let Some(data) = client.maybe_get(blob_name).await? else {
return Ok(if !client.can_put() {
crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?
} else {
crate::fs::cached(&blob_name, fetch, client, action).await?
});
};
Ok(data)
}
22 changes: 10 additions & 12 deletions src/icao_to_trace.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

Expand Down Expand Up @@ -42,8 +43,8 @@ fn adsbx_sid() -> String {
format!("{time}_{random_chars}")
}

static DIRECTORY: &'static str = "database";
static DATABASE: &'static str = "globe_history";
pub(crate) static DIRECTORY: &'static str = "database";
pub(crate) static DATABASE: &'static str = "globe_history";

fn cache_file_path(icao: &str, date: &time::Date) -> String {
format!("{DIRECTORY}/{DATABASE}/{date}/trace_full_{icao}.json")
Expand Down Expand Up @@ -125,16 +126,11 @@ async fn globe_history_cached(
date: &time::Date,
client: Option<&fs_azure::ContainerClient>,
) -> Result<Vec<u8>, Box<dyn Error>> {
let now = time::OffsetDateTime::now_utc().date();
let blob_name = cache_file_path(icao, date);
let fetch = || {
let action = (date >= &now)
.then_some(fs::CacheAction::ReadFetch)
.unwrap_or(fs::CacheAction::ReadFetchWrite);
(action, globe_history(&icao, date))
};
let action = fs::CacheAction::from_date(&date);
let fetch = globe_history(&icao, date);

Ok(fs_azure::cached_call(&blob_name, fetch, client).await?)
Ok(fs_azure::cached_call(&blob_name, fetch, action, client).await?)
}

/// Returns the trace of the icao number of a given day from https://adsbexchange.com.
Expand Down Expand Up @@ -234,7 +230,7 @@ pub async fn aircraft_positions(
to: Date,
aircraft: &Aircraft,
client: Option<&super::fs_azure::ContainerClient>,
) -> Result<Vec<(Date, Vec<Position>)>, Box<dyn Error>> {
) -> Result<HashMap<Date, Vec<Position>>, Box<dyn Error>> {
let dates = super::DateIter {
from,
to,
Expand All @@ -253,6 +249,8 @@ pub async fn aircraft_positions(
futures::stream::iter(tasks)
// limit concurrent tasks
.buffered(5)
.try_collect::<Vec<_>>()
.try_collect()
.await
}

pub use crate::trace_month::cached_aircraft_positions;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod icao_to_trace;
mod legs;
mod model;
mod owners;
mod trace_month;

use std::sync::Arc;

Expand Down
5 changes: 0 additions & 5 deletions src/owners.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,5 @@
"claim": "claims to \"contribute to a sustainable and low-carbon future\".",
"source": "https://eurowindenergy.com/about/sustainability",
"date": "2023-10-20"
},
"North Flying A/S": {
"claim": "",
"source": "https://eurowindenergy.com/about/sustainability",
"date": "2023-10-20"
}
}

0 comments on commit d17321b

Please sign in to comment.