Skip to content

Commit

Permalink
Improved
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Nov 21, 2023
1 parent 4838839 commit eca0485
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 43 deletions.
42 changes: 37 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
# Danish private flights
This repository contains a CLI application that generates a text based summary of
private jet's flight information targetted to a Danish audience.
private jet's flight information targeted to a Danish audience.

It is supported by an Azure Blob storage container for caching data, thereby
reducing its impact to [https://adsbexchange.com/](https://adsbexchange.com/).

## Risk and impact

This code performs API calls to [https://adsbexchange.com/](https://adsbexchange.com/),
a production website of a company.

**Use critical thinking** when using this code and how it impacts them.

We strongly recommend that if you plan to perform large scale analysis (e.g. in time or aircrafts),
that you reach out via an issue _before_, so that we can work together
to cache all hits to [https://adsbexchange.com/](https://adsbexchange.com/)
on an horizontally scaled remote storage and therefore remove its impact to adsbexchange.com
of future calls.

All data cached is available on Azure blob storage:
* account: `privatejets`
* container: `data`

and has anonymous and public read permissions.

## How to use

Expand All @@ -9,11 +31,21 @@ private jet's flight information targetted to a Danish audience.
3. open `OY-GFS_2023-10-20_0.md`

Step 2. has an optional argument, `--azure-sas-token`, specifying an Azure storage container SAS
token for account `privatejets`, container `data`.
When used, caching of data is done on the remote container, as opposed to local disk.
for account `privatejets`, container `data`.
When used, cache is written to the remote container, as opposed to disk.

Finally, setting `--backend disk` ignores the Azure's remote storage altogether and
only uses disk for caching (resulting in higher cache misses and thus more
interactions with ADS-B exchange).

In general:
* Use the default parameters when creating ad-hoc stories
* Use `--azure-sas-token` when improving the database with new data.
* Use `--backend disk` when testing the caching system

Furthermore, setting `--backend azure` without `azure-sas-token` provides read-access
to the remote container.
As of today, the flag `--azure-sas-token` is only available when the code is executed
from `main`, as writing to the blob storage must be done through a controlled code base
that preserves data integrity.

## Assumptions

Expand Down
6 changes: 3 additions & 3 deletions examples/period.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn render(context: &Context) -> Result<(), Box<dyn Error>> {

#[derive(clap::ValueEnum, Debug, Clone)]
enum Backend {
LocalDisk,
Disk,
Azure,
}

Expand All @@ -49,7 +49,7 @@ struct Cli {
/// The Azure token
#[arg(short, long)]
azure_sas_token: Option<String>,
#[arg(short, long, value_enum, default_value_t=Backend::LocalDisk)]
#[arg(short, long, value_enum, default_value_t=Backend::Azure)]
backend: Backend,
}

Expand All @@ -64,7 +64,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// optionally initialize Azure client
let client = match (cli.backend, cli.azure_sas_token) {
(Backend::LocalDisk, None) => None,
(Backend::Disk, None) => None,
(Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous(
"privatejets",
"data",
Expand Down
15 changes: 10 additions & 5 deletions examples/single_day.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::error::Error;

use clap::Parser;
use simple_logger::SimpleLogger;
use tinytemplate::TinyTemplate;

use flights::*;

use clap::Parser;

static TEMPLATE_NAME: &'static str = "t";

#[derive(serde::Serialize, serde::Deserialize, Debug)]
Expand All @@ -31,7 +31,7 @@ pub struct Context {

#[derive(clap::ValueEnum, Debug, Clone)]
enum Backend {
LocalDisk,
Disk,
Azure,
}

Expand All @@ -48,7 +48,7 @@ struct Cli {
/// The Azure token
#[arg(short, long)]
azure_sas_token: Option<String>,
#[arg(short, long, value_enum, default_value_t=Backend::LocalDisk)]
#[arg(short, long, value_enum, default_value_t=Backend::Azure)]
backend: Backend,
}

Expand Down Expand Up @@ -159,11 +159,16 @@ fn process_leg(

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
SimpleLogger::new()
.with_level(log::LevelFilter::Info)
.init()
.unwrap();

let cli = Cli::parse();

// optionally initialize Azure client
let client = match (cli.backend, cli.azure_sas_token) {
(Backend::LocalDisk, None) => None,
(Backend::Disk, None) => None,
(Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous(
"privatejets",
"data",
Expand Down
13 changes: 8 additions & 5 deletions src/aircraft_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn url(prefix: &str) -> String {
format!("https://globe.adsbexchange.com/{DATABASE}/{prefix}.js")
}

async fn aircrafts(prefix: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
async fn aircrafts(prefix: &str) -> Result<Vec<u8>, reqwest::Error> {
Ok(reqwest::get(url(prefix))
.await?
.bytes()
Expand All @@ -52,10 +52,13 @@ async fn aircrafts_prefixed(
let fetch = aircrafts(&prefix);

let data = match client {
Some(client) => crate::fs::cached(&blob_name, fetch, client).await,
None => crate::fs::cached(&blob_name, fetch, &fs::LocalDisk).await,
}
.map_err(|e| e.to_string())?;
Some(client) => crate::fs::cached(&blob_name, fetch, client)
.await
.map_err(|e| e.to_string())?,
None => crate::fs::cached(&blob_name, fetch, &fs::LocalDisk)
.await
.map_err(|e| e.to_string())?,
};

Ok((
prefix,
Expand Down
2 changes: 1 addition & 1 deletion src/airports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct Airport {
pub type_: String,
}

async fn airports() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
async fn airports() -> Result<Vec<u8>, reqwest::Error> {
let url = "https://raw.githubusercontent.com/davidmegginson/ourairports-data/main/airports.csv";
Ok(reqwest::get(url).await?.bytes().await.map(|x| x.into())?)
}
Expand Down
44 changes: 36 additions & 8 deletions src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
/// An object that can be used to get and put blobs.
#[async_trait]
pub trait BlobStorageProvider {
type Error: std::error::Error;
type Error: std::error::Error + Send;
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, Self::Error>;
async fn put(&self, blob_name: &str, contents: Vec<u8>) -> Result<Vec<u8>, Self::Error>;
}
Expand Down Expand Up @@ -34,28 +34,56 @@ impl BlobStorageProvider for LocalDisk {
}
}

#[derive(Debug)]
pub enum Error<F: std::error::Error + Send, E: std::error::Error + Send> {
/// An error originating from trying to read from source
Fetch(F),
/// An error originating from trying to read or write data from/to backend
Backend(E),
}

impl<F: std::error::Error + Send, E: std::error::Error + Send> std::error::Error for Error<F, E> {}

impl<F: std::error::Error + Send, E: std::error::Error + Send> std::fmt::Display for Error<F, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Fetch(e) => std::fmt::Display::fmt(&e, f),
Self::Backend(e) => std::fmt::Display::fmt(&e, f),
}
}
}

/// Tries to retrive `blob_name` from `provider`. If it does not exist,
/// it calls `fetch` and writes the result into `provider`.
/// Returns the data in `blob_name` from `provider`.
/// # Implementation
/// This function is idempotent but not pure.
pub async fn cached<'a, P, F>(
pub async fn cached<E, P, F>(
blob_name: &str,
fetch: F,
provider: &P,
) -> Result<Vec<u8>, Box<dyn std::error::Error + 'a>>
) -> Result<Vec<u8>, Error<E, P::Error>>
where
F: futures::Future<Output = Result<Vec<u8>, Box<dyn std::error::Error>>>,
E: std::error::Error + Send,
F: futures::Future<Output = Result<Vec<u8>, E>>,
P: BlobStorageProvider,
P::Error: 'a,
{
log::info!("Fetch {blob_name}");
if let Some(data) = provider.maybe_get(blob_name).await? {
if let Some(data) = provider
.maybe_get(blob_name)
.await
.map_err(|e| Error::Backend(e))?
{
log::info!("{blob_name} - cache hit");
Ok(data)
} else {
log::info!("{blob_name} - cache miss");
let contents = fetch.await?;
Ok(provider.put(blob_name, contents).await?)
let contents = fetch.await.map_err(|e| Error::Fetch(e))?;
let data = provider
.put(blob_name, contents)
.await
.map_err(|e| Error::Backend(e))?;
log::info!("{blob_name} - cache write");
Ok(data)
}
}
45 changes: 41 additions & 4 deletions src/fs_azure.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
use azure_core::{error::HttpError, StatusCode};
use azure_storage::prelude::*;
pub use azure_storage_blobs::prelude::ContainerClient;
use azure_storage_blobs::{container::operations::BlobItem, prelude::ClientBuilder};
use futures::stream::StreamExt;

use crate::fs::BlobStorageProvider;

#[derive(Debug)]
pub enum Error {
/// Unspecified error interacting with Azure blob storage
Error(azure_core::Error),
/// Unauthorized error when interacting with Azure blob storage
Unauthorized(azure_core::Error),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Unauthorized(e) | Error::Error(e) => e.fmt(f),
}
}
}

/// Lists all blobs in container
pub async fn list(client: ContainerClient) -> Result<Vec<String>, azure_storage::Error> {
let mut result = vec![];
Expand Down Expand Up @@ -44,16 +63,27 @@ pub fn initialize_anonymous(account: &str, container: &str) -> ContainerClient {
ClientBuilder::new(account, StorageCredentials::anonymous()).container_client(container)
}

fn get_code(e: &azure_core::Error) -> Option<StatusCode> {
let a = e.get_ref()?;
let a = a.downcast_ref::<HttpError>()?;
Some(a.status())
}

pub struct AzureContainer<'a>(pub &'a ContainerClient);

#[async_trait::async_trait]
impl BlobStorageProvider for ContainerClient {
type Error = azure_core::Error;
type Error = Error;

#[must_use]
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, Self::Error> {
if exists(self, blob_name).await? {
Ok(Some(self.blob_client(blob_name).get_content().await?))
if exists(self, blob_name).await.map_err(Error::Error)? {
Ok(Some(
self.blob_client(blob_name)
.get_content()
.await
.map_err(Error::Error)?,
))
} else {
Ok(None)
}
Expand All @@ -64,7 +94,14 @@ impl BlobStorageProvider for ContainerClient {
self.blob_client(blob_name)
.put_block_blob(contents.clone())
.content_type("text/plain")
.await?;
.await
.map_err(|e| {
if get_code(&e) == Some(StatusCode::Unauthorized) {
Error::Unauthorized(e)
} else {
Error::Error(e)
}
})?;
Ok(contents)
}
}
Loading

0 comments on commit eca0485

Please sign in to comment.