SMartDataHub-DBToBigQuery

Ingest data from the PostgreSQL database that stores data for SMartApp into BigQuery by periodically capturing every transaction in the database, maintaining data integrity and consistency between the source (PostgreSQL) and the target (BigQuery). The figure and description below show how it works. There are two main processes, described below.

1. Identifying and Capturing All Changed Data in a Django Database

  • Collect every changed row, keyed by the object id of its content type (project, inventory, pm plan and pm item), from the models_logging_change table (Django Models Logging) to identify the actual action type: added, changed or deleted.
  • Use the list of object ids collected in the previous step, together with their action types, to pull data from the transactional tables (project, inventory, pm plan and pm item).
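One way to sketch this step in Python: collapse the change-log rows into a single action per object id. The tuple layout, the collapsing rules (an object added and then changed in the same window is still an insert for the target; an object added and then deleted never needs to reach BigQuery), and the function name are assumptions for illustration, not necessarily the repo's exact logic.

```python
def resolve_actions(changes):
    """Collapse change-log rows into one action per object id.

    changes: iterable of (object_id, action, timestamp) tuples, where
    action is 'added', 'changed' or 'deleted' (illustrative layout,
    assumed from the models_logging_change table described above).
    """
    first, last = {}, {}
    for object_id, action, _ts in sorted(changes, key=lambda c: c[2]):
        first.setdefault(object_id, action)  # earliest action wins here
        last[object_id] = action             # latest action wins here

    resolved = {}
    for object_id, last_action in last.items():
        if first[object_id] == "added":
            if last_action == "deleted":
                continue  # created and removed within the window: nothing to sync
            # The target has never seen this row, so it is an insert
            # even if it was changed after being added.
            resolved[object_id] = "added"
        else:
            resolved[object_id] = last_action
    return resolved
```

The resulting `{object_id: action}` map is what drives the pull from the transactional tables and, later, the merge conditions.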

2.1 Merge Solution to BigQuery (Sol1)

  • Import the data as a dataframe into a temporary table.
  • Run a stored procedure to merge data from the temporary table into the target table, based on the action type:
    • Added: run a SQL INSERT statement.
    • Changed: run a SQL UPDATE statement.
    • Deleted: run a SQL UPDATE statement that sets only the is_deleted column to True, instead of running a SQL DELETE statement.
  • Truncate the temporary table.
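The stored procedure's three action branches can be expressed as one BigQuery MERGE statement. The sketch below composes such a statement in Python; the table names, the `action` column on the temp table, and the column list are illustrative assumptions, not the project's actual schema or procedure.

```python
def build_merge_sql(target, temp, key="id", columns=("name", "status")):
    """Compose a BigQuery MERGE mirroring the action rules above:
    insert added rows, update changed rows, and soft-delete by setting
    is_deleted instead of issuing a DELETE. All identifiers here are
    hypothetical placeholders."""
    set_clause = ", ".join(f"T.{c} = S.{c}" for c in columns)
    col_list = ", ".join((key,) + tuple(columns))
    src_list = ", ".join(f"S.{c}" for c in (key,) + tuple(columns))
    return f"""
MERGE `{target}` T
USING `{temp}` S
ON T.{key} = S.{key}
WHEN MATCHED AND S.action = 'deleted' THEN
  UPDATE SET T.is_deleted = TRUE
WHEN MATCHED AND S.action = 'changed' THEN
  UPDATE SET {set_clause}
WHEN NOT MATCHED AND S.action = 'added' THEN
  INSERT ({col_list}, is_deleted) VALUES ({src_list}, FALSE)
""".strip()
```

A single MERGE keeps the insert/update/soft-delete decision inside BigQuery, so the Python side only needs to load the temp table and invoke the statement (or a stored procedure wrapping it).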

2.2 BigQuery Storage-API Solution to BigQuery (Sol2)

  • Create a .proto file aligned with your data schema and compile it to a .py module to comply with Protocol Buffers.
  • Read the CSV file as a dataframe and transform it so the data is ready for ingestion into BigQuery:
    • Convert datetimes to timestamps in microseconds.
    • Add a _CHANGE_TYPE (action type) column with values such as UPSERT and DELETE.
    • Fill null values with default values.
  • Write JSON files from the dataframe, such as an upsert file and a delete file.
  • Stream the JSON data as a protocol buffer stream to BigQuery via the BigQuery Storage-API.
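The dataframe transformation in the middle of this pipeline can be sketched with pandas as below. The function name, the `defaults` mapping, and the column names are illustrative assumptions; only the three transformations themselves (microsecond timestamps, `_CHANGE_TYPE`, null defaults) come from the steps above.

```python
import pandas as pd

def prepare_for_storage_api(df, change_type, defaults):
    """Shape a dataframe the way step 2.2 describes before serializing
    rows to protobuf: epoch-microsecond timestamps, a _CHANGE_TYPE
    column ('UPSERT' or 'DELETE'), and defaults in place of nulls."""
    out = df.copy()
    for col in out.select_dtypes(include=["datetime64[ns]"]).columns:
        # BigQuery TIMESTAMP over the Storage Write API expects
        # microseconds since the Unix epoch; pandas stores nanoseconds.
        out[col] = out[col].astype("int64") // 1_000
    out = out.fillna(defaults)           # protobuf fields cannot carry NULL
    out["_CHANGE_TYPE"] = change_type    # tells BigQuery to upsert or delete each row
    return out
```

Running the same frame through this function twice, once with `"UPSERT"` and once with `"DELETE"`, yields the two JSON payloads (upsert file and delete file) mentioned above.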

Web Administration

(screenshot: Django admin site)

Program Structure

  • LoadPGToBQ.py : Collects changed data and imports it as a dataframe into the temp table on BigQuery.
  • LoadPGToBQ_BQStorageAPI.py : Converts the dataframe to JSON files aligned with Protocol Buffers for the BigQuery Storage-API.
  • smart_bq_storage_api : Writes the JSON files to BigQuery via the BigQuery Storage-API.
  • CheckDataCons_DB_BQ.py : Runs a data-consistency test between PostgreSQL and BigQuery.
  • etl_web_admin : Django web administration that stores table-view configuration metadata and logs ETL transactions.
  • table_schema_script : Scripts to create tables, constraints and views on the database and BigQuery, including sample SQL queries.
  • unittest : Unit tests for LoadPGToBQ.py and LoadPGToBQ_BQStorageAPI.py.
  • google_ai_py3.10.yml : Creates the Python Anaconda environment for building this project.

Reference Solutions
