
Commit

add deterministic probabilistic deduplication solution
tdroberto committed Dec 30, 2024
1 parent 9e6ca39 commit 3a7624b
Showing 6 changed files with 700 additions and 0 deletions.
24 changes: 24 additions & 0 deletions data-box/deduplication_deterministic_probabilistic/README.md
@@ -0,0 +1,24 @@
# Deterministic and probabilistic deduplication

----
## Overview

This project provides a solution for deduplicating a dataset that has no reliable unique identifier, using both deterministic and probabilistic matching.

----
## Implementation
1. Copy this code into Treasure Workflow or run it with the TD Toolbelt.
2. Set your TD master API key as the workflow secret `td.apikey` (the tasks read it via `${secret:td.apikey}`, as shown in the sketch below).
3. Update the database and table names in the config/params.yaml file.
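
For reference, the workflow passes the secret and the API endpoint to the Python tasks as environment variables (the `_env` block in dm_pm.dig). Below is a minimal sketch of how a task script might read them, assuming the `pytd` client is used; the bundled scripts/dm.py and scripts/pm.py may construct their client differently.

```python
# Sketch only: reads the values that dm_pm.dig exports via _env.
# TD_API_KEY comes from the workflow secret td.apikey;
# TD_API_ENDPOINT comes from api_endpoint in config/params.yaml.
import os

import pytd  # assumed client library; the actual scripts may use another


def main():
    client = pytd.Client(
        apikey=os.environ["TD_API_KEY"],
        endpoint=os.environ["TD_API_ENDPOINT"],
        database="rotd_dm_pm",  # ${db} in config/params.yaml
    )
    # ... deduplication logic would go here ...
    return client


if __name__ == "__main__":
    main()
```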

----
## Considerations

This project was developed for a Vietnamese automobile customer, so cleanse.sql folds Vietnamese diacritics to their ASCII equivalents. Adapt cleanse.sql to your own character normalization needs, and adjust the variables in the scripts to suit your data; a Python illustration of the folding follows below.
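
For illustration only, the diacritic folding that cleanse.sql implements with nested regexp_replace calls can be sketched in Python using the standard library. This is not part of the workflow, but it may help when adapting the normalization to another locale.

```python
# Illustrative equivalent of the folding in cleanse.sql (not used by the workflow).
import re
import unicodedata


def fold_vietnamese(text: str) -> str:
    """Lower-case, strip diacritics, and collapse whitespace (mirrors cleanse.sql)."""
    text = text.strip().lower()
    # Decompose accented characters and drop the combining marks (e.g. 'ế' -> 'e');
    # note that 'đ' does not decompose, so it is mapped explicitly.
    decomposed = unicodedata.normalize("NFD", text)
    stripped = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    stripped = stripped.replace("đ", "d")
    return re.sub(r"\s+", " ", stripped)


print(fold_vietnamese("  Nguyễn   Văn Đông "))  # -> "nguyen van dong"
```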

The probabilistic matching script (pm.py) uses multiprocessing; consider adjusting partition_cnt and process_cnt in config/params.yaml to match your dataset size. A sketch of the scoring and partitioning this implies follows below.
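
The sketch below (standard library only) shows one way the weighted pair scoring and partitioning could look; the actual matcher and candidate-pair generation in pm.py may differ. positive_threshold, name_weight, and address_weight come from config/params.yaml; PART, PARTS, and PROCESS_CNT come from the environment set in dm_pm.dig; the column names come from cleanse.sql.

```python
# Hypothetical sketch of weighted probabilistic scoring; not the shipped pm.py.
import os
from difflib import SequenceMatcher
from multiprocessing import Pool

# Mirrors config/params.yaml and the _env block in dm_pm.dig.
PART = int(os.environ.get("PART", "0"))
PARTS = int(os.environ.get("PARTS", "16"))
PROCESS_CNT = int(os.environ.get("PROCESS_CNT", "8"))
POSITIVE_THRESHOLD = 80  # pairs scoring at or above this are treated as duplicates
NAME_WEIGHT = 1
ADDRESS_WEIGHT = 1


def similarity(a, b):
    """Normalized string similarity on a 0-100 scale."""
    return SequenceMatcher(None, a or "", b or "").ratio() * 100


def score_pair(pair):
    """Weighted average of name and address similarity for one candidate pair."""
    left, right = pair
    name_sim = similarity(left["cl_customer_name"], right["cl_customer_name"])
    addr_sim = similarity(left["cl_full_address"], right["cl_full_address"])
    score = (NAME_WEIGHT * name_sim + ADDRESS_WEIGHT * addr_sim) / (NAME_WEIGHT + ADDRESS_WEIGHT)
    return left["cl_cc"], right["cl_cc"], score, score >= POSITIVE_THRESHOLD


def evaluate(candidate_pairs):
    """Score this partition's share of the candidate pairs with a process pool."""
    my_share = [p for i, p in enumerate(candidate_pairs) if i % PARTS == PART]
    with Pool(PROCESS_CNT) as pool:
        return pool.map(score_pair, my_share)
```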

----
## Questions

Please feel free to reach out to apac-se@treasure-data.com with any questions you have about using this code.
11 changes: 11 additions & 0 deletions data-box/deduplication_deterministic_probabilistic/config/params.yaml
@@ -0,0 +1,11 @@
db: rotd_dm_pm
source_tbl: customers
clean_tbl: clean_sap_customer
dm_tbl: dm_dedup
pm_tbl: pm_dedup_eval
api_endpoint: https://api.treasuredata.com
positive_threshold: 80
name_weight: 1
address_weight: 1
partition_cnt: 16
process_cnt: 8
50 changes: 50 additions & 0 deletions data-box/deduplication_deterministic_probabilistic/dm_pm.dig
@@ -0,0 +1,50 @@
timezone: Asia/Tokyo

_export:
  !include : config/params.yaml
  td:
    engine: presto
    database: ${db}

+create_db_tbl_if_not_exists:
  td_ddl>:
  create_databases: [ "${db}" ]
  create_tables: [ "${clean_tbl}", "${dm_tbl}", "${pm_tbl}" ]
  empty_tables: [ "${clean_tbl}", "${dm_tbl}", "${pm_tbl}" ]

+cleansing:
  td>: queries/cleanse.sql
  create_table: ${clean_tbl}

# Deterministic Matching at tier 4
+dm:
  py>: scripts.dm.main
  _env:
    TD_API_KEY: ${secret:td.apikey}
    TD_API_ENDPOINT: ${api_endpoint}
  docker:
    image: "digdag/digdag-python:3.10"
  resource_level: "4"

# Probabilistic Matching with multi-processing & multiple tasks at tier 4
+pm:
  loop>: ${partition_cnt}
  _parallel: false
  _do:
    +run_serial:
      py>: scripts.pm.main
      _env:
        TD_API_KEY: ${secret:td.apikey}
        TD_API_ENDPOINT: ${api_endpoint}
        PROCESS_CNT: ${process_cnt}
        PART: ${i}
        PARTS: ${partition_cnt}
      docker:
        image: "digdag/digdag-python:3.10"
      resource_level: "4"
176 changes: 176 additions & 0 deletions data-box/deduplication_deterministic_probabilistic/queries/cleanse.sql
@@ -0,0 +1,176 @@
SELECT
  TRIM(CAST(customer_code AS VARCHAR)) AS cl_cc,
  regexp_replace(TRIM(mobile_phone), '\s+', '') AS cl_phone,
  regexp_replace(TRIM(identification_number), '\s+', '') AS cl_vin,
  LOWER(TRIM(email)) AS cl_email,
  regexp_replace(
    regexp_replace(
      regexp_replace(
        regexp_replace(
          regexp_replace(
            regexp_replace(
              regexp_replace(
                regexp_replace(
                  LOWER(TRIM(customer_name)),
                  '\s+',
                  ' '
                ),
                '[áàảãạăắằẳẵặâấầẩẫậ]',
                'a'
              ),
              '[éèẻẽẹêếềểễệ]',
              'e'
            ),
            '[íìỉĩị]',
            'i'
          ),
          '[óòỏõọôốồổỗộơớờởỡợ]',
          'o'
        ),
        '[úùủũụưứừửữự]',
        'u'
      ),
      '[ýỳỷỹỵ]',
      'y'
    ),
    'đ',
    'd'
  ) AS cl_customer_name,
  CONCAT(
    regexp_replace(
      regexp_replace(
        regexp_replace(
          regexp_replace(
            regexp_replace(
              regexp_replace(
                regexp_replace(
                  regexp_replace(
                    LOWER(TRIM(city)),
                    '\s+',
                    ' '
                  ),
                  '[áàảãạăắằẳẵặâấầẩẫậ]',
                  'a'
                ),
                '[éèẻẽẹêếềểễệ]',
                'e'
              ),
              '[íìỉĩị]',
              'i'
            ),
            '[óòỏõọôốồổỗộơớờởỡợ]',
            'o'
          ),
          '[úùủũụưứừửữự]',
          'u'
        ),
        '[ýỳỷỹỵ]',
        'y'
      ),
      'đ',
      'd'
    ),
    ';',
    regexp_replace(
      regexp_replace(
        regexp_replace(
          regexp_replace(
            regexp_replace(
              regexp_replace(
                regexp_replace(
                  regexp_replace(
                    LOWER(TRIM(district)),
                    '\s+',
                    ' '
                  ),
                  '[áàảãạăắằẳẵặâấầẩẫậ]',
                  'a'
                ),
                '[éèẻẽẹêếềểễệ]',
                'e'
              ),
              '[íìỉĩị]',
              'i'
            ),
            '[óòỏõọôốồổỗộơớờởỡợ]',
            'o'
          ),
          '[úùủũụưứừửữự]',
          'u'
        ),
        '[ýỳỷỹỵ]',
        'y'
      ),
      'đ',
      'd'
    ),
    ';',
    regexp_replace(
      regexp_replace(
        regexp_replace(
          regexp_replace(
            regexp_replace(
              regexp_replace(
                regexp_replace(
                  regexp_replace(
                    LOWER(TRIM(customer_address)),
                    '\s+',
                    ' '
                  ),
                  '[áàảãạăắằẳẵặâấầẩẫậ]',
                  'a'
                ),
                '[éèẻẽẹêếềểễệ]',
                'e'
              ),
              '[íìỉĩị]',
              'i'
            ),
            '[óòỏõọôốồổỗộơớờởỡợ]',
            'o'
          ),
          '[úùủũụưứừửữự]',
          'u'
        ),
        '[ýỳỷỹỵ]',
        'y'
      ),
      'đ',
      'd'
    )) AS cl_full_address,
  regexp_replace(
    regexp_replace(
      regexp_replace(
        regexp_replace(
          regexp_replace(
            regexp_replace(
              regexp_replace(
                regexp_replace(
                  LOWER(TRIM(city)),
                  '\s+',
                  ' '
                ),
                '[áàảãạăắằẳẵặâấầẩẫậ]',
                'a'
              ),
              '[éèẻẽẹêếềểễệ]',
              'e'
            ),
            '[íìỉĩị]',
            'i'
          ),
          '[óòỏõọôốồổỗộơớờởỡợ]',
          'o'
        ),
        '[úùủũụưứừửữự]',
        'u'
      ),
      '[ýỳỷỹỵ]',
      'y'
    ),
    'đ',
    'd'
  ) AS cl_city,
  *
FROM
  ${db}.${source_tbl}