Ingest data from the PostgreSQL database that backs SMartApp into BigQuery by periodically capturing every transaction in the database, so that data stays intact and consistent between the source (PostgreSQL) and the target (BigQuery). The figure and description below show how it works. There are two main processes, described below.
- Collect every changed record by object id for each content type (project, inventory, pm plan and pm item) from the models_logging_change table (Django Models Logging) to identify the actual action type: added, changed or deleted.
- Use the list of all object ids collected in the previous step, together with their action types, to pull data from the transactional tables (project, inventory, pm plan and pm item).
- Import the data as a DataFrame into a temporary table.
- Run a stored procedure to merge data from the temporary table into the target table, based on the action type:
  - Added: run a SQL INSERT statement.
  - Changed: run a SQL UPDATE statement.
  - Deleted: run a SQL UPDATE statement that only sets the is_deleted column to True, instead of using a SQL DELETE statement.
- Truncate temporary table.
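
The steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the project's code: the change-log row layout `(object_id, action, changed_at)`, the function names `resolve_actions` and `merge_into_target`, and the in-memory dicts standing in for the temporary and target tables are all hypothetical. The rule used to pick the "actual" action per object id is also an assumption, since the description does not spell it out.

```python
from typing import Any, Dict, Iterable, List, Tuple

# Hypothetical shape of a change-log row: (object_id, action, changed_at).
ChangeRow = Tuple[int, str, int]
Row = Dict[str, Any]


def resolve_actions(change_rows: Iterable[ChangeRow]) -> Dict[int, str]:
    """Collapse the change log to one action per object id.

    Assumed rule: a final 'deleted' wins; otherwise any 'added' in the
    window means the row is new; otherwise the row was 'changed'.
    """
    history: Dict[int, List[str]] = {}
    for object_id, action, _changed_at in sorted(change_rows, key=lambda r: r[2]):
        history.setdefault(object_id, []).append(action)

    resolved: Dict[int, str] = {}
    for object_id, actions in history.items():
        if actions[-1] == "deleted":
            resolved[object_id] = "deleted"
        elif "added" in actions:
            resolved[object_id] = "added"
        else:
            resolved[object_id] = "changed"
    return resolved


def merge_into_target(
    target: Dict[int, Row], temp_rows: Dict[int, Row], actions: Dict[int, str]
) -> None:
    """Apply the merge rules: insert, update, or soft delete."""
    for object_id, action in actions.items():
        if action == "added":
            target[object_id] = {**temp_rows[object_id], "is_deleted": False}
        elif action == "changed" and object_id in target:
            target[object_id].update(temp_rows[object_id])
        elif action == "deleted" and object_id in target:
            # Soft delete: flag the row instead of removing it.
            target[object_id]["is_deleted"] = True
    temp_rows.clear()  # mirrors truncating the temporary table


# Example data (made up for illustration).
change_log = [
    (1, "added", 100),
    (1, "changed", 110),
    (2, "changed", 105),
    (3, "changed", 101),
    (3, "deleted", 120),
]
target = {
    2: {"name": "old", "is_deleted": False},
    3: {"name": "gone", "is_deleted": False},
}
temp = {1: {"name": "new project"}, 2: {"name": "renamed"}}

actions = resolve_actions(change_log)
merge_into_target(target, temp, actions)
```

In the real pipeline the merge runs as a stored procedure on BigQuery; the dict-based version only shows the branching on action type and why a deleted row stays in the target with `is_deleted` set.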
- Create a .proto file that matches your data schema and compile it to a .py module, as required by Protocol Buffers.
- Read the CSV file as a DataFrame and transform it so the data is ready for ingestion into BigQuery:
  - Convert datetime values to timestamps in microseconds.
  - Add _CHANGE_TYPE (the action type), such as UPSERT or DELETE.
  - Fill null values with default values.
- Write JSON files from the DataFrame, such as an upsert file and a delete file.
- Write the JSON data as a Protocol Buffers stream to BigQuery via the BigQuery Storage API.
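
The transformation steps above can be sketched as follows. To stay dependency-free, this sketch uses plain dicts in place of a DataFrame, which is a substitution and not how the project itself works. The `DEFAULTS` mapping, the column names, and the helper names `to_micros`, `prepare_row` and `to_json_lines` are hypothetical, and naive datetimes are assumed to be in UTC. The actual write to the BigQuery Storage API is not shown.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

# Hypothetical default values per column, used to fill nulls.
DEFAULTS: Dict[str, Any] = {"name": "", "cost": 0.0}

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_micros(value: datetime) -> int:
    """Convert a datetime to microseconds since the Unix epoch.

    Naive datetimes are assumed to be UTC. Integer arithmetic avoids
    the rounding that float timestamps can introduce.
    """
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    delta = value - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def prepare_row(row: Dict[str, Any], change_type: str) -> Dict[str, Any]:
    """Fill nulls, convert datetimes, and tag the row with _CHANGE_TYPE."""
    out: Dict[str, Any] = {}
    for column, value in row.items():
        if value is None:
            # Columns without a configured default stay null.
            value = DEFAULTS.get(column)
        if isinstance(value, datetime):
            value = to_micros(value)
        out[column] = value
    out["_CHANGE_TYPE"] = change_type
    return out


def to_json_lines(rows: List[Dict[str, Any]], change_type: str) -> str:
    """Serialize rows as newline-delimited JSON, one object per line."""
    return "\n".join(json.dumps(prepare_row(r, change_type)) for r in rows)


# Example data (made up for illustration).
upserts = [{"id": 1, "name": None, "updated_at": datetime(2024, 1, 1, 0, 0, 0, 500)}]
deletes = [{"id": 7, "name": "obsolete", "updated_at": datetime(1970, 1, 1, 0, 0, 1)}]

upsert_payload = to_json_lines(upserts, "UPSERT")
delete_payload = to_json_lines(deletes, "DELETE")
```

Keeping upserts and deletes in separate payloads matches the separate upsert and delete files described above, so each batch carries a single `_CHANGE_TYPE` value.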
- LoadPGToBQ.py : Collect changed data and import it as a DataFrame into a temporary table on BigQuery.
- LoadPGToBQ_BQStorageAPI.py : Convert the DataFrame to JSON files aligned with the Protocol Buffers schema used by the BigQuery Storage API.
- smart_bq_storage_api : Write the JSON files to BigQuery via the BigQuery Storage API.
- CheckDataCons_DB_BQ.py : Run data consistency tests between PostgreSQL and BigQuery.
- etl_web_admin : Django web administration site that stores table view configuration metadata and logs ETL transactions.
- table_schema_script : Scripts to create tables, constraints and views on the database and BigQuery, including sample SQL queries.
- unittest : Unit tests for LoadPGToBQ.py and LoadPGToBQ_BQStorageAPI.py.
- google_ai_py3.10.yml : Anaconda environment file for creating the Python environment used to build this project.
- Merge Table Solutions
- MERGE statement on BigQuery
- Merging data for Change Data Capture with GCP BigQuery (Best Summarization)
- BigQuery Storage API Solutions