Skip to content

Commit

Permalink
explicitly define schema and types, use response.json() (#6931)
Browse files Browse the repository at this point in the history
* coerce type of received data to be strings - no need to worry about records of varying schema - explicitly define schema of table, take out import tempfile

* input correct dataset: addon_moderations_derived

* update typed_metadata, applied_policies to be STRING, job_assigned_at as STRING

* explicitly define typed_metadata record, applied_policy record, make entity type JSON, change dates from string to timestamps

* remove explicit definition of schema in query.py, import schema file instead, correct spelling error in schema.yaml

* update description of uuid

* Update sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py

Co-authored-by: Sean Rose <1994030+sean-rose@users.noreply.github.com>

* Update sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py

Co-authored-by: Sean Rose <1994030+sean-rose@users.noreply.github.com>

* Update sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py

Co-authored-by: Sean Rose <1994030+sean-rose@users.noreply.github.com>

---------

Co-authored-by: Sean Rose <1994030+sean-rose@users.noreply.github.com>
  • Loading branch information
Marlene-M-Hirose and sean-rose authored Jan 31, 2025
1 parent 8d21c56 commit 31782e9
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import json
import os
import tempfile
from argparse import ArgumentParser
from datetime import datetime, timedelta
from pathlib import Path

import requests
from google.cloud import bigquery

from bigquery_etl.schema import SCHEMA_FILE, Schema

"""Get the bearer token for Cinder from the environment"""
cinder_bearer_token = os.environ.get("CINDER_TOKEN")

Expand All @@ -28,7 +30,7 @@ def get_response(url, headers, params):
if (response.status_code == 401) or (response.status_code == 400):
print(f"***Error: {response.status_code}***")
print(response.text)
return response
return response.json()


def read_json(filename: str) -> dict:
Expand Down Expand Up @@ -57,20 +59,6 @@ def cinder_addon_decisions_download(date, bearer_token):
return response


def check_json(cinder_addon_decisions_response_text):
"""Script will return an empty dictionary for apps on days when there is no data. Check for that here."""
with tempfile.NamedTemporaryFile() as tmp_json:
with open(tmp_json.name, "w") as f_json:
f_json.write(cinder_addon_decisions_response_text)
try:
query_export = read_json(f_json.name)
except (
ValueError
): # ex. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
return None
return query_export


def add_date_to_json(query_export_contents, date):
"""Add a date to the entries so we can partition table by this date."""
fields_list = []
Expand All @@ -86,6 +74,7 @@ def add_date_to_json(query_export_contents, date):
"entity_slug": item["entity_slug"],
"job_id": item["job_id"],
"job_assigned_at": item["job_assigned_at"],
"queue_slug": item["queue_slug"],
"typed_metadata": item["typed_metadata"],
"applied_policies": item["applied_policies"],
}
Expand All @@ -97,6 +86,7 @@ def upload_to_bigquery(data, project, dataset, table_name, date):
"""Upload the data to bigquery."""
date = date
partition = f"{date}".replace("-", "")
schema_file_path = Path(__file__).parent / SCHEMA_FILE
client = bigquery.Client(project)
job_config = bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
Expand All @@ -105,6 +95,7 @@ def upload_to_bigquery(data, project, dataset, table_name, date):
type_=bigquery.TimePartitioningType.DAY,
field="date",
),
schema=Schema.from_schema_file(schema_file_path).to_bigquery_schema(),
)
destination = f"{project}.{dataset}.{table_name}${partition}"
job = client.load_table_from_json(data, destination, job_config=job_config)
Expand All @@ -129,14 +120,12 @@ def main():

cinder_data = []

json_file = cinder_addon_decisions_download(date, bearer_token)
"""Data returns as a dictionary with a key called 'items' and the value being a list of data"""
query_export = check_json(json_file.text)
"""Add date to each element in query_export for partitioning"""
query_export = cinder_addon_decisions_download(date, bearer_token)
# Data returns as a dictionary with a key called 'items' and the value being a list of data
query_export_contents = query_export["items"]
# Add date to each element in query_export for partitioning
cinder_data = add_date_to_json(query_export_contents, date)
"""Pull out the list from query_export["items"] and put that data into the cinder_data list"""

# Pull out the list from query_export["items"] and put that data into the cinder_data list
upload_to_bigquery(cinder_data, project, dataset, table_name, date)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,125 +6,34 @@ fields:
description: Date job is run

- mode: NULLABLE
name: user
name: decision_type
type: STRING
description: User who submitted the report
description: The type of decision

- mode: NULLABLE
name: uuid
name: user
type: STRING
description: ID of UU
description: User who submitted the report

- mode: NULLABLE
name: created_at
type: TIMESTAMP
description: Date decision made

- mode: NULLABLE
name: decision_type
type: STRING
description: The type of decision

- fields:
- mode: NULLABLE
name: uuid
type: STRING
- mode: NULLABLE
name: name
type: STRING
description: The name of the policy applied
- mode: NULLABLE
name: is_illegal
type: BOOLEAN
description: Signifies if the addon illegal or not
- mode: NULLABLE
name: parent_uuid
type: STRING
description: Top level UUID
- mode: REPEATED
name: enforcement_actions
type: STRING
description: List of enforcement actions
mode: REPEATED
name: applied_policies
type: RECORD
description: Applied policies includes parent_uuid, is_illegal field, name, enforcement_actions, uuid
name: entity_id
type: INTEGER
description: Entity ID

- fields:
- mode: NULLABLE
name: entity_type
type: STRING
description: Type of entity
- fields:
- mode: NULLABLE
name: id
type: INTEGER
description: Attribute ID
- mode: NULLABLE
name: slug
type: STRING
description: Slug
- mode: NULLABLE
name: guid
type: STRING
description: GU ID
- mode: NULLABLE
name: name
type: STRING
description: Name of attribute
- mode: NULLABLE
name: summary
type: STRING
description: Summary of the action taken
- mode: NULLABLE
name: description
type: STRING
description: Description of the issue
- mode: NULLABLE
name: release_notes
type: STRING
description: notes about the release
- mode: NULLABLE
name: average_daily_users
type: INT64
description: Average number of daily users
- mode: NULLABLE
name: last_updated
type: TIMESTAMP
description: when the job was last updated
- mode: NULLABLE
name: version
type: STRING
description: Which version was this addon
- mode: NULLABLE
name: privacy_policy
type: STRING
description: Which privacy policy is used
- mode: NULLABLE
name: created
type: TIMESTAMP
description: When the attribute was created
- mode: NULLABLE
name: promoted
type: STRING
description: Promoted
mode: NULLABLE
name: attributes
type: RECORD
description: All the attributes
- mode: NULLABLE
name: classifier_scores
type: STRING
description: Top level UUID
mode: NULLABLE
- mode: NULLABLE
name: entity
type: RECORD
type: JSON
description: Entity includes classifier_scores, attributes(RECORD), entity_type

- mode: NULLABLE
name: entity_id
type: INTEGER
description: Entity ID
name: uuid
type: STRING
description: Presumably the id of the decision record

- mode: NULLABLE
name: entity_slug
Expand All @@ -138,7 +47,7 @@ fields:

- mode: NULLABLE
name: job_assigned_at
type: STRING
type: TIMESTAMP
description: Date addon report was assigned to a moderator

- mode: NULLABLE
Expand All @@ -148,7 +57,7 @@ fields:

- fields:
- mode: NULLABLE
name: legacy_decisions_labels
name: legacy_decision_labels
type: STRING
description: Legacy decision label
- mode: NULLABLE
Expand All @@ -163,3 +72,28 @@ fields:
name: typed_metadata
type: RECORD
description: Typed Metadata includes escalation_details, policy_map, legacy_decision_labels

- fields:
- mode: NULLABLE
name: uuid
type: STRING
- mode: NULLABLE
name: name
type: STRING
description: The name of the policy applied
- mode: NULLABLE
name: is_illegal
type: BOOLEAN
description: Signifies whether the addon is illegal or not
- mode: NULLABLE
name: parent_uuid
type: STRING
description: Top level UUID
- mode: REPEATED
name: enforcement_actions
type: STRING
description: List of enforcement actions
mode: REPEATED
name: applied_policies
type: RECORD
description: Applied policies includes parent_uuid, is_illegal field, name, enforcement_actions, uuid

1 comment on commit 31782e9

@dataops-ci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Integration report for "explicitly define schema and types, use response.json() (#6931)"

sql.diff

Click to expand!
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py	2025-01-31 23:04:57.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/query.py	2025-01-31 23:04:57.000000000 +0000
@@ -2,13 +2,15 @@
 
 import json
 import os
-import tempfile
 from argparse import ArgumentParser
 from datetime import datetime, timedelta
+from pathlib import Path
 
 import requests
 from google.cloud import bigquery
 
+from bigquery_etl.schema import SCHEMA_FILE, Schema
+
 """Get the bearer token for Cinder from the environment"""
 cinder_bearer_token = os.environ.get("CINDER_TOKEN")
 
@@ -28,7 +30,7 @@
     if (response.status_code == 401) or (response.status_code == 400):
         print(f"***Error: {response.status_code}***")
         print(response.text)
-    return response
+    return response.json()
 
 
 def read_json(filename: str) -> dict:
@@ -57,20 +59,6 @@
     return response
 
 
-def check_json(cinder_addon_decisions_response_text):
-    """Script will return an empty dictionary for apps on days when there is no data. Check for that here."""
-    with tempfile.NamedTemporaryFile() as tmp_json:
-        with open(tmp_json.name, "w") as f_json:
-            f_json.write(cinder_addon_decisions_response_text)
-            try:
-                query_export = read_json(f_json.name)
-            except (
-                ValueError
-            ):  # ex. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
-                return None
-    return query_export
-
-
 def add_date_to_json(query_export_contents, date):
     """Add a date to the entries so we can partition table by this date."""
     fields_list = []
@@ -86,6 +74,7 @@
             "entity_slug": item["entity_slug"],
             "job_id": item["job_id"],
             "job_assigned_at": item["job_assigned_at"],
+            "queue_slug": item["queue_slug"],
             "typed_metadata": item["typed_metadata"],
             "applied_policies": item["applied_policies"],
         }
@@ -97,6 +86,7 @@
     """Upload the data to bigquery."""
     date = date
     partition = f"{date}".replace("-", "")
+    schema_file_path = Path(__file__).parent / SCHEMA_FILE
     client = bigquery.Client(project)
     job_config = bigquery.LoadJobConfig(
         create_disposition="CREATE_IF_NEEDED",
@@ -105,6 +95,7 @@
             type_=bigquery.TimePartitioningType.DAY,
             field="date",
         ),
+        schema=Schema.from_schema_file(schema_file_path).to_bigquery_schema(),
     )
     destination = f"{project}.{dataset}.{table_name}${partition}"
     job = client.load_table_from_json(data, destination, job_config=job_config)
@@ -129,14 +120,12 @@
 
     cinder_data = []
 
-    json_file = cinder_addon_decisions_download(date, bearer_token)
-    """Data returns as a dictionary with a key called 'items' and the value being a list of data"""
-    query_export = check_json(json_file.text)
-    """Add date to each element in query_export for partitioning"""
+    query_export = cinder_addon_decisions_download(date, bearer_token)
+    # Data returns as a dictionary with a key called 'items' and the value being a list of data
     query_export_contents = query_export["items"]
+    # Add date to each element in query_export for partitioning
     cinder_data = add_date_to_json(query_export_contents, date)
-    """Pull out the list from query_export["items"] and put that data into the cinder_data list"""
-
+    # Pull out the list from query_export["items"] and put that data into the cinder_data list
     upload_to_bigquery(cinder_data, project, dataset, table_name, date)
 
 
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml	2025-01-31 23:04:57.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/schema.yaml	2025-01-31 23:04:57.000000000 +0000
@@ -6,14 +6,14 @@
   description: Date job is run
 
 - mode: NULLABLE
-  name: user
+  name: decision_type
   type: STRING
-  description: User who submitted the report
+  description: The type of decision
 
 - mode: NULLABLE
-  name: uuid
+  name: user
   type: STRING
-  description: ID of UU
+  description: User who submitted the report
 
 - mode: NULLABLE
   name: created_at
@@ -21,110 +21,19 @@
   description: Date decision made
 
 - mode: NULLABLE
-  name: decision_type
-  type: STRING
-  description: The type of decision
-
-- fields:
-  - mode: NULLABLE
-    name: uuid
-    type: STRING
-  - mode: NULLABLE
-    name: name
-    type: STRING
-    description: The name of the policy applied
-  - mode: NULLABLE
-    name: is_illegal
-    type: BOOLEAN
-    description: Signifies if the addon illegal or not
-  - mode: NULLABLE
-    name: parent_uuid
-    type: STRING
-    description: Top level UUID
-  - mode: REPEATED
-    name: enforcement_actions
-    type: STRING
-    description: List of enforcement actions
-  mode: REPEATED
-  name: applied_policies
-  type: RECORD
-  description: Applied policies includes parent_uuid, is_illegal field, name, enforcement_actions, uuid
-
-- fields:
-  - mode: NULLABLE
-    name: entity_type
-    type: STRING
-    description: Type of entity
-  - fields:
-    - mode: NULLABLE
-      name: id
+  name: entity_id
       type: INTEGER
-      description: Attribute ID
-    - mode: NULLABLE
-      name: slug
-      type: STRING
-      description: Slug
-    - mode: NULLABLE
-      name: guid
-      type: STRING
-      description: GU ID
-    - mode: NULLABLE
-      name: name
-      type: STRING
-      description: Name of attribute
-    - mode: NULLABLE
-      name: summary
-      type: STRING
-      description: Summary of the action taken
-    - mode: NULLABLE
-      name: description
-      type: STRING
-      description: Description of the issue
-    - mode: NULLABLE
-      name: release_notes
-      type: STRING
-      description: notes about the release
-    - mode: NULLABLE
-      name: average_daily_users
-      type: INT64
-      description: Average number of daily users
-    - mode: NULLABLE
-      name: last_updated
-      type: TIMESTAMP
-      description: when the job was last updated
-    - mode: NULLABLE
-      name: version
-      type: STRING
-      description: Which version was this addon
-    - mode: NULLABLE
-      name: privacy_policy
-      type: STRING
-      description: Which privacy policy is used
-    - mode: NULLABLE
-      name: created
-      type: TIMESTAMP
-      description: When the attribute was created
-    - mode: NULLABLE
-      name: promoted
-      type: STRING
-      description: Promoted
-    mode: NULLABLE
-    name: attributes
-    type: RECORD
-    description: All the attributes
-  - mode: NULLABLE
-    name: classifier_scores
-    type: STRING
-    description: Top level UUID
-  mode: NULLABLE
+  description: Entity ID
+
+- mode: NULLABLE
   name: entity
-  type: RECORD
+  type: JSON
   description: Entity includes classifier_scores, attributes(RECORD), entity_type
 
 - mode: NULLABLE
-  name: entity_id
-  type: INTEGER
-  description: Entity ID
+  name: uuid
+  type: STRING
+  description: Presumably the id of the decision record
 
 - mode: NULLABLE
   name: entity_slug
@@ -138,7 +47,7 @@
 
 - mode: NULLABLE
   name: job_assigned_at
-  type: STRING
+  type: TIMESTAMP
   description: Date addon report was assigned to a moderator
 
 - mode: NULLABLE
@@ -148,7 +57,7 @@
 
 - fields:
   - mode: NULLABLE
-    name: legacy_decisions_labels
+    name: legacy_decision_labels
     type: STRING
     description: Legacy decision label
   - mode: NULLABLE
@@ -163,3 +72,28 @@
   name: typed_metadata
   type: RECORD
   description: Typed Metadata includes escalation_details, policy_map, legacy_decision_labels
+
+- fields:
+  - mode: NULLABLE
+    name: uuid
+    type: STRING
+  - mode: NULLABLE
+    name: name
+    type: STRING
+    description: The name of the policy applied
+  - mode: NULLABLE
+    name: is_illegal
+    type: BOOLEAN
+    description: Signifies whether the addon is illegal or not
+  - mode: NULLABLE
+    name: parent_uuid
+    type: STRING
+    description: Top level UUID
+  - mode: REPEATED
+    name: enforcement_actions
+    type: STRING
+    description: List of enforcement actions
+  mode: REPEATED
+  name: applied_policies
+  type: RECORD
+  description: Applied policies includes parent_uuid, is_illegal field, name, enforcement_actions, uuid

Link to full diff

Please sign in to comment.