From d94a9e1c910d9638d51131107e070f9eb7176321 Mon Sep 17 00:00:00 2001 From: jorgecardleitao <149073281+jorgecardleitao@users.noreply.github.com> Date: Thu, 21 Mar 2024 23:50:24 +0100 Subject: [PATCH] Fixed computation of timestamp (#55) This was an error that impacted most calculations. The root issue was that the timestamp of a trace was being computed since midnight, when it should be computed from a specific timestamp available in the trace's json. Added regression test. Also improved the ergonomics of the library. --- Cargo.toml | 3 +- src/aircraft.rs | 2 +- src/bin/etl_legs.rs | 24 ++++---- src/bin/period.rs | 3 +- src/bin/single_day.rs | 9 ++- src/fs.rs | 126 +++++++++++++++++++++------------------ src/fs_s3.rs | 64 ++++++-------------- src/icao_to_trace.rs | 133 ++++++++++++++++++++++++++---------------- src/trace_month.rs | 22 +++---- tests/it/main.rs | 4 +- 10 files changed, 206 insertions(+), 184 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2b98df0..9a9273b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ serde = { version = "1.0", features = ["derive", "rc"] } serde_json = { version = "1.0", default_features = false } # perform requests to the internet -reqwest = {version="*", features = ["gzip"]} +reqwest = {version="0.11", features = ["gzip"]} reqwest-retry = "*" reqwest-middleware = "*" @@ -47,7 +47,6 @@ simple_logger = { version = "*", optional = true } [dev-dependencies] tokio = {version="1.0", features=["rt", "macros", "rt-multi-thread"]} -itertools = "*" [features] build-binary = [ diff --git a/src/aircraft.rs b/src/aircraft.rs index d8b1185..0614fe0 100644 --- a/src/aircraft.rs +++ b/src/aircraft.rs @@ -153,7 +153,7 @@ pub async fn etl_aircrafts(client: Option<&fs_s3::ContainerClient>) -> Result<() pub async fn read( date: Date, - client: Option<&fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result { let path = file_path(date); let data = match client { diff --git a/src/bin/etl_legs.rs b/src/bin/etl_legs.rs index 4f45d2f..b745fda 100644 --- a/src/bin/etl_legs.rs +++ b/src/bin/etl_legs.rs @@ -43,7 +43,7 @@ struct Metadata { } async fn write_json( - client: &impl BlobStorageProvider, + client: &dyn BlobStorageProvider, d: impl Serialize, key: &str, ) -> Result<(), Box> { @@ -53,11 +53,11 @@ async fn write_json( Ok(client.put(key, bytes).await?) } -async fn write_csv( +async fn write_csv( items: impl Iterator, key: &str, - client: &B, -) -> Result<(), B::Error> { + client: &dyn BlobStorageProvider, +) -> Result<(), std::io::Error> { let mut wtr = csv::Writer::from_writer(vec![]); for leg in items { wtr.serialize(leg).unwrap() @@ -106,7 +106,7 @@ async fn write( icao_number: &Arc, month: time::Date, legs: impl Iterator, - client: &impl BlobStorageProvider, + client: &dyn BlobStorageProvider, ) -> Result<(), Box> { let key = format!( "{DATABASE}icao_number={icao_number}/month={}/data.csv", @@ -121,7 +121,7 @@ async fn write( async fn read( icao_number: &Arc, month: time::Date, - client: &impl BlobStorageProvider, + client: &dyn BlobStorageProvider, ) -> Result, Box> { let key = format!( "{DATABASE}icao_number={icao_number}/month={}/data.csv", @@ -170,7 +170,7 @@ async fn etl_task( private_jets: &HashMap, aircraft::Aircraft>, models: &AircraftModels, airports: &[Airport], - client: Option<&flights::fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result<(), Box> { // extract let positions = flights::month_positions(month, &icao_number, client).await?; @@ -190,7 +190,7 @@ async fn aggregate( private_jets: Vec, models: &AircraftModels, airports: &[Airport], - client: &flights::fs_s3::ContainerClient, + client: &dyn BlobStorageProvider, ) -> Result<(), Box> { let private_jets = private_jets .into_iter() @@ -301,7 +301,7 @@ async fn main() -> Result<(), Box> { todo.sort_unstable_by_key(|(icao, date)| (date, icao)); log::info!("todo : {}", todo.len()); - let client = Some(&client); + let client = Some(&client as &dyn BlobStorageProvider); let relevant_jets = &relevant_jets; let models = ⊧ let airports = &airports; @@ -319,9 +319,9 @@ async fn main() -> Result<(), Box> { }); let _ = futures::stream::iter(tasks) - .buffered(20) - .try_collect::>() - .await?; + .buffered(50) + .collect::>() + .await; aggregate(private_jets, &models, &airports, client.unwrap()).await } diff --git a/src/bin/period.rs b/src/bin/period.rs index 92786ad..05a5e2a 100644 --- a/src/bin/period.rs +++ b/src/bin/period.rs @@ -93,11 +93,12 @@ async fn main() -> Result<(), Box> { } (Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await), }; + let client = client.as_ref().map(|x| x as &dyn BlobStorageProvider); // load datasets to memory let owners = load_owners()?; let aircraft_owners = load_aircraft_owners()?; - let aircrafts = load_aircrafts(client.as_ref()).await?; + let aircrafts = aircraft::read(date!(2023 - 11 - 06), client).await?; let from = cli.from; let to = cli.to.unwrap_or(time::OffsetDateTime::now_utc().date()); diff --git a/src/bin/single_day.rs b/src/bin/single_day.rs index bc1b4bb..e129a45 100644 --- a/src/bin/single_day.rs +++ b/src/bin/single_day.rs @@ -2,8 +2,10 @@ use std::error::Error; use clap::Parser; use simple_logger::SimpleLogger; +use time::macros::date; use tinytemplate::TinyTemplate; +use crate::aircraft::Aircrafts; use flights::*; static TEMPLATE: &str = include_str!(concat!( @@ -82,7 +84,7 @@ async fn flight_date( owners: &Owners, aircraft_owners: &AircraftOwners, aircrafts: &Aircrafts, - client: Option<&fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result, Box> { let models = load_private_jet_models()?; let airports = airports_cached().await?; @@ -190,10 +192,11 @@ async fn main() -> Result<(), Box> { } (Backend::Remote, _, _) => Some(flights::fs_s3::anonymous_client().await), }; + let client = client.as_ref().map(|x| x as &dyn BlobStorageProvider); let owners = load_owners()?; let aircraft_owners = load_aircraft_owners()?; - let aircrafts = load_aircrafts(client.as_ref()).await?; + let aircrafts = aircraft::read(date!(2023 - 11 - 06), client).await?; let dane_emissions_kg = Fact { claim: 5100, @@ -207,7 +210,7 @@ async fn main() -> Result<(), Box> { &owners, &aircraft_owners, &aircrafts, - client.as_ref(), + client, ) .await?; diff --git a/src/fs.rs b/src/fs.rs index 188c60b..1bfe4f2 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -7,11 +7,10 @@ static ROOT: &'static str = "database/"; /// An object that can be used to get and put blobs. #[async_trait] pub trait BlobStorageProvider { - type Error: std::error::Error + Send + Sync + 'static; - async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error>; - async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), Self::Error>; - async fn list(&self, prefix: &str) -> Result, Self::Error>; - async fn delete(&self, blob_name: &str) -> Result<(), Self::Error>; + async fn maybe_get(&self, blob_name: &str) -> Result>, std::io::Error>; + async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), std::io::Error>; + async fn list(&self, prefix: &str) -> Result, std::io::Error>; + async fn delete(&self, blob_name: &str) -> Result<(), std::io::Error>; fn can_put(&self) -> bool; } @@ -21,10 +20,8 @@ pub struct LocalDisk; #[async_trait] impl BlobStorageProvider for LocalDisk { - type Error = std::io::Error; - #[must_use] - async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error> { + async fn maybe_get(&self, blob_name: &str) -> Result>, std::io::Error> { let path = PathBuf::from(ROOT).join(Path::new(blob_name)); if path.try_exists()? { Ok(Some(std::fs::read(path)?)) @@ -34,7 +31,7 @@ impl BlobStorageProvider for LocalDisk { } #[must_use] - async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), Self::Error> { + async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), std::io::Error> { let path = PathBuf::from(ROOT).join(Path::new(blob_name)); let mut dir = path.clone(); dir.pop(); @@ -44,12 +41,12 @@ impl BlobStorageProvider for LocalDisk { } #[must_use] - async fn list(&self, _prefix: &str) -> Result, Self::Error> { + async fn list(&self, _prefix: &str) -> Result, std::io::Error> { todo!() } #[must_use] - async fn delete(&self, _prefix: &str) -> Result<(), Self::Error> { + async fn delete(&self, _prefix: &str) -> Result<(), std::io::Error> { todo!() } @@ -58,29 +55,11 @@ impl BlobStorageProvider for LocalDisk { } } -#[derive(Debug)] -pub enum Error { - /// An error originating from trying to read from source - Fetch(F), - /// An error originating from trying to read or write data from/to backend - Backend(E), -} - -impl std::error::Error for Error {} - -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Fetch(e) => std::fmt::Display::fmt(&e, f), - Self::Backend(e) => std::fmt::Display::fmt(&e, f), - } - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CacheAction { ReadFetchWrite, ReadFetch, + FetchWrite, } impl CacheAction { @@ -97,37 +76,72 @@ impl CacheAction { /// Returns the data in `blob_name` from `provider`. /// # Implementation /// This function is idempotent but not pure. -pub async fn cached( +pub async fn cached( blob_name: &str, fetch: F, - provider: &P, + provider: &dyn BlobStorageProvider, action: CacheAction, -) -> Result, Error> +) -> Result, std::io::Error> where - E: std::error::Error + Send, + E: std::error::Error + Send + Sync + 'static, F: futures::Future, E>>, - P: BlobStorageProvider, { - log::info!("Fetch {blob_name}"); - if let Some(data) = provider - .maybe_get(blob_name) - .await - .map_err(|e| Error::Backend(e))? - { - log::info!("{blob_name} - cache hit"); - Ok(data) - } else { - log::info!("{blob_name} - cache miss"); - let contents = fetch.await.map_err(|e| Error::Fetch(e))?; - if action == CacheAction::ReadFetch || !provider.can_put() { - log::info!("{blob_name} - cache do not write"); - return Ok(contents); - }; - provider - .put(blob_name, contents.clone()) - .await - .map_err(|e| Error::Backend(e))?; - log::info!("{blob_name} - cache write"); - Ok(contents) + match action { + CacheAction::FetchWrite => miss(blob_name, fetch, provider, action).await, + _ => { + log::info!("Fetch {blob_name}"); + if let Some(data) = provider.maybe_get(blob_name).await? { + log::info!("{blob_name} - cache hit"); + Ok(data) + } else { + miss(blob_name, fetch, provider, action).await + } + } } } + +/// Writes the result of `fetch` into `provider`. +/// Returns the result of fetch. +/// # Implementation +/// This function is idempotent and pure. +pub async fn miss( + blob_name: &str, + fetch: F, + provider: &dyn BlobStorageProvider, + action: CacheAction, +) -> Result, std::io::Error> +where + E: std::error::Error + Send + Sync + 'static, + F: futures::Future, E>>, +{ + log::info!("{blob_name} - cache miss"); + let contents = fetch.await.map_err(std::io::Error::other)?; + if action == CacheAction::ReadFetch || !provider.can_put() { + log::info!("{blob_name} - cache do not write"); + return Ok(contents); + }; + provider.put(blob_name, contents.clone()).await?; + log::info!("{blob_name} - cache write"); + Ok(contents) +} + +/// * read from remote +/// * if not found and can't write to remote => read disk and write to disk +/// * if not found and can write to remote => fetch and write +pub(crate) async fn cached_call, std::io::Error>>>( + blob_name: &str, + fetch: F, + client: Option<&dyn BlobStorageProvider>, + action: crate::fs::CacheAction, +) -> Result, std::io::Error> { + let client = client.unwrap_or(&crate::fs::LocalDisk); + + let Some(data) = client.maybe_get(blob_name).await? else { + if !client.can_put() { + return crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action).await; + } else { + return crate::fs::cached(&blob_name, fetch, client, action).await; + }; + }; + Ok(data) +} diff --git a/src/fs_s3.rs b/src/fs_s3.rs index 16997df..d2da1a9 100644 --- a/src/fs_s3.rs +++ b/src/fs_s3.rs @@ -177,29 +177,36 @@ pub async fn anonymous_client() -> ContainerClient { #[async_trait::async_trait] impl BlobStorageProvider for ContainerClient { - type Error = Error; - #[must_use] - async fn maybe_get(&self, blob_name: &str) -> Result>, Self::Error> { - if exists(self, blob_name).await? { - Ok(Some(get(&self, blob_name).await?)) + async fn maybe_get(&self, blob_name: &str) -> Result>, std::io::Error> { + if exists(self, blob_name) + .await + .map_err(std::io::Error::other)? + { + Ok(Some( + get(&self, blob_name).await.map_err(std::io::Error::other)?, + )) } else { Ok(None) } } #[must_use] - async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), Self::Error> { - put(&self, blob_name, contents).await + async fn put(&self, blob_name: &str, contents: Vec) -> Result<(), std::io::Error> { + put(&self, blob_name, contents) + .await + .map_err(std::io::Error::other) } #[must_use] - async fn delete(&self, blob_name: &str) -> Result<(), Self::Error> { - delete(&self, blob_name).await + async fn delete(&self, blob_name: &str) -> Result<(), std::io::Error> { + delete(&self, blob_name) + .await + .map_err(std::io::Error::other) } #[must_use] - async fn list(&self, prefix: &str) -> Result, Self::Error> { + async fn list(&self, prefix: &str) -> Result, std::io::Error> { Ok(self .client .list_objects_v2() @@ -209,7 +216,7 @@ impl BlobStorageProvider for ContainerClient { .send() .try_collect() .await - .map_err(|e| Error::from(e.to_string()))? + .map_err(std::io::Error::other)? .into_iter() .map(|response| { response @@ -226,38 +233,3 @@ impl BlobStorageProvider for ContainerClient { self.can_put } } - -/// * read from remote -/// * if not found and can't write to remote => read disk and write to disk -/// * if not found and can write to remote => fetch and write -pub(crate) async fn cached_call, std::io::Error>>>( - blob_name: &str, - fetch: F, - action: crate::fs::CacheAction, - client: Option<&ContainerClient>, -) -> Result, std::io::Error> { - let Some(client) = client else { - return Ok( - crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) - .await - .map_err(std::io::Error::other)?, - ); - }; - - let Some(data) = client - .maybe_get(blob_name) - .await - .map_err(std::io::Error::other)? - else { - return Ok(if !client.can_put() { - crate::fs::cached(&blob_name, fetch, &crate::fs::LocalDisk, action) - .await - .map_err(std::io::Error::other)? - } else { - crate::fs::cached(&blob_name, fetch, client, action) - .await - .map_err(std::io::Error::other)? - }); - }; - Ok(data) -} diff --git a/src/icao_to_trace.rs b/src/icao_to_trace.rs index 1fb84d9..9f8623b 100644 --- a/src/icao_to_trace.rs +++ b/src/icao_to_trace.rs @@ -8,7 +8,7 @@ use time::Date; use time::OffsetDateTime; use super::Position; -use crate::{fs, fs_s3}; +use crate::{fs, BlobStorageProvider}; fn last_2(icao: &str) -> &str { let bytes = icao.as_bytes(); @@ -119,13 +119,32 @@ async fn globe_history(icao: &str, date: &time::Date) -> Result, std::io async fn globe_history_cached( icao: &str, date: &time::Date, - client: Option<&fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result, std::io::Error> { let blob_name = cache_file_path(icao, date); let action = fs::CacheAction::from_date(&date); let fetch = globe_history(&icao, date); - Ok(fs_s3::cached_call(&blob_name, fetch, action, client).await?) + Ok(fs::cached_call(&blob_name, fetch, client, action).await?) +} + +fn compute_trace(data: &[u8]) -> Result<(f64, Vec), std::io::Error> { + let mut value = serde_json::from_slice::(&data)?; + let Some(obj) = value.as_object_mut() else { + return Ok((0.0, vec![])); + }; + let Some(timestamp) = obj.get("timestamp") else { + return Ok((0.0, vec![])); + }; + let timestamp = timestamp.as_f64().unwrap(); + let Some(obj) = obj.get_mut("trace") else { + return Ok((0.0, vec![])); + }; + let Some(trace) = obj.as_array_mut() else { + return Ok((0.0, vec![])); + }; + + Ok((timestamp, std::mem::take(trace))) } /// Returns the trace of the icao number of a given day from https://adsbexchange.com. @@ -140,24 +159,46 @@ async fn globe_history_cached( /// # Implementation /// Because these are historical values, this function caches them the first time it is used /// by the two arguments -pub async fn trace_cached( +async fn trace_cached( icao: &str, date: &time::Date, - client: Option<&fs_s3::ContainerClient>, -) -> Result, std::io::Error> { - let data = globe_history_cached(icao, date, client).await?; + client: Option<&dyn BlobStorageProvider>, +) -> Result<(f64, Vec), std::io::Error> { + compute_trace(&globe_history_cached(icao, date, client).await?) +} - let mut value = serde_json::from_slice::(&data)?; - let Some(obj) = value.as_object_mut() else { - return Ok(vec![]); - }; - let Some(obj) = obj.get_mut("trace") else { - return Ok(vec![]); - }; - let Some(trace) = obj.as_array_mut() else { - return Ok(vec![]); - }; - Ok(std::mem::take(trace)) +fn compute_positions(start_trace: (f64, Vec)) -> impl Iterator { + use time::ext::NumericalDuration; + + let (start, trace) = start_trace; + let start = OffsetDateTime::from_unix_timestamp(start as i64).unwrap(); + + trace.into_iter().filter_map(move |entry| { + let delta = entry[0].as_f64().unwrap().seconds(); + let datetime = start + delta; + let latitude = entry[1].as_f64().unwrap(); + let longitude = entry[2].as_f64().unwrap(); + entry[3] + .as_str() + .and_then(|x| { + (x == "ground").then_some(Position { + datetime, + latitude, + longitude, + altitude: None, + }) + }) + .or_else(|| { + entry[3].as_f64().and_then(|altitude| { + Some(Position { + datetime, + latitude, + longitude, + altitude: Some(altitude), + }) + }) + }) + }) } /// Returns an iterator of [`Position`] over the trace of `icao` on day `date` according @@ -165,47 +206,18 @@ pub async fn trace_cached( pub async fn positions( icao_number: &str, date: time::Date, - client: Option<&fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result, std::io::Error> { - use time::ext::NumericalDuration; trace_cached(icao_number, &date, client) .await - .map(move |trace| { - trace.into_iter().filter_map(move |entry| { - let time_seconds = entry[0].as_f64().unwrap(); - let time = time::Time::MIDNIGHT + time_seconds.seconds(); - let datetime = OffsetDateTime::new_utc(date.clone(), time); - let latitude = entry[1].as_f64().unwrap(); - let longitude = entry[2].as_f64().unwrap(); - entry[3] - .as_str() - .and_then(|x| { - (x == "ground").then_some(Position { - datetime, - latitude, - longitude, - altitude: None, - }) - }) - .or_else(|| { - entry[3].as_f64().and_then(|altitude| { - Some(Position { - datetime, - latitude, - longitude, - altitude: Some(altitude), - }) - }) - }) - }) - }) + .map(compute_positions) } pub(crate) async fn cached_aircraft_positions( from: Date, to: Date, icao_number: &str, - client: Option<&fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result, std::io::Error> { let dates = super::DateIter { from, @@ -230,3 +242,24 @@ pub(crate) async fn cached_aircraft_positions( } pub use crate::trace_month::*; + +#[cfg(test)] +mod test { + use time::macros::date; + + use super::*; + + /// Compare against https://globe.adsbexchange.com/?icao=45860d&showTrace=2019-01-04&leg=1 + #[tokio::test] + async fn work() { + let data = globe_history("45860d", &date!(2019 - 01 - 04)) + .await + .unwrap(); + let first = compute_positions(compute_trace(&data).unwrap()) + .next() + .unwrap(); + assert_eq!(first.datetime.hour(), 6); + assert_eq!(first.datetime.minute(), 54); + assert_eq!(first.grounded(), true); + } +} diff --git a/src/trace_month.rs b/src/trace_month.rs index 67bd333..b230b64 100644 --- a/src/trace_month.rs +++ b/src/trace_month.rs @@ -4,7 +4,7 @@ use futures::{StreamExt, TryStreamExt}; use time::Date; use super::Position; -use crate::{cached_aircraft_positions, fs, fs_s3, BlobStorageProvider}; +use crate::{cached_aircraft_positions, fs, BlobStorageProvider}; static DATABASE: &'static str = "position/"; @@ -66,7 +66,7 @@ fn get_month(current: &time::Date) -> (time::Date, time::Date) { pub async fn month_positions( month: time::Date, icao_number: &str, - client: Option<&fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result, std::io::Error> { log::info!("month_positions({month},{icao_number})"); assert_eq!(month.day(), 1); @@ -80,12 +80,12 @@ pub async fn month_positions( let mut positions = cached_aircraft_positions(from, to, icao_number, client).await?; positions.sort_unstable_by_key(|p| p.datetime()); let mut bytes: Vec = Vec::new(); - serde_json::to_writer(&mut bytes, &positions).map_err(std::io::Error::other)?; + serde_json::to_writer(&mut bytes, &positions)?; Ok(bytes) }; - let r = fs_s3::cached_call(&blob_name, fetch, action, client).await?; - serde_json::from_slice(&r).map_err(std::io::Error::other) + let r = fs::cached_call(&blob_name, fetch, client, action).await?; + Ok(serde_json::from_slice(&r)?) } /// Returns a list of positions within two dates ordered by timestamp @@ -98,7 +98,7 @@ pub async fn aircraft_positions( from: Date, to: Date, icao_number: &str, - client: Option<&fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result, Box> { let dates = super::DateIter { from, @@ -119,7 +119,7 @@ pub async fn aircraft_positions( let positions = futures::stream::iter(tasks) // limit concurrent tasks - .buffered(1) + .buffered(200) .try_collect::>() .await?; @@ -133,10 +133,10 @@ pub async fn aircraft_positions( } /// Returns the set of (icao number, month) that exist in the container prefixed by `dataset` -pub async fn existing( +pub async fn existing( prefix: &str, - client: &B, -) -> Result, time::Date)>, B::Error> { + client: &dyn BlobStorageProvider, +) -> Result, time::Date)>, std::io::Error> { Ok(client .list(prefix) .await? @@ -148,7 +148,7 @@ pub async fn existing( /// Returns the set of (icao, month) that exists in the db pub async fn existing_months_positions( client: &B, -) -> Result, time::Date)>, B::Error> { +) -> Result, time::Date)>, std::io::Error> { existing(DATABASE, client).await } diff --git a/tests/it/main.rs b/tests/it/main.rs index 841d0fc..459ff1b 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -1,6 +1,6 @@ use std::error::Error; -use flights::Leg; +use flights::{BlobStorageProvider, Leg}; use time::{ macros::{date, datetime}, Date, @@ -71,7 +71,7 @@ async fn legs( from: Date, to: Date, icao_number: &str, - client: Option<&flights::fs_s3::ContainerClient>, + client: Option<&dyn BlobStorageProvider>, ) -> Result, Box> { let positions = flights::aircraft_positions(from, to, icao_number, client).await?; Ok(flights::legs(positions.into_iter()))