Skip to content

Commit

Permalink
Minor improvement to example (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecardleitao authored Feb 28, 2024
1 parent ffc4c7f commit ca31b97
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 87 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ cargo run --example country -- --from=2024-01-13 --to=2024-01-21 --country=portu

# Story about German private jets that flew in 2023, where secret is on a file
cargo run --example country -- --from=2023-01-01 --to=2024-01-01 --country=germany --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt)

# Build database of positions `[2020, 2023]`
cargo run --release --example etl_positions -- --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt)
# they are available at
# https://private-jets.fra1.digitaloceanspaces.com/position/icao_number={icao}/month={year}-{month}/data.json

# Build database of legs `[2020, 2023]` (over existing positions computed by `etl_positions`)
cargo run --release --example etl_legs -- --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt)
# they are available at
# https://private-jets.fra1.digitaloceanspaces.com/leg/v1/data/icao_number={icao}/month={year}-{month}/data.csv
```

## Methodology
Expand Down
181 changes: 101 additions & 80 deletions examples/etl_legs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use clap::Parser;
use flights::{Aircraft, AircraftModels, BlobStorageProvider, Leg};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::Serialize;
use serde::{de::DeserializeOwned, Serialize};
use simple_logger::SimpleLogger;

static DATABASE_ROOT: &'static str = "leg/v1/";
static DATABASE: &'static str = "leg/v1/data/";

#[derive(serde::Serialize, serde::Deserialize)]
struct LegOut {
icao_number: String,
tail_number: String,
model: String,
#[serde(with = "time::serde::rfc3339")]
Expand Down Expand Up @@ -46,9 +47,7 @@ async fn write_json(
let mut bytes: Vec<u8> = Vec::new();
serde_json::to_writer(&mut bytes, &d).map_err(std::io::Error::other)?;

Ok(client
.put(&format!("{DATABASE_ROOT}{key}.json"), bytes)
.await?)
Ok(client.put(key, bytes).await?)
}

async fn write_csv<B: BlobStorageProvider>(
Expand All @@ -65,17 +64,16 @@ async fn write_csv<B: BlobStorageProvider>(
Ok(())
}

async fn write(
icao_number: &Arc<str>,
month: time::Date,
fn transform<'a>(
icao_number: &'a Arc<str>,
legs: Vec<Leg>,
private_jets: &HashMap<Arc<str>, Aircraft>,
models: &AircraftModels,
client: &impl BlobStorageProvider,
) -> Result<(), Box<dyn Error>> {
let legs = legs.into_iter().map(|leg| {
private_jets: &'a HashMap<Arc<str>, Aircraft>,
models: &'a AircraftModels,
) -> impl Iterator<Item = LegOut> + 'a {
legs.into_iter().map(|leg| {
let aircraft = private_jets.get(icao_number).expect(icao_number);
LegOut {
icao_number: icao_number.to_string(),
tail_number: aircraft.tail_number.to_string(),
model: aircraft.model.to_string(),
start: leg.from().datetime(),
Expand All @@ -96,8 +94,15 @@ async fn write(
leg.duration(),
) as usize,
}
});
})
}

async fn write(
icao_number: &Arc<str>,
month: time::Date,
legs: impl Iterator<Item = impl Serialize>,
client: &impl BlobStorageProvider,
) -> Result<(), Box<dyn Error>> {
let key = format!(
"{DATABASE}icao_number={icao_number}/month={}/data.csv",
flights::month_to_part(&month)
Expand All @@ -108,29 +113,25 @@ async fn write(
Ok(())
}

async fn read(
async fn read<D: DeserializeOwned>(
icao_number: &Arc<str>,
month: time::Date,
client: &impl BlobStorageProvider,
) -> Result<Vec<LegOut>, Box<dyn Error>> {
) -> Result<Vec<D>, Box<dyn Error>> {
let key = format!(
"{DATABASE}icao_number={icao_number}/month={}/data.csv",
flights::month_to_part(&month)
);
let content = client.maybe_get(&key).await?.expect("File to be present");

csv::Reader::from_reader(&content[..])
.deserialize()
.map(|x| {
let record: LegOut = x?;
Ok(record)
})
.deserialize::<D>()
.map(|x| Ok(x?))
.collect()
}

async fn private_jets(
client: Option<&flights::fs_s3::ContainerClient>,
country: Option<&str>,
) -> Result<Vec<Aircraft>, Box<dyn std::error::Error>> {
// load datasets to memory
let aircrafts = flights::load_aircrafts(client).await?;
Expand All @@ -139,11 +140,6 @@ async fn private_jets(
Ok(aircrafts
.into_iter()
// its primary use is to be a private jet
.filter(|(_, a)| {
country
.map(|country| a.country.as_deref() == Some(country))
.unwrap_or(true)
})
.filter_map(|(_, a)| models.contains_key(&a.model).then_some(a))
.collect())
}
Expand All @@ -164,6 +160,73 @@ struct Cli {
country: Option<String>,
}

async fn etl_task(
icao_number: &Arc<str>,
month: time::Date,
private_jets: &HashMap<Arc<str>, Aircraft>,
models: &AircraftModels,
client: Option<&flights::fs_s3::ContainerClient>,
) -> Result<(), Box<dyn Error>> {
// extract
let positions = flights::month_positions(month, &icao_number, client).await?;
// transform
let legs = transform(
&icao_number,
flights::legs(positions.into_iter()),
&private_jets,
&models,
);
// load
write(&icao_number, month, legs, client.unwrap()).await
}

async fn aggregate(
private_jets: Vec<Aircraft>,
required: usize,
client: &impl BlobStorageProvider,
) -> Result<(), Box<dyn Error>> {
let private_jets = private_jets
.into_iter()
.map(|a| a.icao_number)
.collect::<HashSet<_>>();

let completed = flights::existing(DATABASE, client)
.await?
.into_iter()
.filter(|(icao, _)| private_jets.contains(icao))
.collect::<HashSet<_>>();

let tasks = completed
.iter()
.map(|(icao, date)| async move { read::<LegOut>(icao, *date, client).await });

log::info!("Gettings all legs");
let legs = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten();

log::info!("Writing all legs");
let key = format!("{DATABASE_ROOT}all.csv");
write_csv(legs, &key, client).await?;
log::info!("Written {key}");

let key = format!("{DATABASE_ROOT}status.json");
write_json(
client,
Metadata {
icao_months_to_process: required,
icao_months_processed: completed.len(),
},
&key,
)
.await?;
log::info!("status written");
Ok(())
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
SimpleLogger::new()
Expand All @@ -178,9 +241,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let models = flights::load_private_jet_models()?;

let months = (2020..2024).cartesian_product(1..=12u8).count();
let relevant_jets = private_jets(Some(&client), cli.country.as_deref())
.await?
let private_jets = private_jets(Some(&client)).await?;
let relevant_jets = private_jets
.clone()
.into_iter()
// in the country
.filter(|a| {
cli.country
.as_deref()
.map(|country| a.country.as_deref() == Some(country))
.unwrap_or(true)
})
.map(|a| (a.icao_number.clone(), a))
.collect::<HashMap<_, _>>();
let required = relevant_jets.len() * months;
Expand Down Expand Up @@ -209,63 +280,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
let models = &models;

let tasks = todo.into_iter().map(|(icao_number, month)| async move {
let positions = flights::month_positions(*month, &icao_number, client).await?;
let legs = flights::legs(positions.into_iter());
write(
&icao_number,
*month,
legs,
&relevant_jets,
&models,
client.unwrap(),
)
.await
etl_task(icao_number, *month, &relevant_jets, &models, client).await
});

let _ = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?;

let private_jets = private_jets(client, None)
.await?
.into_iter()
.map(|a| (a.icao_number.clone(), a))
.collect::<HashMap<_, _>>();

let client = client.unwrap();
let completed = flights::existing(DATABASE, client)
.await?
.into_iter()
.filter(|(icao, _)| private_jets.contains_key(icao))
.collect::<HashSet<_>>();

let tasks = completed
.iter()
.map(|(icao, date)| async move { read(icao, *date, client).await });

log::info!("Gettings all legs");
let legs = futures::stream::iter(tasks)
.buffered(20)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten();

let key = format!("{DATABASE_ROOT}all.csv");
write_csv(legs, &key, client).await?;
log::info!("Written {key}");

write_json(
client,
Metadata {
icao_months_to_process: required,
icao_months_processed: completed.len(),
},
"status",
)
.await?;
log::info!("status written");

Ok(())
aggregate(private_jets, required, client.unwrap()).await
}
File renamed without changes.
14 changes: 7 additions & 7 deletions methodology.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ in extracting the database of all aircrafts in https://globe.adsbexchange.com.

Details are available in the source code, [src/aircraft_db.rs](./src/aircraft_db.rs).

### M-2: Identify aircraft types whose primary use is to be a private flying
### M-2: Identify aircraft types whose primary use is to be a private use

This was performed by a human, and consisted in going through different aircraft
manufacturers' websites and identifying the aircrafts that were advertised as used
for private flying.
for private use.

For example, `Dassault Falcon 2000` is advertised as a
private jet on https://www.dassaultfalcon.com/aircraft/overview-of-the-fleet/.
private use on https://www.dassaultfalcon.com/aircraft/overview-of-the-fleet/.

This is stored in [`./src/models.csv`](./src/models.csv).

**NOTE**: not all uses of a model whose primary use is to be a private jet is
private jet. For example, models are sometimes used for emergency services.
for private use. For example, models are sometimes used for emergency services.

### M-3: Identify ICAO number's route in a day

Expand All @@ -55,8 +55,8 @@ Source code is available at [src/icao_to_trace.rs](./src/icao_to_trace.rs).

### M-4: Identify legs of a route

This is performed automatically by the computer program. A leg is defined in this methodology
has a continuous sequence of ADS-B positions in time where the aircraft is flying.
This is performed automatically by the solution. A leg is defined in this methodology
as a continuous sequence of ADS-B positions in time where the aircraft is flying.

The aircraft at a given segment between two ADS-B positions is considered grounded (not flying) when any of:
1. both positions are on the ground
Expand Down Expand Up @@ -132,7 +132,7 @@ This was performed by a human, and consisted in extracting the ownership of the
tail number from website https://www.danishaircraft.dk.

For example `OY-CKK` results in 3 records, whose most recent, `OY-CKK(3)`, is registered
to owned by `Kirkbi Invest A/S`.
to be owned by `Kirkbi Invest A/S`.

This is stored in [`./src/owners.csv`](./src/owners.csv).

Expand Down

0 comments on commit ca31b97

Please sign in to comment.