Skip to content

Commit

Permalink
Improved resilience to network
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao committed Feb 1, 2024
1 parent 6f52aa7 commit ccde0e3
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 21 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target
/Cargo.lock
database/
venv/
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ clap = { version = "4.4.6", features = ["derive"] }
tokio = {version="1.0", features=["rt", "macros", "rt-multi-thread"]}
simple_logger = "*"
num-format = "*"
itertools = "*"
90 changes: 90 additions & 0 deletions examples/export_legs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::error::Error;

use clap::Parser;
use futures::StreamExt;
use itertools::Itertools;
use simple_logger::SimpleLogger;

use flights::{load_aircrafts, load_private_jet_types};

// Storage backend selectable through the `--backend` command-line option.
// Plain `//` comments are used on purpose: clap would surface `///` comments
// on the variants as help text for the possible values.
#[derive(Debug, Clone, clap::ValueEnum)]
enum Backend {
    // No Azure client is created when no SAS token is supplied.
    Disk,
    // An Azure client is created (anonymous when no SAS token is supplied).
    Azure,
}

/// Text shown as the `about` section of the command-line help.
// `'static` is implied for references in constants, and the text contains
// nothing that needs a raw string literal.
const ABOUT: &str = "Builds the database of all private jet positions from 2023";

// Command-line arguments of this example, parsed by clap.
// New notes are plain `//` comments so that the generated help is unchanged.
#[derive(Debug, Parser)]
#[command(author, version, about = ABOUT)]
struct Cli {
    /// The Azure token
    #[arg(short, long)]
    azure_sas_token: Option<String>,
    // Which backend to use; defaults to Azure when the option is omitted.
    #[arg(short, long, value_enum, default_value_t = Backend::Azure)]
    backend: Backend,
}

/// Requests the monthly positions of every private jet for each month of 2023.
///
/// At most 10 requests are in flight at any time. A failing request is logged
/// and does not abort the remaining ones.
///
/// # Errors
///
/// Returns an error when the logger cannot be installed, when the Azure client
/// cannot be initialized from the SAS token, or when the aircraft / private
/// jet type datasets cannot be loaded.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // a logger that cannot be installed is reported like any other start-up error
    SimpleLogger::new()
        .with_level(log::LevelFilter::Warn)
        .init()?;

    let cli = Cli::parse();

    // optionally initialize Azure client
    // `cli` is not used afterwards, so its fields are moved instead of cloned.
    // NOTE: a SAS token selects the Azure client regardless of `--backend`.
    let client = match (cli.backend, cli.azure_sas_token) {
        (Backend::Disk, None) => None,
        (Backend::Azure, None) => Some(flights::fs_azure::initialize_anonymous(
            "privatejets",
            "data",
        )),
        (_, Some(token)) => Some(flights::fs_azure::initialize_sas(
            &token,
            "privatejets",
            "data",
        )?),
    };

    // load datasets to memory
    let aircrafts = load_aircrafts(client.as_ref()).await?;
    let types = load_private_jet_types()?;

    // first day of every month of 2023
    let months = (2023..2024)
        .cartesian_product(1..=12u8)
        .map(|(year, month)| {
            let month = time::Month::try_from(month).expect("1..=12 are valid months");
            time::Date::from_calendar_date(year, month, 1).expect("day 1 never errors")
        })
        .collect::<Vec<_>>();

    // one (lazy) task per (private jet, month); nothing runs until polled below
    let tasks = aircrafts
        .values()
        // its primary use is to be a private jet
        .filter(|a| types.contains_key(&a.model))
        .cartesian_product(months)
        .map(|(aircraft, month)| {
            flights::month_positions(month, &aircraft.icao_number, client.as_ref())
        });

    futures::stream::iter(tasks)
        // limit concurrent tasks
        .buffered(10)
        // continue if error
        .for_each(|result| async move {
            if let Err(e) = result {
                log::error!("{e}");
            }
        })
        .await;
    Ok(())
}
24 changes: 18 additions & 6 deletions src/fs_azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub use azure_storage_blobs::prelude::ContainerClient as _ContainerClient;
use crate::fs::BlobStorageProvider;

pub struct ContainerClient {
client: _ContainerClient,
pub client: _ContainerClient,
can_put: bool,
}

Expand Down Expand Up @@ -77,16 +77,28 @@ pub(crate) async fn cached_call<F: futures::Future<Output = Result<Vec<u8>, std:
fetch: F,
action: crate::fs::CacheAction,
client: Option<&ContainerClient>,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
) -> Result<Vec<u8>, std::io::Error> {
let Some(client) = client else {
return Ok(crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?);
return Ok(
crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action)
.await
.map_err(std::io::Error::other)?,
);
};

let Some(data) = client.maybe_get(blob_name).await? else {
let Some(data) = client
.maybe_get(blob_name)
.await
.map_err(std::io::Error::other)?
else {
return Ok(if !client.can_put() {
crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await?
crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action)
.await
.map_err(std::io::Error::other)?
} else {
crate::fs::cached(&blob_name, fetch, client, action).await?
crate::fs::cached(&blob_name, fetch, client, action)
.await
.map_err(std::io::Error::other)?
});
};
Ok(data)
Expand Down
17 changes: 8 additions & 9 deletions src/icao_to_trace.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -84,8 +83,8 @@ async fn globe_history(icao: &str, date: &time::Date) -> Result<Vec<u8>, std::io
headers.insert("Sec-Fetch-Site", "same-origin".parse().unwrap());
headers.insert("TE", "trailers".parse().unwrap());

// Retry up to 3 times with increasing intervals between attempts.
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
// Retry up to 5 times with increasing intervals between attempts.
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
let client = ClientBuilder::new(reqwest::Client::new())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Expand Down Expand Up @@ -125,7 +124,7 @@ async fn globe_history_cached(
icao: &str,
date: &time::Date,
client: Option<&fs_azure::ContainerClient>,
) -> Result<Vec<u8>, Box<dyn Error>> {
) -> Result<Vec<u8>, std::io::Error> {
let blob_name = cache_file_path(icao, date);
let action = fs::CacheAction::from_date(&date);
let fetch = globe_history(&icao, date);
Expand All @@ -149,7 +148,7 @@ pub async fn trace_cached(
icao: &str,
date: &time::Date,
client: Option<&fs_azure::ContainerClient>,
) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
) -> Result<Vec<serde_json::Value>, std::io::Error> {
let data = globe_history_cached(icao, date, client).await?;

let mut value = serde_json::from_slice::<serde_json::Value>(&data)?;
Expand All @@ -171,7 +170,7 @@ pub async fn positions(
icao_number: &str,
date: time::Date,
client: Option<&fs_azure::ContainerClient>,
) -> Result<impl Iterator<Item = Position>, Box<dyn Error>> {
) -> Result<impl Iterator<Item = Position>, std::io::Error> {
use time::ext::NumericalDuration;
let icao: Arc<str> = icao_number.to_string().into();
trace_cached(icao_number, &date, client)
Expand Down Expand Up @@ -214,15 +213,15 @@ pub(crate) async fn cached_aircraft_positions(
to: Date,
icao_number: &str,
client: Option<&super::fs_azure::ContainerClient>,
) -> Result<HashMap<Date, Vec<Position>>, Box<dyn Error>> {
) -> Result<HashMap<Date, Vec<Position>>, std::io::Error> {
let dates = super::DateIter {
from,
to,
increment: time::Duration::days(1),
};

let tasks = dates.map(|date| async move {
Result::<_, Box<dyn Error>>::Ok((
Result::<_, std::io::Error>::Ok((
date.clone(),
positions(icao_number, date, client)
.await?
Expand All @@ -237,4 +236,4 @@ pub(crate) async fn cached_aircraft_positions(
.await
}

pub use crate::trace_month::aircraft_positions;
pub use crate::trace_month::*;
10 changes: 4 additions & 6 deletions src/trace_month.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ fn get_month(current: &time::Date) -> (time::Date, time::Date) {
(first_of_month, first_of_next_month)
}

async fn month_positions(
month: &time::Date,
pub async fn month_positions(
month: time::Date,
icao_number: &str,
client: Option<&super::fs_azure::ContainerClient>,
) -> Result<HashMap<Date, Vec<Position>>, Box<dyn Error>> {
Expand All @@ -49,9 +49,7 @@ async fn month_positions(

// returns positions in the month, cached
let fetch = async {
let positions = cached_aircraft_positions(from, to, icao_number, client)
.await
.unwrap();
let positions = cached_aircraft_positions(from, to, icao_number, client).await?;

let positions = positions
.into_iter()
Expand Down Expand Up @@ -94,7 +92,7 @@ pub async fn aircraft_positions(

let tasks = months
.into_iter()
.map(|month| async move { month_positions(&month, icao_number, client).await });
.map(|month| async move { month_positions(month, icao_number, client).await });

let positions = futures::stream::iter(tasks)
// limit concurrent tasks
Expand Down

0 comments on commit ccde0e3

Please sign in to comment.