An automated, monitored ETL (Extract, Transform, Load) pipeline for fictional tote manufacturer TerrificTotes.
TerrificTotes's existing commercial and production systems store data in a write-optimized PostgreSQL database that is poorly suited to analysts' queries. `gb-terrifictotes-dcf` spins up a complete ecosystem of AWS cloud services to process new data from this database at regular intervals. Data is:
- ingested for storage in JSON files;
- denormalized and transformed into a new, OLAP-friendly schema, saved in parquet format; and finally
- loaded into a data warehouse, ready for querying and integration into BI dashboards.
Change history is maintained from the moment of the pipeline's first operation. All stages are monitored, and basic error reporting triggers email notifications on system failure.
A new pipeline execution runs every 15 minutes. Any rows added to the source database since the time of the last check are grouped by table of origin and ingested into an S3 bucket as JSON packets, named with the time at which the current execution began.
*The ingestion bucket, with ingested packets in the `sales_order/` directory*
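As a concrete illustration of this layout, a minimal boto3 sketch for writing one such packet follows; the bucket name, key format, and sample rows are assumptions for the example, not the pipeline's exact conventions:

```python
import json
from datetime import datetime, timezone

import boto3

s3_client = boto3.client("s3")

# Illustrative values only - the real pipeline derives these at run time.
execution_start = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
new_rows = [{"sales_order_id": 101, "units_sold": 5}]

# One packet per source table, keyed by table name and execution start time.
s3_client.put_object(
    Bucket="YOUR-INGESTION-BUCKET-HERE",
    Key=f"sales_order/{execution_start}.json",
    Body=json.dumps(new_rows),
)
```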
The ingestion stage's output records which tables did and didn't receive updates, along with the current check time, in a clear, human-readable format that supports both automatic and manual recovery from logs in case of errors.
*Ingestion output*
The processing stage fetches the indicated packets, transforms the data, and saves it to a second S3 bucket with a new folder structure reflecting the star schema of the destination warehouse. At present, `gb-terrifictotes-dcf` delivers a minimum viable product covering a single fact table.
*The processing bucket, with processed packets in the `fact_sales_order/` directory*
*Processing output*
All operations on the ingestion and processing buckets are write-only; existing objects are never overwritten. Updates to existing records are processed as new rows, with `last_updated` date and time columns establishing chronology. Destination tables built from two or more interrelated source tables are handled so that data quality and referential integrity are preserved.
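As an illustration of this append-only model, the sketch below shows one way the most recent version of each record could be selected using pandas; the table and column names are assumptions for the example only, not the project's own transformation code:

```python
import pandas as pd

# Hypothetical append-only history: the same sales_order_id appears
# once per update, with last_updated marking recency.
history = pd.DataFrame({
    "sales_order_id": [101, 102, 101],
    "units_sold": [5, 20, 8],
    "last_updated": pd.to_datetime(
        ["2024-12-01 10:00", "2024-12-01 10:00", "2024-12-01 10:15"]
    ),
})

# Keep only the most recent row for each order.
latest = (
    history.sort_values("last_updated")
           .drop_duplicates("sales_order_id", keep="last")
)
print(latest)
```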
Finally, the transformed data is loaded into the data warehouse. It can now be queried, analyzed and visualized using Power BI, QuickSight, Superset, Streamlit, or another tool of your choice.
*Dashboard in Apache Superset*
*Atlas of top sales by country*
*Top 3 designs for each of the top 5 countries by unit sales*
*Top ten staff members, ranked by revenue*
This project requires:
- Python (3.9 <= version <= 3.12.4)
- The git CLI
- Terraform (developed using version 1.10.2)
- An AWS account
- AWS credentials configured locally, including access keys and default region
- An S3 bucket for remote storage of Terraform state files
- A PostgreSQL OLTP database organized according to the expected schema, accessible remotely via public IP or URL and receiving frequent ongoing updates
- A second PostgreSQL database, accessible remotely via public IP or URL, which will be used for the data warehouse
Fork the repository on GitHub.
Clone it to your local system.
git clone https://github.com/YOUR-USERNAME-HERE/gb-terrifictotes-dcf
Change into the directory.
cd gb-terrifictotes-dcf
Install dependencies and set up the development environment.
make requirements && make dev-setup
To create the required tables for your data warehouse, run:
psql -h YOUR_PSQL_IP_OR_URL_HERE -U YOUR_USERNAME_HERE -d YOUR_DATABASE_NAME_HERE -f db/init.sql
Create two AWS Secrets Manager secrets, both in the following format. In one secret, store credentials for the OLTP PSQL database. In the other, store credentials for the data warehouse.
{
  "PG_USER": "YOUR_USERNAME_HERE",
  "PG_PASSWORD": "YOUR_PASSWORD_HERE",
  "PG_HOST": "YOUR_PSQL_IP_OR_URL_HERE",
  "PG_DATABASE": "YOUR_DATABASE_NAME_HERE",
  "PG_PORT": "5432"
}
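These secrets can be created through the AWS Console or CLI; for illustration, a minimal boto3 sketch is shown below, with placeholder names and values that you should replace with your own:

```python
import json

import boto3

sm_client = boto3.client("secretsmanager")

# Placeholder name and values - repeat with your data warehouse
# credentials under a second secret name.
sm_client.create_secret(
    Name="YOUR-OLTP-SECRET-NAME-HERE",
    SecretString=json.dumps({
        "PG_USER": "YOUR_USERNAME_HERE",
        "PG_PASSWORD": "YOUR_PASSWORD_HERE",
        "PG_HOST": "YOUR_PSQL_IP_OR_URL_HERE",
        "PG_DATABASE": "YOUR_DATABASE_NAME_HERE",
        "PG_PORT": "5432",
    }),
)
```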
In `src/ingestion_lambda.py`, update `connect_to_db()` with the name of the secret containing the OLTP credentials.
credentials = retrieve_secret(sm_client, "YOUR-OLTP-SECRET-NAME-HERE")
In `src/uploading_lambda.py`, similarly update `connect_to_db()` with the name of the secret containing the data warehouse credentials.
credentials = retrieve_secret(sm_client, "YOUR-DW-SECRET-NAME-HERE")
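For reference, a helper like `retrieve_secret` typically wraps Secrets Manager's `GetSecretValue` call. The sketch below shows the general pattern under that assumption; it is not a copy of the project's implementation:

```python
import json

import boto3


def retrieve_secret(sm_client, secret_name):
    """Fetch a secret from AWS Secrets Manager and parse it as JSON."""
    response = sm_client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])


# Example usage with a placeholder secret name:
sm_client = boto3.client("secretsmanager")
credentials = retrieve_secret(sm_client, "YOUR-DW-SECRET-NAME-HERE")
```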
Create three GitHub Actions secrets to store the AWS credentials already used in your project configuration, namely:
- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `AWS_REGION`
For now, a final hands-on step is required to populate the `dim_date` table.
In `src/utils/dim_date_table.py`, update `connect_to_dw()` with the name of the secret containing the data warehouse credentials.
credentials = retrieve_secret(sm_client, "YOUR-DW-SECRET-NAME-HERE")
Run the script using the following command from the repo's root directory.
python src/utils/dim_date_table.py
In the next version this will be handled automatically as part of the uploading stage.
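For context, a date dimension is typically just a calendar with one row per day plus derived attributes. The sketch below illustrates the general idea with an assumed date range and column names; it is not the project's actual script:

```python
import pandas as pd

# Assumed date range - adjust to cover your data.
dates = pd.DataFrame({"date_id": pd.date_range("2020-01-01", "2030-12-31")})

# Derive typical calendar attributes for each day.
dates["year"] = dates["date_id"].dt.year
dates["month"] = dates["date_id"].dt.month
dates["day"] = dates["date_id"].dt.day
dates["day_of_week"] = dates["date_id"].dt.dayofweek
dates["month_name"] = dates["date_id"].dt.month_name()
dates["quarter"] = dates["date_id"].dt.quarter

print(dates.head())
```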
In `terraform/main.tf`, update `backend "s3"` to refer to your S3 remote state bucket and AWS region.
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }

  backend "s3" {
    bucket = "YOUR-BUCKET-NAME-HERE"
    key    = "terraform.tfstate"
    region = "YOUR-AWS-REGION-HERE"
  }
}
In `terraform/vars.tf`, first update `project_prefix` to a unique prefix for your fork.
variable "project_prefix" {
type = string
default = "YOUR-CHOSEN-PREFIX-"
}
Then update `error-alerts-recipient` to the email address that should receive automatic error notifications.
variable "error-alerts-recipient" {
type = string
default = "YOUR.EMAIL.ADDRESS@HERE.COM"
}
To run the full set of checks, including dependency safety, linting, testing, and coverage:
make run-checks
If you wish to run individual tests (unlikely to be necessary for the purposes of initial setup and deployment), use the following command:
source ./venv/bin/activate && pytest -vvvrP test/TEST_FILE_NAME_HERE.py
To deploy the full AWS cloud pipeline using Terraform, first change into the `terraform` directory.
cd terraform
Initialise Terraform, then plan and apply.
terraform init
terraform plan
terraform apply
Navigate to Step Functions in the AWS Console, and click on the newly created state machine. Provided your databases are correctly set up and the IAM user associated with your credentials has all the necessary permissions, you should see a successful execution of the pipeline.
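Alternatively, recent executions can be checked from a script using boto3. The sketch below assumes the state machine's name begins with your chosen `project_prefix`; the prefix shown is a placeholder:

```python
import boto3

sfn = boto3.client("stepfunctions")

# Find state machines whose names start with your chosen prefix,
# then print the status of their most recent executions.
for machine in sfn.list_state_machines()["stateMachines"]:
    if machine["name"].startswith("YOUR-CHOSEN-PREFIX-"):
        executions = sfn.list_executions(
            stateMachineArn=machine["stateMachineArn"], maxResults=5
        )["executions"]
        for execution in executions:
            print(machine["name"], execution["status"], execution["startDate"])
```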
Subsequent pushes to the `main` branch of the GitHub repo will trigger a CI/CD pipeline in GitHub Actions, which once again lints, checks, and tests the code before deploying any changes to AWS using `terraform apply`.
`gb-terrifictotes-solutions` was developed in November 2024 by @Rmbkh, @dulle90griet, @contiele1, @tombracey, @ali-shep and @Minalpatil3, as their final project on the Northcoders Data Engineering Bootcamp.
`gb-terrifictotes-dcf` is a comprehensive refactoring of that project by @dulle90griet. For an overview of current progress, see below.
- Create S3 backup tool for pipeline migration | ✔️ Dec 3 2024
- Create SQL script to initialize data warehouse | ✔️ Dec 4 2024
- Fix CI build making unusable layer zip | ✔️ Dec 4 2024
- Add missing tests on ingestion functions | ✔️ Dec 11 2024
- Refactor and reorganise ingestion Lambda | ✔️ Dec 11 2024
- Add missing tests on processing functions | ✔️ Dec 14 2024
- Refactor and reorganise processing Lambda | ✔️ Dec 14 2024
- Write a readme | ✔️ Dec 16 2024
- Add missing tests on uploading functions | 👷 In progress
- Refactor and reorganise uploading Lambda | 👷 In progress
- Establish consistency of logging
- Rationalize nomenclature
- Remove all deprecated code and modules
- Implement row deletion handling in change history