Data Warehouse Pipeline for TerrificTotes

Badges: CI/CD workflow status · coverage · Python versions

An automated, monitored ETL (Extract, Transform, Load) pipeline for fictional tote manufacturer TerrificTotes.

🌐 Overview

TerrificTotes's existing commercial and production systems store data in a write-optimized PostgreSQL database that is ill-suited to querying by analysts. gb-terrifictotes-dcf spins up a complete ecosystem of AWS cloud services to process new data from this database at regular intervals. Data is:

  • ingested for storage in JSON files;
  • denormalized and transformed into a new, OLAP-friendly schema, saved in parquet format; and finally
  • loaded into a data warehouse, ready for querying and integration into BI dashboards.

Change history is maintained from the moment of the pipeline's first operation. All stages are monitored, and basic error reporting triggers email notifications on system failure.

Built with Terraform, AWS S3, AWS Lambda, AWS Secrets Manager, AWS EventBridge, AWS Step Functions, AWS CloudWatch, AWS Simple Notification Service (SNS) and AWS RDS.

πŸ•ΉοΈ Demo

A new pipeline execution runs every 15 minutes. Any rows added to the source database since the time of the last check are grouped by table of origin and ingested into an S3 bucket as JSON packets, named with the time at which the current execution began.

The ingestion bucket, with ingested packets in the sales_order/ directory
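
As a rough sketch of the packet-writing step described above – the bucket name, key format and row structure here are illustrative assumptions, not the project's actual internals – new rows for a table might be written to S3 along these lines:

# Illustrative sketch only: bucket name, key format and row structure are
# assumptions, not the project's actual internals.
import json
from datetime import datetime, timezone

import boto3

s3_client = boto3.client("s3")


def write_packet(table_name, rows, run_started_at, bucket="example-ingestion-bucket"):
    """Store new rows from one source table as a JSON packet named for the run's start time."""
    key = f"{table_name}/{run_started_at}.json"
    s3_client.put_object(Bucket=bucket, Key=key, Body=json.dumps(rows, default=str))
    return key


if __name__ == "__main__":
    run_start = datetime.now(timezone.utc).isoformat()
    print(write_packet("sales_order", [{"sales_order_id": 101, "units_sold": 5}], run_start))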

Output from the ingestion stage lists which tables do and don't have updates, along with the current check time, in a clear, human-readable format – ideal for automated or manual recovery from logs in the event of errors.

Ingestion output

The processing stage fetches the indicated packets, transforming the data and saving it to a second S3 bucket with a new folder structure reflecting the star schema of the destination warehouse. At present, gb-terrifictotes-dcf delivers a minimum viable product covering a single fact table.

The processing bucket, with processed packets in the fact_sales_order/ directory

Processing output
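
For a sense of what this stage involves – the bucket names, key layout and transformation below are simplified assumptions, not the project's real logic – a packet might be reshaped into parquet roughly as follows:

# Illustrative sketch only: bucket names, key layout and column handling are
# assumptions; the real processing Lambda is more involved.
import json
from io import BytesIO

import boto3
import pandas as pd

s3_client = boto3.client("s3")


def process_packet(packet_key,
                   source_bucket="example-ingestion-bucket",
                   dest_bucket="example-processing-bucket"):
    """Read an ingested JSON packet, reshape it, and write parquet to the processing bucket."""
    obj = s3_client.get_object(Bucket=source_bucket, Key=packet_key)
    df = pd.DataFrame(json.loads(obj["Body"].read()))

    # Denormalization and renaming to match the warehouse's star schema
    # would happen here.

    buffer = BytesIO()
    df.to_parquet(buffer, index=False)  # requires pyarrow or fastparquet
    timestamp = packet_key.split("/")[-1].removesuffix(".json")
    s3_client.put_object(
        Bucket=dest_bucket,
        Key=f"fact_sales_order/{timestamp}.parquet",
        Body=buffer.getvalue(),
    )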

All operations on the ingestion and processing buckets are write-only. Updates to existing records are processed as new rows, with last_updated date and time columns establishing chronology. Destination tables built from two or more interrelated source tables are handled so as to preserve data quality and integrity.
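
To make the append-only model concrete, the most recent version of each record can always be recovered from its change history via its last_updated timestamp. (The single last_updated column below is a simplified assumption; the warehouse splits this into separate date and time columns.)

# Simplified illustration: updates arrive as new rows, and the latest state of
# each record is selected by its last_updated timestamp.
import pandas as pd

history = pd.DataFrame([
    {"sales_order_id": 101, "units_sold": 5, "last_updated": "2024-12-01 09:00:00"},
    {"sales_order_id": 101, "units_sold": 7, "last_updated": "2024-12-02 14:30:00"},
    {"sales_order_id": 102, "units_sold": 3, "last_updated": "2024-12-01 09:00:00"},
])

latest = (
    history.sort_values("last_updated")
           .drop_duplicates("sales_order_id", keep="last")
)
print(latest)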

Finally, the transformed data is loaded into the data warehouse. It can now be queried, analyzed and visualized using Power BI, QuickSight, Superset, Streamlit, or another tool of your choice.

Dashboard in Apache Superset

Atlas of top sales by country

Top 3 designs for each of the top 5 countries by unit sales

Top ten staff members, ranked by revenue
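
As a quick way to sanity-check the loaded data before building a dashboard, you could query the warehouse directly with pg8000 (the driver used elsewhere in this project). The credentials below are placeholders, and the column names in the query are assumptions about the fact_sales_order schema:

# Sketch of a direct warehouse query with pg8000. Credentials are placeholders,
# and the column names are assumptions about the fact_sales_order schema.
import pg8000.native

conn = pg8000.native.Connection(
    user="YOUR_USERNAME_HERE",
    password="YOUR_PASSWORD_HERE",
    host="YOUR_PSQL_IP_OR_URL_HERE",
    database="YOUR_DATABASE_NAME_HERE",
    port=5432,
)

rows = conn.run(
    "SELECT design_id, SUM(units_sold) AS total_units "
    "FROM fact_sales_order "
    "GROUP BY design_id "
    "ORDER BY total_units DESC "
    "LIMIT 10;"
)
print(rows)
conn.close()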

πŸ“œ Prerequisites

This project requires:

  1. Python (3.9 <= version <= 3.12.4)

  2. The git CLI

  3. Terraform (developed using version 1.10.2)

  4. An AWS account

  5. AWS credentials configured locally, including access keys and default region

  6. An S3 bucket for remote storage of Terraform state files

  7. A PostgreSQL OLTP database organized according to the expected schema, accessible remotely via public IP or URL and receiving frequent ongoing updates

  8. A second PostgreSQL database, accessible remotely via public IP or URL, which will be used for the data warehouse

βš™οΈ Setup

πŸ—οΈ Project Setup

Fork the repository on GitHub.

Clone it to your local system.

git clone https://github.com/YOUR-USERNAME-HERE/gb-terrifictotes-dcf

Change into the directory.

cd gb-terrifictotes-dcf

Install dependencies and set up the development environment.

make requirements && make dev-setup

πŸ—ƒοΈ Database Initialization

To create the required tables for your data warehouse, run:

psql -h YOUR_PSQL_IP_OR_URL_HERE -U YOUR_USERNAME_HERE -d YOUR_DATABASE_NAME_HERE -f db/init.sql

πŸ” Secure Credentials Setup

Create two AWS Secrets Manager secrets, both in the following format. In one secret, store credentials for the OLTP PSQL database. In the other, store credentials for the data warehouse.

{
  "PG_USER": "YOUR_USERNAME_HERE",
  "PG_PASSWORD": "YOUR_PASSWORD_HERE",
  "PG_HOST": "YOUR_PSQL_IP_OR_URL_HERE",
  "PG_DATABASE": "YOUR_DATABASE_NAME_HERE",
  "PG_PORT": "5432"
}
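
If you'd rather create these secrets programmatically than through the AWS console, a boto3 sketch along the following lines should work (the secret name is a placeholder; repeat with the data warehouse credentials for the second secret):

# Sketch of creating one of the two secrets with boto3; the secret name and
# values are placeholders. The AWS console or CLI works just as well.
import json

import boto3

sm_client = boto3.client("secretsmanager")

sm_client.create_secret(
    Name="YOUR-OLTP-SECRET-NAME-HERE",
    SecretString=json.dumps({
        "PG_USER": "YOUR_USERNAME_HERE",
        "PG_PASSWORD": "YOUR_PASSWORD_HERE",
        "PG_HOST": "YOUR_PSQL_IP_OR_URL_HERE",
        "PG_DATABASE": "YOUR_DATABASE_NAME_HERE",
        "PG_PORT": "5432",
    }),
)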

In src/ingestion_lambda.py, update connect_to_db() with the name of the secret containing the OLTP credentials.

credentials = retrieve_secret(sm_client, "YOUR-OLTP-SECRET-NAME-HERE")

In src/uploading_lambda.py, similarly update connect_to_db() with the name of the secret containing the data warehouse credentials.

credentials = retrieve_secret(sm_client, "YOUR-DW-SECRET-NAME-HERE")

Create three GitHub Actions secrets to store the AWS credentials already used in your project configuration, namely:

  1. AWS_ACCESS_KEY_ID
  2. AWS_SECRET_ACCESS_KEY
  3. AWS_REGION

🚧 Further Database Initialization

For now, a final hands-on step is required to populate the dim_date table.

In src/utils/dim_date_table.py, update connect_to_dw() with the name of the secret containing the data warehouse credentials.

credentials = retrieve_secret(sm_client, "YOUR-DW-SECRET-NAME-HERE")

Run the script using the following command from the repo's root directory.

python src/utils/dim_date_table.py

In the next version this will be handled automatically as part of the uploading stage.

πŸŒ‹ Terraform Setup

In terraform/main.tf, update backend "s3" to refer to your S3 remote state bucket and AWS region.

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~>5.0"
    }
  }

  backend "s3" {
    bucket = "YOUR-BUCKET-NAME-HERE"
    key    = "terraform.tfstate"
    region = "YOUR-AWS-REGION-HERE"
  }
}

In terraform/vars.tf, first update project_prefix to a unique prefix for your fork.

variable "project_prefix" {
  type    = string
  default = "YOUR-CHOSEN-PREFIX-" 
}

Then update error-alerts-recipient to the email address at which you'd like to receive automatic error notifications.

variable "error-alerts-recipient" {
  type    = string
  default = "YOUR.EMAIL.ADDRESS@HERE.COM" 
}

πŸš€ Testing and Deployment

To run the full suite of checks – security scanning with safety, linting, testing and coverage – run:

make run-checks

If you wish to run individual tests – unlikely to be necessary for the purposes of initial setup and deployment – use the following command:

source ./venv/bin/activate && pytest -vvvrP test/TEST_FILE_NAME_HERE.py

To deploy the full AWS cloud pipeline using Terraform, first change into the terraform directory.

cd terraform

Initialize Terraform, then plan and apply.

terraform init
terraform plan
terraform apply

Navigate to Step Functions in the AWS Console, and click on the newly created state machine. Provided your databases are correctly set up and the IAM user associated with your credentials has all the necessary permissions, you should see a successful execution of the pipeline.

Subsequent pushes to the main branch of the GitHub repo will trigger a CI/CD pipeline in GitHub Actions, once again linting, checking and testing the code and deploying any changes to AWS using terraform apply.

πŸ«› Team GreenBeans

gb-terrifictotes-solutions (πŸ”’) was developed in November 2024 by @Rmbkh, @dulle90griet, @contiele1, @tombracey, @ali-shep and @Minalpatil3, as their final project on the Northcoders Data Engineering Bootcamp.

gb-terrifictotes-dcf is a comprehensive refactoring of that project by @dulle90griet. For an overview of current progress, see below.

πŸ›£οΈ Refactor Roadmap

  • πŸš› Create S3 backup tool for pipeline migration | βœ”οΈ Dec 3 2024
  • πŸ”§ Create SQL script to initialize data warehouse | βœ”οΈ Dec 4 2024
  • πŸ’š Fix CI build making unusable layer zip | βœ”οΈ Dec 4 2024
  • βœ… Add missing tests on ingestion functions | βœ”οΈ Dec 11 2024
  • ♻️ Refactor and reorganise ingestion Lambda | βœ”οΈ Dec 11 2024
  • βœ… Add missing tests on processing functions | βœ”οΈ Dec 14 2024
  • ♻️ Refactor and reorganise processing Lambda | βœ”οΈ Dec 14 2024
  • πŸ“ Write a readme | βœ”οΈ Dec 16 2024
  • 🚧 Add missing tests on uploading functions | πŸ‘·β€β™‚οΈ In progress
  • 🚧 Refactor and reorganise uploading Lambda | πŸ‘·β€β™‚οΈ In progress
  • Establish consistency of logging
  • Rationalize nomenclature
  • Remove all deprecated code and modules
  • Implement row deletion handling in change history

Acknowledgements

  • PSQL querying: pg8000
  • Currency code conversions: iso4217
  • Data manipulation and parquet formatting: pandas
