Improved
jorgecardleitao committed Aug 9, 2024
1 parent 3092c85 commit cc9efaf
Showing 8 changed files with 96 additions and 85 deletions.
29 changes: 20 additions & 9 deletions README.md
@@ -3,14 +3,26 @@
[![Coverage](https://codecov.io/gh/jorgecardleitao/private-jets/graph/badge.svg?token=DT7C376OKH)](https://codecov.io/gh/jorgecardleitao/private-jets)

This repository contains a CLI application to analyze flights of private jets.
See [`methodology.md`](./methodology.md) for details of what it does and where data is available for consumption.

It is supported by an S3 Blob storage container for caching data, thereby
reducing its impact on [https://adsbexchange.com/](https://adsbexchange.com/).

![Design](./design.drawio.png)
## How to use the data

## Risk and impact
The data is available in an https/s3 endpoint. See [analysis.sql](./analysis.sql) for an example of how to use it (in [duckdb SQL](https://duckdb.org/docs/sql/introduction.html)).

```bash
pip install duckdb

python3 run_sql.py analysis.sql
```

See [`methodology.md`](./methodology.md) for details of the full methodology and where data is available for consumption at different levels
of aggregation.

## Contributing

### Risk and impact

This code performs API calls to [https://adsbexchange.com/](https://adsbexchange.com/),
a production website of a company.
@@ -29,7 +41,7 @@ All cached data is available on S3 blob storage at endpoint
and has anonymous and public read permissions. See [`methodology.md`](./methodology.md) for details.

## Getting starter
### How to use

1. Install Rust
2. run `cargo run --features="build-binary" --release --bin etl_aircrafts`
@@ -43,22 +55,21 @@ In general:
* Use the default parameters when creating ad-hoc stories
* Use `--access-key` when improving the database with new data.

As of today, the flag `--access-key` is only available when the code is executed
from `main`, as writing to the blob storage must be done through a controlled code base
that preserves data integrity.
As of today, the flag `--access-key` is only available to the owner,
as writing to the blob storage must be done through a controlled code base that preserves data integrity.

### Examples:

```bash
# Create new snapshot of database of all aircrafts
cargo run --features="build-binary" --release --bin etl_aircrafts -- --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt)

# Build database of positions `[2020, 2023]`
# Build database of positions `[2019, 2024]`
cargo run --features="build-binary" --release --bin etl_positions -- --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt)
# they are available at
# https://private-jets.fra1.digitaloceanspaces.com/position/icao_number={icao}/month={year}-{month}/data.json

# Build database of legs `[2020, 2023]` (over existing positions computed by `etl_positions`)
# Build database of legs `[2019, 2024]` (over existing positions computed by `etl_positions`)
cargo run --features="build-binary" --release --bin etl_legs -- --access-key=DO00AUDGL32QLFKV8CEP --secret-access-key=$(cat secrets.txt)
# they are available at
# https://private-jets.fra1.digitaloceanspaces.com/leg/v1/data/icao_number={icao}/month={year}-{month}/data.csv
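Review note: the two URL patterns quoted in the README examples can be sketched as small helpers. This is only an illustration: the function names are hypothetical, and zero-padding the month to two digits is an assumption, since the README only shows the `{year}-{month}` placeholder.

```rust
/// Base URL of the public bucket, as quoted in the README examples.
const BASE: &str = "https://private-jets.fra1.digitaloceanspaces.com";

/// URL of the positions file of one aircraft and month.
/// Assumption: the month is zero-padded to two digits.
fn position_url(icao: &str, year: i32, month: u8) -> String {
    format!(
        "{}/position/icao_number={}/month={}-{:02}/data.json",
        BASE, icao, year, month
    )
}

/// URL of the legs file (v1 layout) of one aircraft and month.
/// Assumption: same month formatting as `position_url`.
fn leg_url(icao: &str, year: i32, month: u8) -> String {
    format!(
        "{}/leg/v1/data/icao_number={}/month={}-{:02}/data.csv",
        BASE, icao, year, month
    )
}
```

Calling `leg_url("4596b2", 2023, 7)` yields a path ending in `icao_number=4596b2/month=2023-07/data.csv` under these assumptions.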
24 changes: 15 additions & 9 deletions analysis.sql
@@ -1,10 +1,16 @@
WITH "tmp" AS (
SET s3_endpoint='fra1.digitaloceanspaces.com';

SELECT
SUM(CASE WHEN "happened" = 'FlownWithLadd' THEN 1 ELSE 0 END) AS "with_ladd_count",
SUM(CASE WHEN "happened" = 'FlownWithoutLadd' THEN 1 ELSE 0 END) AS "without_ladd_count"
FROM
read_csv_auto("ladd_10.csv", header = true)
)
SELECT
"with_ladd_count" / ("without_ladd_count" + "with_ladd_count")
FROM "tmp"
"year",
COUNT(DISTINCT("tail_number")) AS "number of aircrafts",
COUNT(*) AS "number of legs",
SUM("co2_emissions")/1000/1000/1000 AS "Mt CO2 emitted",
SUM("distance")/1000/1000/1000 AS "M km flown",
SUM("great_circle_distance")/1000/1000/1000 AS "M km traveled",
SUM("duration")/24 AS "days flown",
SUM("hours_above_30000")/24 AS "days above 30k feet",
SUM("hours_above_40000")/24 AS "days above 40k feet",
FROM read_csv_auto("s3://private-jets/leg/v2/all/year=*/data.csv", header = true)
WHERE "start" < DATE '2024-06-01'
GROUP BY "year"
ORDER BY "year"
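Review note: for readers who prefer to see the aggregation outside SQL, here is a minimal Rust sketch of the same idea (group legs by year, count distinct tail numbers, convert kg of CO2 to Mt and hours to days). The `Leg`, `YearSummary` and `summarize` names are hypothetical, only a subset of the columns is covered, and the `WHERE "start" < DATE '2024-06-01'` filter is left out.

```rust
use std::collections::{BTreeMap, HashSet};

/// One row of the legs dataset, reduced to the columns used in this sketch.
struct Leg {
    year: i32,
    tail_number: String,
    co2_emissions: f64, // kg
    duration: f64,      // hours
}

/// Per-year result, mirroring a subset of the SQL output columns.
#[derive(Debug, PartialEq)]
struct YearSummary {
    aircraft: usize, // COUNT(DISTINCT tail_number)
    legs: usize,     // COUNT(*)
    mt_co2: f64,     // SUM(co2_emissions) / 1e9
    days_flown: f64, // SUM(duration) / 24
}

/// Running totals for one year.
#[derive(Default)]
struct Acc<'a> {
    tails: HashSet<&'a str>,
    legs: usize,
    co2_kg: f64,
    hours: f64,
}

/// Groups legs by year and computes the summary; a `BTreeMap` keeps the
/// years sorted, like `ORDER BY "year"`.
fn summarize(legs: &[Leg]) -> BTreeMap<i32, YearSummary> {
    let mut acc: BTreeMap<i32, Acc<'_>> = BTreeMap::new();
    for leg in legs {
        let a = acc.entry(leg.year).or_default();
        a.tails.insert(leg.tail_number.as_str());
        a.legs += 1;
        a.co2_kg += leg.co2_emissions;
        a.hours += leg.duration;
    }
    acc.into_iter()
        .map(|(year, a)| {
            let summary = YearSummary {
                aircraft: a.tails.len(),
                legs: a.legs,
                mt_co2: a.co2_kg / 1e9,
                days_flown: a.hours / 24.0,
            };
            (year, summary)
        })
        .collect()
}
```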
Binary file modified design.drawio.png
35 changes: 27 additions & 8 deletions methodology.md
@@ -11,19 +11,21 @@ This document describes the general methodology used by this solution.
* ICAO number: an identifier (e.g. `4596B2`) set on the transponder
* ADS-B event: a signal emitted by an ADS-B transponder of an aircraft containing the ICAO number, time and position (including altitude)
* ADS-B receiver: a device that receives / listens to ADS-B signals
* [adsbexchange](https://globe.adsbexchange.com): a commercial platform used to aggregate real time ADS-B signals picked up by receivers worldwide
* [adsbexchange](https://globe.adsbexchange.com): a commercial platform that aggregates real time ADS-B signals picked up by receivers worldwide
* leg: a continuous sequence of ADS-B positions in time where the aircraft is flying

## Assumptions

* At any given point in time, an aircraft is assigned a unique tail number (e.g. `OY-EUR`).
* At any given point in time, an aircraft is assigned a unique ICAO number (e.g. `4596B2`).
* At any given point in time, there is a one-to-one relationship between the assigned ICAO number and the assigned tail number (`OY-EUR <-> 4596B2`).
* At any given point in time, [adsbexchange](https://globe.adsbexchange.com) contains the current set of aircrafts with:
At any given point in time:

* an aircraft is assigned a unique tail number (e.g. `OY-EUR`).
* an aircraft is assigned a unique ICAO number (e.g. `4596B2`).
* there is a one-to-one relationship between the assigned ICAO number and the assigned tail number (`OY-EUR <-> 4596B2`).
* [adsbexchange](https://globe.adsbexchange.com) contains the current set of aircrafts with:
* tail number
* ICAO number
* Model name
* At any given point in time, a flying aircraft has its ADS-B transponder turned on.
* a flying aircraft has its ADS-B transponder turned on.
* [adsbexchange](https://globe.adsbexchange.com) maintains and operates the historical datasets of ADS-B signals received in the platform.

## Design
@@ -34,6 +36,8 @@ This document describes the general methodology used by this solution.
* be publicly available via https and s3 protocols (S3 endpoint `fra1.digitaloceanspaces.com`, bucket `private-jets`)
* be serialized in computer and human-readable formats (`csv` or `json`)

![Design](./design.drawio.png)

## Methodology

The methodology used to support this solution is as follows:
@@ -198,6 +202,12 @@ columns:
icao_number:
type: string
description: The ICAO number (e.g. 4596b2)
tail_number:
type: string
description: The tail number associated to this ICAO number
aircraft_model:
type: string
description: The aircraft model associated to this ICAO number
start:
type: string
description: The datetime of the start of the leg in rfc3339 in UTC
@@ -222,15 +232,24 @@ columns:
end_altitude:
type: f64
description: The altitude at the end of the leg in feet
length:
duration:
type: f64
description: The duration of the leg in hours
distance:
type: f64
description: The total actual flown distance in km (always bigger than the great-circle distance between start and end)
description: The total actual flown distance in km
great_circle_distance:
type: f64
description: The great circle distance between the start and end of the leg in km
hours_above_30000:
type: f64
description: number of hours flown above 30.000 feet
hours_above_40000:
type: f64
description: number of hours flown above 40.000 feet
co2_emissions:
type: f64
description: CO2 emissions in kg
constraints:
- type: uniqueness
columns: [icao_number, start]
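Review note: the uniqueness constraint on `[icao_number, start]` can be checked with a set of already seen pairs. The sketch below is one possible check, not something taken from the repository; `first_violation` is a hypothetical name and the keys are passed as plain string pairs.

```rust
use std::collections::HashSet;

/// Returns the index of the first row whose `(icao_number, start)` pair
/// repeats an earlier row, or `None` when every pair is unique.
fn first_violation(keys: &[(&str, &str)]) -> Option<usize> {
    let mut seen = HashSet::new();
    // `insert` returns `false` when the pair was already in the set.
    keys.iter().position(|key| !seen.insert(*key))
}
```

A caller would typically map each deserialized leg to its `(icao_number, start)` pair and treat a `Some(index)` result as a data-quality error.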
80 changes: 28 additions & 52 deletions src/bin/etl_legs.rs
@@ -144,12 +144,13 @@ async fn write(
Ok(())
}

async fn read<D: DeserializeOwned>(
async fn read_u8(
icao: &Arc<str>,
month: time::Date,
client: &dyn BlobStorageProvider,
) -> Result<Vec<D>, std::io::Error> {
flights::io::get_csv(&pk_to_blob_name(icao, month), client).await
) -> Result<Option<Vec<u8>>, std::io::Error> {
log::info!("Read icao={icao} month={month}");
client.maybe_get(&pk_to_blob_name(icao, month)).await
}

fn pk_to_blob_name(icao: &str, month: time::Date) -> String {
@@ -212,17 +213,10 @@ async fn aggregate(
required: HashMap<(Arc<str>, time::Date), Aircraft>,
client: &dyn BlobStorageProvider,
) -> Result<(), Box<dyn Error>> {
let all_completed = list(client).await?;

let completed = all_completed
.into_iter()
.filter(|key| required.contains_key(key))
.collect::<HashSet<_>>();

// group completed by year
let completed_by_year =
completed
.into_iter()
// group by year
let required_by_year =
required
.into_keys()
.fold(HashMap::<i32, HashSet<_>>::new(), |mut acc, v| {
acc.entry(v.1.year())
.and_modify(|entries| {
@@ -231,31 +225,26 @@ async fn aggregate(
.or_insert(HashSet::from([v]));
acc
});
let required_by_year =
required
.into_iter()
.fold(HashMap::<i32, usize>::new(), |mut acc, (v, _)| {
acc.entry(v.1.year())
.and_modify(|entries| {
*entries += 1;
})
.or_insert(0);
acc
});

// run tasks by year
let mut metadata = HashMap::<i32, Metadata>::new();
for (year, completed) in completed_by_year {
let tasks = completed.iter().map(|(icao_number, date)| async move {
read::<LegOut>(icao_number, *date, client).await
});
for (year, completed) in required_by_year {
let tasks = completed
.iter()
.map(|(icao_number, date)| async move { read_u8(icao_number, *date, client).await });

log::info!("Getting all legs for year={year}");
let legs = futures::stream::iter(tasks)
.buffered(100)
.buffered(1000)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten() // drop those that do not exist
.map(|content| {
flights::csv::deserialize::<LegOut>(&content)
.collect::<Result<Vec<_>, _>>()
.unwrap()
})
.flatten();

log::info!("Writing all legs for year={year}");
@@ -265,7 +254,7 @@ async fn aggregate(
metadata.insert(
year,
Metadata {
icao_months_to_process: *required_by_year.get(&year).unwrap(),
icao_months_to_process: completed.len(),
icao_months_processed: completed.len(),
url: format!("https://private-jets.fra1.digitaloceanspaces.com/{key}"),
},
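Review note: the group-by-year step in `aggregate` can be illustrated in isolation. The sketch below uses a hypothetical `Key` alias, `(icao, (year, month))`, in place of the `(Arc<str>, time::Date)` pair, and `entry(..).or_default().insert(..)` as a shorter variant of the `and_modify` / `or_insert` pattern in the diff. Counting via the set length also avoids a pitfall of the removed counter, which started at `0` through `.or_insert(0)` and so appears to have counted one fewer than the number of keys per year.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the `(icao, month)` key: `(icao, (year, month))`.
type Key = (String, (i32, u8));

/// Groups keys by the year of their month.
fn group_by_year(keys: impl IntoIterator<Item = Key>) -> HashMap<i32, HashSet<Key>> {
    keys.into_iter()
        .fold(HashMap::<i32, HashSet<Key>>::new(), |mut acc, key| {
            let year = (key.1).0;
            // Create the set for a new year on first use, then add the key.
            acc.entry(year).or_default().insert(key);
            acc
        })
}
```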
@@ -318,13 +307,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let (private_jets, models) = private_jets(client, maybe_country).await?;
let models = &models;

let months = (2019..2024)
let years = 2019..2025;
let now = time::OffsetDateTime::now_utc().date();
let now =
time::Date::from_calendar_date(now.year(), now.month(), 1).expect("day 1 never errors");
let months = years
.cartesian_product(1..=12u8)
.map(|(year, month)| {
time::Date::from_calendar_date(year, time::Month::try_from(month).unwrap(), 1)
.expect("day 1 never errors")
})
.collect::<Vec<_>>();
.filter(|month| month < &now);

let required = private_jets
.into_iter()
@@ -339,24 +332,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

log::info!("required : {}", required.len());

//log::info!("computing completed tasks...");
//let completed = HashSet::new(); //list(client).await?.into_iter().collect::<HashSet<_>>();
//log::info!("completed: {}", completed.len());

//log::info!("computing ready tasks...");
/*let ready = flights::icao_to_trace::list_months_positions(client)
.await?
.into_iter()
.filter(|key| required.contains_key(key))
.collect::<HashSet<_>>();*/
//let ready = required.iter()
//log::info!("ready : {}", ready.len());

//let mut todo = ready.difference(&completed).collect::<Vec<_>>();
//todo.sort_unstable_by_key(|(icao, date)| (date, icao));
//log::info!("todo : {}", todo.len());

log::info!("executing todos...");
log::info!("executing required...");
let tasks = required
.clone()
.into_iter()
@@ -377,7 +353,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
})
.collect::<Vec<_>>()
.await;
log::info!("todos completed");
log::info!("execution completed");

log::info!("aggregating...");
aggregate(required, client).await
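Review note: the new month selection in `main` (all months of `2019..2025` strictly before the first day of the current month) can be sketched without the `time` and `itertools` crates. Here months are plain `(year, month)` tuples, which compare lexicographically; `months_before` and the `cutoff` parameter are hypothetical names, and the cutoff is passed in rather than read from the clock so the result is reproducible.

```rust
use std::ops::Range;

/// All `(year, month)` pairs in `years` that fall strictly before `cutoff`.
fn months_before(years: Range<i32>, cutoff: (i32, u8)) -> Vec<(i32, u8)> {
    years
        .flat_map(|year| (1..=12u8).map(move |month| (year, month)))
        // Tuples compare by year first, then by month.
        .filter(|month| *month < cutoff)
        .collect()
}
```

With a cutoff of `(2024, 8)`, matching the commit date, the list runs from `(2019, 1)` to `(2024, 7)`, so the current, still incomplete month is skipped.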
9 changes: 4 additions & 5 deletions src/csv.rs
@@ -11,8 +11,7 @@ pub(crate) fn load<H: Hash + Eq, D: for<'de> Deserialize<'de>, PK: Fn(D) -> (H,
) -> Result<HashMap<H, D>, Box<dyn Error>> {
let data = std::fs::read(path)?;

let data = deserialize(&data).map(map).collect();
Ok(data)
Ok(deserialize(&data).map(|x| x.unwrap()).map(map).collect())
}

pub fn serialize(items: impl Iterator<Item = impl serde::Serialize>) -> Vec<u8> {
@@ -25,12 +24,12 @@ pub fn serialize(items: impl Iterator<Item = impl serde::Serialize>) -> Vec<u8>

pub fn deserialize<'a, D: serde::de::DeserializeOwned + 'a>(
data: &'a [u8],
) -> impl Iterator<Item = D> + 'a {
) -> impl Iterator<Item = Result<D, std::io::Error>> + 'a {
let rdr = csv::ReaderBuilder::new()
.delimiter(b',')
.from_reader(std::io::Cursor::new(data));
rdr.into_deserialize().into_iter().map(|r| {
let record: D = r.unwrap();
record
let record: D = r?;
Ok(record)
})
}
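Review note: the change to `deserialize` moves error handling to the caller by yielding `Result` items. The std-only sketch below shows the same pattern on a simpler input (one float per line instead of CSV records); `parse_rows` and `load_rows` are hypothetical names. Collecting an iterator of `Result<T, E>` into `Result<Vec<T>, E>` stops at the first error, which is what the new `get_csv` in `src/io.rs` relies on.

```rust
use std::num::ParseFloatError;

/// Lazily parses one number per line; a bad line yields `Err` instead of panicking.
fn parse_rows<'a>(data: &'a str) -> impl Iterator<Item = Result<f64, ParseFloatError>> + 'a {
    data.lines().map(|line| line.trim().parse::<f64>())
}

/// Collects all rows, returning the first parse error if any row is invalid.
fn load_rows(data: &str) -> Result<Vec<f64>, ParseFloatError> {
    parse_rows(data).collect()
}
```

A caller that prefers the old behaviour can still write `parse_rows(data).map(|row| row.unwrap())`, which is the shape `load` in `src/csv.rs` keeps.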
2 changes: 1 addition & 1 deletion src/io.rs
@@ -8,5 +8,5 @@ pub async fn get_csv<D: DeserializeOwned>(
) -> Result<Vec<D>, std::io::Error> {
let content = client.maybe_get(key).await?.expect("File to be present");

Ok(super::csv::deserialize::<D>(&content).collect::<Vec<_>>())
super::csv::deserialize::<D>(&content).collect()
}
2 changes: 1 addition & 1 deletion tests/it/main.rs
@@ -136,7 +136,7 @@ async fn gets_db_month() -> Result<(), Box<dyn Error>> {
}

#[tokio::test]
async fn number_of_jets() -> Result<(), Box<dyn Error>> {
async fn private_jets_in_month() -> Result<(), Box<dyn Error>> {
let client = flights::fs_s3::anonymous_client().await;

let aircraft = flights::private_jets_in_month(2022..2024, None, &client).await?;
