Commit

Use new LocationVisits schema and determine if imputed based on location hours

aaronfriedman6 committed Apr 18, 2024
1 parent 9616ff4 commit 9a4ab71
Showing 6 changed files with 387 additions and 199 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,4 +1,4 @@
## 2024-04-11 -- v1.0.0
## 2024-04-18 -- v1.0.0
### Added
- Perform recovery queries on past thirty days of missing data

48 changes: 28 additions & 20 deletions helpers/query_helper.py
@@ -1,32 +1,30 @@
_REDSHIFT_HOURS_QUERY = """
SELECT sierra_code, weekday, regular_open, regular_close
FROM {hours_table} LEFT JOIN {codes_table}
ON {hours_table}.drupal_location_id = {codes_table}.drupal_code
WHERE date_of_change IS NULL;"""

_REDSHIFT_CREATE_TABLE_QUERY = """
CREATE TEMPORARY TABLE #recoverable_site_dates AS
WITH recovered_data_timestamps AS (
SELECT shoppertrak_site_id, orbit, increment_start
FROM {table}
WHERE is_recovery_data
AND increment_start >= '{start_date}'
AND increment_start < '{end_date}'
), recoverable_data_timestamps AS (
SELECT shoppertrak_site_id, orbit, increment_start
FROM {table}
WHERE NOT is_healthy_orbit
AND increment_start >= '{start_date}'
AND increment_start < '{end_date}'
EXCEPT
SELECT * FROM recovered_data_timestamps
)
SELECT shoppertrak_site_id, increment_start::DATE AS increment_date
FROM recoverable_data_timestamps
FROM {table}
WHERE NOT is_healthy_data
AND is_fresh
AND increment_start >= '{start_date}'
AND increment_start < '{end_date}'
GROUP BY shoppertrak_site_id, increment_date;"""

_REDSHIFT_KNOWN_QUERY = """
SELECT #recoverable_site_dates.shoppertrak_site_id, orbit, increment_start, enters,
exits
SELECT #recoverable_site_dates.shoppertrak_site_id, orbit, increment_start,
id, is_healthy_data, enters, exits
FROM #recoverable_site_dates LEFT JOIN {table}
ON #recoverable_site_dates.shoppertrak_site_id = {table}.shoppertrak_site_id
AND #recoverable_site_dates.increment_date = {table}.increment_start::DATE
WHERE is_healthy_orbit
ORDER BY poll_date;"""
WHERE is_fresh;"""

_REDSHIFT_UPDATE_QUERY = """
UPDATE {table} SET is_fresh = False
WHERE id IN ({ids});"""

REDSHIFT_DROP_QUERY = "DROP TABLE #recoverable_site_dates;"

@@ -36,6 +34,12 @@
ORDER BY increment_date, shoppertrak_site_id;"""


def build_redshift_hours_query(hours_table, codes_table):
return _REDSHIFT_HOURS_QUERY.format(
hours_table=hours_table, codes_table=codes_table
)


def build_redshift_create_table_query(table, start_date, end_date):
return _REDSHIFT_CREATE_TABLE_QUERY.format(
table=table, start_date=start_date, end_date=end_date
@@ -44,3 +48,7 @@ def build_redshift_create_table_query(table, start_date, end_date):

def build_redshift_known_query(table):
return _REDSHIFT_KNOWN_QUERY.format(table=table)


def build_redshift_update_query(table, ids):
return _REDSHIFT_UPDATE_QUERY.format(table=table, ids=ids)
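
For orientation, a minimal sketch of how these query builders might be exercised. It assumes the repo's helpers.query_helper module is importable; the table names and the ID list are hypothetical stand-ins for the suffixed Redshift tables and row IDs the pipeline controller supplies at runtime.

# Sketch only -- hypothetical table names and IDs, not values from the repo.
from helpers.query_helper import (
    build_redshift_create_table_query,
    build_redshift_hours_query,
    build_redshift_known_query,
    build_redshift_update_query,
)

# Regular hours joined to branch codes (hypothetical QA-suffixed tables)
print(build_redshift_hours_query("location_hours_qa", "branch_codes_map_qa"))

# Temp table of site/date pairs with unhealthy-but-fresh rows in a date window
print(
    build_redshift_create_table_query("location_visits_qa", "2024-03-20", "2024-04-18")
)

# Currently stored fresh rows for those site/dates, then mark example rows stale
print(build_redshift_known_query("location_visits_qa"))
print(build_redshift_update_query("location_visits_qa", "123,456"))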
136 changes: 84 additions & 52 deletions lib/pipeline_controller.py
@@ -4,7 +4,9 @@
from datetime import datetime, timedelta
from helpers.query_helper import (
build_redshift_create_table_query,
build_redshift_hours_query,
build_redshift_known_query,
build_redshift_update_query,
REDSHIFT_DROP_QUERY,
REDSHIFT_RECOVERABLE_QUERY,
)
@@ -31,17 +33,18 @@ def __init__(self):
os.environ["REDSHIFT_DB_USER"],
os.environ["REDSHIFT_DB_PASSWORD"],
)
self.shoppertrak_api_client = ShopperTrakApiClient(
os.environ["SHOPPERTRAK_USERNAME"], os.environ["SHOPPERTRAK_PASSWORD"]
)
self.avro_encoder = AvroEncoder(os.environ["LOCATION_VISITS_SCHEMA_URL"])
self.shoppertrak_api_client = None

self.yesterday = datetime.now(pytz.timezone("US/Eastern")).date() - timedelta(
days=1
)
self.redshift_table = "location_visits"
redshift_suffix = ""
if os.environ["REDSHIFT_DB_NAME"] != "production":
self.redshift_table += "_" + os.environ["REDSHIFT_DB_NAME"]
redshift_suffix = "_" + os.environ["REDSHIFT_DB_NAME"]
self.redshift_visits_table = "location_visits" + redshift_suffix
self.redshift_hours_table = "location_hours" + redshift_suffix
self.redshift_branch_codes_table = "branch_codes_map" + redshift_suffix

self.ignore_cache = os.environ.get("IGNORE_CACHE", False) == "True"
if not self.ignore_cache:
@@ -57,6 +60,14 @@

def run(self):
"""Main method for the class -- runs the pipeline"""
self.logger.info("Getting regular branch hours from Redshift")
location_hours_dict = self.get_location_hours_dict()
self.shoppertrak_api_client = ShopperTrakApiClient(
os.environ["SHOPPERTRAK_USERNAME"],
os.environ["SHOPPERTRAK_PASSWORD"],
location_hours_dict,
)

all_sites_start_date = self._get_poll_date(0) + timedelta(days=1)
all_sites_end_date = (
datetime.strptime(os.environ["END_DATE"], "%Y-%m-%d").date()
@@ -74,27 +85,39 @@ def run(self):

broken_start_date = self.yesterday - timedelta(days=29)
self.logger.info(
f"Attempting to recover previously missing data from {broken_start_date} "
f"Attempting to recover previously unhealthy data from {broken_start_date} "
f"up to {all_sites_start_date}"
)
self.process_broken_orbits(broken_start_date, all_sites_start_date)
self.logger.info("Finished attempting to recover missing data")
self.logger.info("Finished attempting to recover unhealthy data")
if not self.ignore_kinesis:
self.kinesis_client.close()

def get_location_hours_dict(self):
"""
Queries Redshift for each location's current regular hours and returns a map
from (branch_code, weekday) to (regular_open, regular_close)
"""
self.redshift_client.connect()
raw_hours = self.redshift_client.execute_query(
build_redshift_hours_query(
self.redshift_hours_table, self.redshift_branch_codes_table
)
)
self.redshift_client.close_connection()
return {(row[0], row[1]): (row[2], row[3]) for row in raw_hours}

def process_all_sites_data(self, end_date, batch_num):
"""Gets visits data from all available sites for the given day(s)"""
last_poll_date = self._get_poll_date(batch_num)
poll_date = last_poll_date + timedelta(days=1)
if poll_date <= end_date:
poll_date_str = poll_date.strftime("%Y%m%d")
self.logger.info(f"Beginning batch {batch_num+1}: {poll_date_str}")

self.logger.info(f"Beginning batch {batch_num+1}: {poll_date.isoformat()}")
all_sites_xml_root = self.shoppertrak_api_client.query(
ALL_SITES_ENDPOINT, poll_date_str
ALL_SITES_ENDPOINT, poll_date
)
results = self.shoppertrak_api_client.parse_response(
all_sites_xml_root, poll_date_str
all_sites_xml_root, poll_date
)

encoded_records = self.avro_encoder.encode_batch(results)
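
The get_location_hours_dict method added above turns the hours query's rows into a lookup keyed by (branch_code, weekday), and run() hands that mapping to the ShopperTrak client, which presumably uses it to decide whether data is imputed based on location hours, per the commit message. A hypothetical illustration of the structure, with invented branch codes, weekday labels, and times:

# Hypothetical example of the mapping returned by get_location_hours_dict;
# the row values are invented and the real weekday/time formats may differ.
from datetime import time

raw_hours = [
    # (sierra_code, weekday, regular_open, regular_close)
    ("mp", "Mon", time(10, 0), time(18, 0)),
    ("mp", "Sun", None, None),  # closed all day
]
location_hours_dict = {(row[0], row[1]): (row[2], row[3]) for row in raw_hours}

print(location_hours_dict[("mp", "Mon")])  # (datetime.time(10, 0), datetime.time(18, 0))
print(location_hours_dict[("mp", "Sun")])  # (None, None)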
@@ -103,84 +126,93 @@ def process_all_sites_data(self, end_date, batch_num):
if not self.ignore_cache:
self.s3_client.set_cache({"last_poll_date": poll_date.isoformat()})

self.logger.info(f"Finished batch {batch_num+1}: {poll_date_str}")
self.logger.info(f"Finished batch {batch_num+1}: {poll_date.isoformat()}")
self.process_all_sites_data(end_date, batch_num + 1)

def process_broken_orbits(self, start_date, end_date):
"""
Re-queries individual sites with missing data from the past 30 days (a limit set
by the API) to see if any data has since been recovered
Re-queries individual sites with unhealthy data from the past 30 days (a limit
set by the API) to see if any data has since been recovered
"""
create_table_query = build_redshift_create_table_query(
self.redshift_table, start_date, end_date
self.redshift_visits_table, start_date, end_date
)
self.redshift_client.connect()
self.redshift_client.execute_transaction([(create_table_query, None)])
recoverable_site_dates = self.redshift_client.execute_query(
REDSHIFT_RECOVERABLE_QUERY
)
known_data = self.redshift_client.execute_query(
build_redshift_known_query(self.redshift_table)
build_redshift_known_query(self.redshift_visits_table)
)
self.redshift_client.execute_transaction([(REDSHIFT_DROP_QUERY, None)])
self.redshift_client.close_connection()

# For all the site/date pairs with missing data, form a dictionary of the
# currently stored healthy data for those sites on those dates where the key is
# (site ID, orbit, timestamp) and the value is (enters, exits). This is to
# prevent sending duplicate records when only some of the data for a site needs
# to be recovered on a particular date (e.g. when only one of several orbits is
# broken, or when an orbit goes down in the middle of the day).
# For all the site/date pairs with unhealthy data, form a dictionary of the
# currently stored data for those sites on those dates where the key is (site
# ID, orbit, timestamp) and the value is (Redshift ID, is_healthy_data, enters,
# exits). This is to mark old rows as stale and to prevent sending duplicate
# records when only some of the data for a site needs to be recovered on a
# particular date (e.g. when only one of several orbits is broken, or when an
# orbit goes down in the middle of the day).
known_data_dict = dict()
if known_data:
known_data_dict = {
(row[0], row[1], row[2]): (row[3], row[4]) for row in known_data
(row[0], row[1], row[2]): (row[3], row[4], row[5], row[6])
for row in known_data
}
self._recover_data(recoverable_site_dates, known_data_dict)
self.redshift_client.close_connection()

def _recover_data(self, site_dates, known_data_dict):
"""
Individually query the ShopperTrak API for each site/date pair with any missing
data. Then check to see if the returned data is actually "recovered" data, as it
may have never been missing to begin with. If so, send to Kinesis.
Individually query the ShopperTrak API for each site/date pair with any
unhealthy data. Then check to see if the returned data is actually "recovered"
data, as it may have never been unhealthy to begin with. If so, send to Kinesis.
"""
for row in site_dates:
date_str = row[1].strftime("%Y%m%d")
site_xml_root = self.shoppertrak_api_client.query(
SINGLE_SITE_ENDPOINT + row[0], date_str
SINGLE_SITE_ENDPOINT + row[0], row[1]
)
if site_xml_root:
site_results = self.shoppertrak_api_client.parse_response(
site_xml_root, date_str, is_recovery_mode=True
site_xml_root, row[1], is_recovery_mode=True
)
self._check_recovered_data(site_results, known_data_dict)
self._process_recovered_data(site_results, known_data_dict)

def _check_recovered_data(self, recovered_data, known_data_dict):
def _process_recovered_data(self, recovered_data, known_data_dict):
"""
Check that ShopperTrak "recovered" data was actually missing to begin with and,
if so, encode and send to Kinesis
Check that ShopperTrak "recovered" data was actually unhealthy to begin with
and, if so, encode, send to Kinesis, and mark old Redshift rows as stale
"""
results = []
for row in recovered_data:
fresh_key = (
row["shoppertrak_site_id"],
row["orbit"],
datetime.strptime(row["increment_start"], "%Y-%m-%d %H:%M:%S"),
stale_ids = []
for fresh_row in recovered_data:
key = (
fresh_row["shoppertrak_site_id"],
fresh_row["orbit"],
datetime.strptime(fresh_row["increment_start"], "%Y-%m-%d %H:%M:%S"),
)
fresh_value = (row["enters"], row["exits"])
if fresh_key in known_data_dict:
# If the data was already healthy in Redshift, check that the new API
# value matches what's in Redshift. Note that this assumes there is only
# a single healthy row in Redshift for each key. Otherwise, only the
# Redshift row with the most recent poll_date is checked.
if known_data_dict[fresh_key] != fresh_value:
if key not in known_data_dict:
results.append(fresh_row)
else:
known_row = known_data_dict[key]
if not known_row[1]: # previously unhealthy data
results.append(fresh_row)
stale_ids.append(str(known_row[0]))
elif ( # previously healthy data that doesn't match the new API data
fresh_row["enters"] != known_row[2]
or fresh_row["exits"] != known_row[3]
):
self.logger.warning(
f"Different healthy data found in API and Redshift: "
f"{fresh_key} mapped to {fresh_value} in the API and "
f"{known_data_dict[fresh_key]} in Redshift"
f"Different healthy data found in API and Redshift: {key} "
f"mapped to {fresh_row} in the API and {known_row} in Redshift"
)
else:
results.append(row)

# Mark old rows for successfully recovered data as stale
update_query = build_redshift_update_query(
self.redshift_visits_table, ",".join(stale_ids)
)
self.redshift_client.execute_transaction([(update_query, None)])

if results:
encoded_records = self.avro_encoder.encode_batch(results)
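
To summarize the reconciliation rule that _process_recovered_data applies, here is a simplified, standalone sketch; field names mirror the diff, but this is not the repo's exact code. Rows with no stored counterpart are sent as-is, rows that supersede previously unhealthy data are sent and their old Redshift IDs collected for the is_fresh update, and previously healthy rows are left alone (the real pipeline logs a warning when their enters/exits disagree with the API).

# Simplified illustration of the reconciliation logic; not the repo's exact code.
from datetime import datetime


def split_recovered_rows(recovered_data, known_data_dict):
    """Return (rows_to_send, stale_redshift_ids) for recovered API data."""
    results, stale_ids = [], []
    for fresh_row in recovered_data:
        key = (
            fresh_row["shoppertrak_site_id"],
            fresh_row["orbit"],
            datetime.strptime(fresh_row["increment_start"], "%Y-%m-%d %H:%M:%S"),
        )
        known_row = known_data_dict.get(key)
        if known_row is None:
            results.append(fresh_row)  # nothing stored yet: send as-is
        elif not known_row[1]:  # previously unhealthy row
            results.append(fresh_row)
            stale_ids.append(str(known_row[0]))  # old row to mark not fresh
        # previously healthy rows are skipped here; the real pipeline warns
        # when their enters/exits disagree with the API response
    return results, stale_ids


# Example: one previously unhealthy row (Redshift id 42) is superseded
known = {("site1", "orbit1", datetime(2024, 4, 1, 9, 0)): (42, False, 0, 0)}
fresh = [{
    "shoppertrak_site_id": "site1",
    "orbit": "orbit1",
    "increment_start": "2024-04-01 09:00:00",
    "enters": 5,
    "exits": 4,
}]
print(split_recovered_rows(fresh, known))  # ([{...}], ['42'])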