Skip to content

Commit

Permalink
Fixed computation of timestamp (#55)
Browse files Browse the repository at this point in the history
This was an error that impacted most calculations.

The root issue was that the timestamp of a trace was being computed
since midnight, when it should be computed from a specific timestamp
available in the trace's json.

Added regression test.

Also improved the ergonomics of the library.
  • Loading branch information
jorgecardleitao authored Mar 21, 2024
1 parent 255d245 commit d94a9e1
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 184 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = { version = "1.0", default_features = false }

# perform requests to the internet
reqwest = {version="*", features = ["gzip"]}
reqwest = {version="0.11", features = ["gzip"]}
reqwest-retry = "*"
reqwest-middleware = "*"

Expand Down Expand Up @@ -47,7 +47,6 @@ simple_logger = { version = "*", optional = true }

[dev-dependencies]
tokio = {version="1.0", features=["rt", "macros", "rt-multi-thread"]}
itertools = "*"

[features]
build-binary = [
Expand Down
2 changes: 1 addition & 1 deletion src/aircraft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub async fn etl_aircrafts(client: Option<&fs_s3::ContainerClient>) -> Result<()

pub async fn read(
date: Date,
client: Option<&fs_s3::ContainerClient>,
client: Option<&dyn BlobStorageProvider>,
) -> Result<Aircrafts, String> {
let path = file_path(date);
let data = match client {
Expand Down
24 changes: 12 additions & 12 deletions src/bin/etl_legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct Metadata {
}

async fn write_json(
client: &impl BlobStorageProvider,
client: &dyn BlobStorageProvider,
d: impl Serialize,
key: &str,
) -> Result<(), Box<dyn Error>> {
Expand All @@ -53,11 +53,11 @@ async fn write_json(
Ok(client.put(key, bytes).await?)
}

async fn write_csv<B: BlobStorageProvider>(
async fn write_csv(
items: impl Iterator<Item = impl Serialize>,
key: &str,
client: &B,
) -> Result<(), B::Error> {
client: &dyn BlobStorageProvider,
) -> Result<(), std::io::Error> {
let mut wtr = csv::Writer::from_writer(vec![]);
for leg in items {
wtr.serialize(leg).unwrap()
Expand Down Expand Up @@ -106,7 +106,7 @@ async fn write(
icao_number: &Arc<str>,
month: time::Date,
legs: impl Iterator<Item = impl Serialize>,
client: &impl BlobStorageProvider,
client: &dyn BlobStorageProvider,
) -> Result<(), Box<dyn Error>> {
let key = format!(
"{DATABASE}icao_number={icao_number}/month={}/data.csv",
Expand All @@ -121,7 +121,7 @@ async fn write(
async fn read<D: DeserializeOwned>(
icao_number: &Arc<str>,
month: time::Date,
client: &impl BlobStorageProvider,
client: &dyn BlobStorageProvider,
) -> Result<Vec<D>, Box<dyn Error>> {
let key = format!(
"{DATABASE}icao_number={icao_number}/month={}/data.csv",
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn etl_task(
private_jets: &HashMap<Arc<str>, aircraft::Aircraft>,
models: &AircraftModels,
airports: &[Airport],
client: Option<&flights::fs_s3::ContainerClient>,
client: Option<&dyn BlobStorageProvider>,
) -> Result<(), Box<dyn Error>> {
// extract
let positions = flights::month_positions(month, &icao_number, client).await?;
Expand All @@ -190,7 +190,7 @@ async fn aggregate(
private_jets: Vec<aircraft::Aircraft>,
models: &AircraftModels,
airports: &[Airport],
client: &flights::fs_s3::ContainerClient,
client: &dyn BlobStorageProvider,
) -> Result<(), Box<dyn Error>> {
let private_jets = private_jets
.into_iter()
Expand Down Expand Up @@ -301,7 +301,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
todo.sort_unstable_by_key(|(icao, date)| (date, icao));
log::info!("todo : {}", todo.len());

let client = Some(&client);
let client = Some(&client as &dyn BlobStorageProvider);
let relevant_jets = &relevant_jets;
let models = &models;
let airports = &airports;
Expand All @@ -319,9 +319,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
});

let _ = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?;
.buffered(50)
.collect::<Vec<_>>()
.await;

aggregate(private_jets, &models, &airports, client.unwrap()).await
}
3 changes: 2 additions & 1 deletion src/bin/period.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
(Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await),
};
let client = client.as_ref().map(|x| x as &dyn BlobStorageProvider);

// load datasets to memory
let owners = load_owners()?;
let aircraft_owners = load_aircraft_owners()?;
let aircrafts = load_aircrafts(client.as_ref()).await?;
let aircrafts = aircraft::read(date!(2023 - 11 - 06), client).await?;

let from = cli.from;
let to = cli.to.unwrap_or(time::OffsetDateTime::now_utc().date());
Expand Down
9 changes: 6 additions & 3 deletions src/bin/single_day.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::error::Error;

use clap::Parser;
use simple_logger::SimpleLogger;
use time::macros::date;
use tinytemplate::TinyTemplate;

use crate::aircraft::Aircrafts;
use flights::*;

static TEMPLATE: &str = include_str!(concat!(
Expand Down Expand Up @@ -82,7 +84,7 @@ async fn flight_date(
owners: &Owners,
aircraft_owners: &AircraftOwners,
aircrafts: &Aircrafts,
client: Option<&fs_s3::ContainerClient>,
client: Option<&dyn BlobStorageProvider>,
) -> Result<Vec<Event>, Box<dyn Error>> {
let models = load_private_jet_models()?;
let airports = airports_cached().await?;
Expand Down Expand Up @@ -190,10 +192,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
(Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await),
};
let client = client.as_ref().map(|x| x as &dyn BlobStorageProvider);

let owners = load_owners()?;
let aircraft_owners = load_aircraft_owners()?;
let aircrafts = load_aircrafts(client.as_ref()).await?;
let aircrafts = aircraft::read(date!(2023 - 11 - 06), client).await?;

let dane_emissions_kg = Fact {
claim: 5100,
Expand All @@ -207,7 +210,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
&owners,
&aircraft_owners,
&aircrafts,
client.as_ref(),
client,
)
.await?;

Expand Down
126 changes: 70 additions & 56 deletions src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ static ROOT: &'static str = "database/";
/// An object that can be used to get and put blobs.
#[async_trait]
pub trait BlobStorageProvider {
type Error: std::error::Error + Send + Sync + 'static;
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, Self::Error>;
async fn put(&self, blob_name: &str, contents: Vec<u8>) -> Result<(), Self::Error>;
async fn list(&self, prefix: &str) -> Result<Vec<String>, Self::Error>;
async fn delete(&self, blob_name: &str) -> Result<(), Self::Error>;
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, std::io::Error>;
async fn put(&self, blob_name: &str, contents: Vec<u8>) -> Result<(), std::io::Error>;
async fn list(&self, prefix: &str) -> Result<Vec<String>, std::io::Error>;
async fn delete(&self, blob_name: &str) -> Result<(), std::io::Error>;

fn can_put(&self) -> bool;
}
Expand All @@ -21,10 +20,8 @@ pub struct LocalDisk;

#[async_trait]
impl BlobStorageProvider for LocalDisk {
type Error = std::io::Error;

#[must_use]
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, Self::Error> {
async fn maybe_get(&self, blob_name: &str) -> Result<Option<Vec<u8>>, std::io::Error> {
let path = PathBuf::from(ROOT).join(Path::new(blob_name));
if path.try_exists()? {
Ok(Some(std::fs::read(path)?))
Expand All @@ -34,7 +31,7 @@ impl BlobStorageProvider for LocalDisk {
}

#[must_use]
async fn put(&self, blob_name: &str, contents: Vec<u8>) -> Result<(), Self::Error> {
async fn put(&self, blob_name: &str, contents: Vec<u8>) -> Result<(), std::io::Error> {
let path = PathBuf::from(ROOT).join(Path::new(blob_name));
let mut dir = path.clone();
dir.pop();
Expand All @@ -44,12 +41,12 @@ impl BlobStorageProvider for LocalDisk {
}

#[must_use]
async fn list(&self, _prefix: &str) -> Result<Vec<String>, Self::Error> {
async fn list(&self, _prefix: &str) -> Result<Vec<String>, std::io::Error> {
todo!()
}

#[must_use]
async fn delete(&self, _prefix: &str) -> Result<(), Self::Error> {
async fn delete(&self, _prefix: &str) -> Result<(), std::io::Error> {
todo!()
}

Expand All @@ -58,29 +55,11 @@ impl BlobStorageProvider for LocalDisk {
}
}

#[derive(Debug)]
pub enum Error<F: std::error::Error + Send, E: std::error::Error + Send> {
/// An error originating from trying to read from source
Fetch(F),
/// An error originating from trying to read or write data from/to backend
Backend(E),
}

impl<F: std::error::Error + Send, E: std::error::Error + Send> std::error::Error for Error<F, E> {}

impl<F: std::error::Error + Send, E: std::error::Error + Send> std::fmt::Display for Error<F, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Fetch(e) => std::fmt::Display::fmt(&e, f),
Self::Backend(e) => std::fmt::Display::fmt(&e, f),
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheAction {
ReadFetchWrite,
ReadFetch,
FetchWrite,
}

impl CacheAction {
Expand All @@ -97,37 +76,72 @@ impl CacheAction {
/// Returns the data in `blob_name` from `provider`.
/// # Implementation
/// This function is idempotent but not pure.
pub async fn cached<E, P, F>(
pub async fn cached<E, F>(
blob_name: &str,
fetch: F,
provider: &P,
provider: &dyn BlobStorageProvider,
action: CacheAction,
) -> Result<Vec<u8>, Error<E, P::Error>>
) -> Result<Vec<u8>, std::io::Error>
where
E: std::error::Error + Send,
E: std::error::Error + Send + Sync + 'static,
F: futures::Future<Output = Result<Vec<u8>, E>>,
P: BlobStorageProvider,
{
log::info!("Fetch {blob_name}");
if let Some(data) = provider
.maybe_get(blob_name)
.await
.map_err(|e| Error::Backend(e))?
{
log::info!("{blob_name} - cache hit");
Ok(data)
} else {
log::info!("{blob_name} - cache miss");
let contents = fetch.await.map_err(|e| Error::Fetch(e))?;
if action == CacheAction::ReadFetch || !provider.can_put() {
log::info!("{blob_name} - cache do not write");
return Ok(contents);
};
provider
.put(blob_name, contents.clone())
.await
.map_err(|e| Error::Backend(e))?;
log::info!("{blob_name} - cache write");
Ok(contents)
match action {
CacheAction::FetchWrite => miss(blob_name, fetch, provider, action).await,
_ => {
log::info!("Fetch {blob_name}");
if let Some(data) = provider.maybe_get(blob_name).await? {
log::info!("{blob_name} - cache hit");
Ok(data)
} else {
miss(blob_name, fetch, provider, action).await
}
}
}
}

/// Writes the result of `fetch` into `provider`.
/// Returns the result of fetch.
/// # Implementation
/// This function is idempotent and pure.
pub async fn miss<E, F>(
blob_name: &str,
fetch: F,
provider: &dyn BlobStorageProvider,
action: CacheAction,
) -> Result<Vec<u8>, std::io::Error>
where
E: std::error::Error + Send + Sync + 'static,
F: futures::Future<Output = Result<Vec<u8>, E>>,
{
log::info!("{blob_name} - cache miss");
let contents = fetch.await.map_err(std::io::Error::other)?;
if action == CacheAction::ReadFetch || !provider.can_put() {
log::info!("{blob_name} - cache do not write");
return Ok(contents);
};
provider.put(blob_name, contents.clone()).await?;
log::info!("{blob_name} - cache write");
Ok(contents)
}

/// * read from remote
/// * if not found and can't write to remote => read disk and write to disk
/// * if not found and can write to remote => fetch and write
pub(crate) async fn cached_call<F: futures::Future<Output = Result<Vec<u8>, std::io::Error>>>(
blob_name: &str,
fetch: F,
client: Option<&dyn BlobStorageProvider>,
action: crate::fs::CacheAction,
) -> Result<Vec<u8>, std::io::Error> {
let client = client.unwrap_or(&crate::fs::LocalDisk);

let Some(data) = client.maybe_get(blob_name).await? else {
if !client.can_put() {
return crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await;
} else {
return crate::fs::cached(&blob_name, fetch, client, action).await;
};
};
Ok(data)
}
Loading

0 comments on commit d94a9e1

Please sign in to comment.