diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..619c1b7
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,5 @@
+[flake8]
+ignore = E203, E266, E501, W503, F403, F401
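+# E203 and W503 conflict with black's formatting; ignoring F401/F403 tolerates unused and star imports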
+max-line-length = 120
+max-complexity = 20
+select = B,C,E,F,W,T4,B9
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
new file mode 100644
index 0000000..1b82d87
--- /dev/null
+++ b/.github/workflows/main.yml
@@ -0,0 +1,76 @@
+name: Python application
+
+on: [pull_request]
+
+jobs:
+ build:
+ name: tests-pass
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up Python 3.7
+ uses: actions/setup-python@v1
+ with:
+ python-version: 3.7
+# - name: Install dependencies
+# run: |
+# python -m pip install --upgrade pip
+# pip install -r requirements.txt
+# - name: Test with pytest
+# run: |
+# coverage run -m pytest tests
+# coverage xml -o coverage/python.xml
+# - name: Report python coverage
+# uses: orgoro/coverage@v3
+# with:
+# coverageFile: coverage/python.xml
+# token: ${{ secrets.GITHUB_TOKEN }}
+ # The next few steps only apply if you have javascript files
+ # - name: Setup node
+ # uses: actions/setup-node@v3
+ # with:
+ # node-version: '18'
+ # - name: Test with jest
+ # shell: bash
+ # run: |
+ # npm install
+ # npm test -- --coverage --coverageReporters="json-summary" --coverageReporters="text" | tee ./coverage.txt
+ # shell: bash
+ # - name: Report javascript coverage
+ # uses: MishaKav/jest-coverage-comment@v1.0.20
+ # with:
+ # title: "JavaScript Coverage"
+ # summary-title: "Summary"
+ # coverage-title: "Modified Files"
+ # github-token: ${{ secrets.GITHUB_TOKEN }}
+ # report-only-changed-files: true
+ # coverage-path: ./JS-FOLDER-NAME/coverage.txt
+ # coverage-summary-path: ./JS-FOLDER-NAME/coverage/coverage-summary.json
+ # coverage-path-prefix: JS-FOLDER-NAME/src/
+ # - name: Build output files
+ # run: |
+ # npm run build
+ # - name: Check links in built files
+ # id: link_check
+ # run: |
+ # find public -name "*.js" -exec grep -Eo "(http|https):\/\/[^]\{\}\"'\\\(\)\> ]+" {} \; | sort -u > linklist.txt
+ # printf '%s\n%s\n%s\n' "# LinkChecker URL list" "# " "$(cat linklist.txt)" > linklist.txt
+ # linkchecker linklist.txt --check-extern --ignore-url="https://.*\.fastly\.net/.*" --ignore-url="https://.*\.mapbox\..*" --ignore-url=".*//a\W.*" --ignore-url="http://(a|x|ัะตัั)" -o failures > output.txt || true
+ # cat output.txt
+ # echo "num_links=$(wc -l < output.txt | sed 's/^ *//g')" >> $GITHUB_OUTPUT
+ # echo "links<> $GITHUB_OUTPUT
+ # echo "$(cat output.txt)" >> $GITHUB_OUTPUT
+ # echo "EOFdelimiter" >> $GITHUB_OUTPUT
+ # - name: Edit PR comment about link checking
+ # if: steps.link_check.outputs.num_links > 0
+ # uses: thollander/actions-comment-pull-request@v2
+ # with:
+ # message: |
+ # There are ${{ steps.link_check.outputs.num_links }} broken links. Check the code for these links:
+ # ${{ steps.link_check.outputs.links }}
+ # comment_tag: link_check_msg
+ - name: Run linting
+ run: |
+ pip install pre-commit
+ pre-commit run --all-files
diff --git a/.github/workflows/rebase-reminder.yml b/.github/workflows/rebase-reminder.yml
new file mode 100644
index 0000000..0c30392
--- /dev/null
+++ b/.github/workflows/rebase-reminder.yml
@@ -0,0 +1,42 @@
+name: Rebase reminder
+on: [pull_request, pull_request_review]
+
+jobs:
+ build:
+ name: rebuild-reminder
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
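+ # Full history so git rev-list can compare the PR branch with its base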
+ - name: Find behind count
+ id: behind_count
+ run: |
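+ # behind_count = commits on the base branch that the PR branch is missing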
+ echo "behind_count=$(git rev-list --count ${{ github.event.pull_request.head.sha }}..${{ github.event.pull_request.base.sha }})" >> $GITHUB_OUTPUT
+ - name: Find ahead count
+ id: ahead_count
+ run: |
+ echo "ahead_count=$(git rev-list --count ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }})" >> $GITHUB_OUTPUT
+ - name: Find combined count
+ id: combined_count
+ run: |
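+ # combined_count = ahead + behind; the reminder only fires when the branch is behind and this total exceeds 3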
+ echo "combined_count=$(expr ${{steps.behind_count.outputs.behind_count}} + ${{steps.ahead_count.outputs.ahead_count}})" >> $GITHUB_OUTPUT
+ - name: Edit PR comment - rebasing
+ if: steps.behind_count.outputs.behind_count > 0 && steps.combined_count.outputs.combined_count > 3
+ uses: thollander/actions-comment-pull-request@v1
+ with:
+ message: |
+ Needs rebasing :bangbang:
+ behind_count is ${{ steps.behind_count.outputs.behind_count }}
+ ahead_count is ${{ steps.ahead_count.outputs.ahead_count }}
+ comment_includes: 'rebasing'
+ - name: Edit PR comment - no rebasing
+ if: steps.behind_count.outputs.behind_count == 0 || steps.combined_count.outputs.combined_count <= 3
+ uses: thollander/actions-comment-pull-request@v1
+ with:
+ message: |
+ No need for rebasing :+1:
+ behind_count is ${{ steps.behind_count.outputs.behind_count }}
+ ahead_count is ${{ steps.ahead_count.outputs.ahead_count }}
+ comment_includes: 'rebasing'
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..2b0e452
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,37 @@
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v4.1.0
+ hooks:
+ - id: trailing-whitespace
+ exclude: "__snapshots__"
+ - id: end-of-file-fixer
+ - id: check-yaml
+ - id: check-added-large-files
+ - id: check-json
+# The next step only applies if you have javascript files.
+# There should be a package.json that installs eslint
+# (or eslint-config-react-app if you are using gatsby).
+# - repo: https://github.com/pre-commit/mirrors-eslint
+# rev: v8.24.0
+# hooks:
+# - id: eslint
+- repo: https://github.com/PyCQA/isort
+ rev: 5.11.5
+ hooks:
+ - id: isort
+- repo: https://github.com/ambv/black
+ rev: 22.3.0
+ hooks:
+ - id: black
+ language_version: python3
+- repo: https://github.com/PyCQA/flake8
+ rev: 4.0.1
+ hooks:
+ - id: flake8
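+ # flake8 reads its settings from the .flake8 file at the repo root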
+#- repo: https://github.com/sqlfluff/sqlfluff
+# rev: 0.10.1
+# hooks:
+# - id: sqlfluff-lint
+# - id: sqlfluff-fix
diff --git a/push_to_airflow.sh b/push_to_airflow.sh
index 0f5c4c0..a48d964 100755
--- a/push_to_airflow.sh
+++ b/push_to_airflow.sh
@@ -1,4 +1,4 @@
-gsutil cp ror_dag.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
-gsutil -m cp -r ror_scripts gs://us-east1-production2023-cc1-01d75926-bucket/dags/
-gsutil -m cp -r schemas/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/ror/
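+# Push the DAG and schemas to the Cloud Composer dags bucket; the scripts and schemas also go to the airflow-data-exchange bucket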
+gsutil cp ror_dag.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
+gsutil -m cp -r ror_scripts/* gs://airflow-data-exchange/ror/scripts/
+gsutil -m cp -r schemas/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/ror/
gsutil -m cp -r schemas gs://airflow-data-exchange/ror/
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..43139b7
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,14 @@
+[tool.black]
+py36 = true
+include = '\.pyi?$'
+exclude = '''
+/(
+\.git
+| \.venv
+| build
+| dist
+)/
+'''
+
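+# The black profile keeps isort's import ordering compatible with black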
+[tool.isort]
+profile = "black"
diff --git a/ror_dag.py b/ror_dag.py
index 81342dc..938364c 100644
--- a/ror_dag.py
+++ b/ror_dag.py
@@ -1,83 +1,122 @@
import json
-import os
+from datetime import datetime
from airflow import DAG
-from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryCheckOperator
-from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator
-from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
-from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.python import PythonOperator
-from airflow.hooks.base_hook import BaseHook
-from airflow.providers.slack.operators.slack import SlackAPIPostOperator
-from datetime import timedelta, datetime
-
-from dataloader.airflow_utils.slack import task_fail_slack_alert
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCheckOperator,
+ BigQueryInsertJobOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
+from airflow.providers.google.cloud.operators.kubernetes_engine import (
+ GKEStartPodOperator,
+)
+from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
+ BigQueryToBigQueryOperator,
+)
+from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
+ GCSToBigQueryOperator,
+)
+from dataloader.airflow_utils.defaults import (
+ DAGS_DIR,
+ DATA_BUCKET,
+ GCP_ZONE,
+ PROJECT_ID,
+ get_default_args,
+ get_post_success,
+)
from dataloader.scripts.populate_documentation import update_table_descriptions
-from ror_scripts.fetch import fetch
+args = get_default_args(pocs=["Jennifer"])
+args["retries"] = 1
-default_args = {
- "owner": "airflow",
- "depends_on_past": False,
- "start_date": datetime(2022, 1, 7),
- "email": ["jennifer.melot@georgetown.edu"],
- "email_on_failure": True,
- "email_on_retry": True,
- "retries": 1,
- "retry_delay": timedelta(minutes=5),
- "on_failure_callback": task_fail_slack_alert
-}
-
-with DAG("ror_updater",
- default_args=default_args,
- description="Updates the ROR (Research Organization Registry) dataset.",
- schedule_interval="0 0 * * 5",
- catchup=False
- ) as dag:
- slack_webhook = BaseHook.get_connection("slack")
- bucket = "airflow-data-exchange"
+with DAG(
+ "ror_updater",
+ default_args=args,
+ description="Updates the ROR (Research Organization Registry) dataset.",
+ schedule_interval="0 0 * * 5",
+ catchup=False,
+) as dag:
gcs_folder = "ror"
tmp_dir = f"{gcs_folder}/tmp"
raw_data_dir = f"{gcs_folder}/data"
schema_dir = f"{gcs_folder}/schemas"
sql_dir = f"sql/{gcs_folder}"
production_dataset = "gcp_cset_ror"
- staging_dataset = "staging_"+production_dataset
- backup_dataset = production_dataset+"_backups"
- project_id = "gcp-cset-projects"
- gce_zone = "us-east1-c"
- dags_dir = os.environ.get("DAGS_FOLDER")
+ staging_dataset = "staging_" + production_dataset
+ backup_dataset = production_dataset + "_backups"
# We keep several intermediate outputs in a tmp dir on gcs, so clean it out at the start of each run. We clean at
# the start of the run so if the run fails we can examine the failed data
clear_tmp_dir = GCSDeleteObjectsOperator(
- task_id="clear_tmp_gcs_dir",
- bucket_name=bucket,
- prefix=tmp_dir + "/"
+ task_id="clear_tmp_gcs_dir", bucket_name=DATA_BUCKET, prefix=tmp_dir + "/"
)
# Retrieve and expand the data
- json_loc = tmp_dir+"/ror.jsonl"
- fetch = PythonOperator(
- task_id="fetch",
- op_kwargs={
- "output_bucket": bucket,
- "output_loc": json_loc,
+ json_loc = tmp_dir + "/ror.jsonl"
+ working_dir = "ror_working_dir"
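+ # Prepare a scratch dir inside the pod: pull the fetch script down from GCS and install google-cloud-storage in a virtualenv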
+ setup_commands = f"rm -rf {working_dir};" + " && ".join(
+ [
+ f"mkdir {working_dir}",
+ f"cd {working_dir}",
+ f"gsutil -m cp -r gs://{DATA_BUCKET}/{gcs_folder}/scripts/* .",
+ "virtualenv venv",
+ ". venv/bin/activate",
+ "python3 -m pip install google-cloud-storage",
+ ]
+ )
+ download_data = GKEStartPodOperator(
+ task_id="download_data",
+ name="1790_er_download_data",
+ project_id=PROJECT_ID,
+ location=GCP_ZONE,
+ cluster_name="cc2-task-pool",
+ do_xcom_push=True,
+ cmds=["/bin/bash"],
+ arguments=[
+ "-c",
+ (
+ setup_commands
+ + f" && python3 fetch.py --output_bucket '{DATA_BUCKET}' --output_loc '{json_loc}'"
+ ),
+ ],
+ namespace="default",
+ image=f"gcr.io/{PROJECT_ID}/cc2-task-pool",
+ get_logs=True,
+ startup_timeout_seconds=300,
+ on_finish_action="delete_pod",
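+ # Schedule the pod only on the cluster's default-pool nodes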
+ affinity={
+ "nodeAffinity": {
+ "requiredDuringSchedulingIgnoredDuringExecution": {
+ "nodeSelectorTerms": [
+ {
+ "matchExpressions": [
+ {
+ "key": "cloud.google.com/gke-nodepool",
+ "operator": "In",
+ "values": [
+ "default-pool",
+ ],
+ }
+ ]
+ }
+ ]
+ }
+ }
},
- python_callable=fetch
)
    # Load from GCS into the BigQuery staging dataset
load_staging = GCSToBigQueryOperator(
task_id="load_staging",
- bucket=bucket,
+ bucket=DATA_BUCKET,
source_objects=[json_loc],
schema_object=f"{schema_dir}/ror.json",
destination_project_dataset_table=f"{staging_dataset}.ror",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
- write_disposition="WRITE_TRUNCATE"
+ write_disposition="WRITE_TRUNCATE",
)
# Check that the number of ids is >= what we have in production and that the ids are unique
@@ -85,55 +124,60 @@
BigQueryCheckOperator(
task_id="check_unique_ids",
sql=(f"select count(distinct(id)) = count(id) from {staging_dataset}.ror"),
- use_legacy_sql=False
+ use_legacy_sql=False,
),
BigQueryCheckOperator(
task_id="check_monotonic_increase",
- sql=(f"select (select count(0) from {staging_dataset}.ror) >= "
- f"(select count(0) from {production_dataset}.ror)"),
- use_legacy_sql=False
- )
+ sql=(
+ f"select (select count(0) from {staging_dataset}.ror) >= "
+ f"(select count(0) from {production_dataset}.ror)"
+ ),
+ use_legacy_sql=False,
+ ),
]
# Load into production
load_production = BigQueryToBigQueryOperator(
- task_id=f"load_production",
+ task_id="load_production",
source_project_dataset_tables=[f"{staging_dataset}.ror"],
destination_project_dataset_table=f"{production_dataset}.ror",
create_disposition="CREATE_IF_NEEDED",
- write_disposition="WRITE_TRUNCATE"
+ write_disposition="WRITE_TRUNCATE",
)
# Update descriptions
- with open(f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json") as f:
+ with open(f"{DAGS_DIR}/schemas/{gcs_folder}/table_descriptions.json") as f:
table_desc = json.loads(f.read())
pop_descriptions = PythonOperator(
task_id="populate_column_documentation",
op_kwargs={
- "input_schema": f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/ror.json",
+ "input_schema": f"{DAGS_DIR}/schemas/{gcs_folder}/ror.json",
"table_name": f"{production_dataset}.ror",
- "table_description": table_desc["ror"]
+ "table_description": table_desc["ror"],
},
- python_callable=update_table_descriptions
+ python_callable=update_table_descriptions,
)
# Copy to backups
curr_date = datetime.now().strftime("%Y%m%d")
backup = BigQueryToBigQueryOperator(
- task_id=f"snapshot_ror",
+ task_id="snapshot_ror",
source_project_dataset_tables=[f"{production_dataset}.ror"],
destination_project_dataset_table=f"{backup_dataset}.ror_{curr_date}",
create_disposition="CREATE_IF_NEEDED",
- write_disposition="WRITE_TRUNCATE"
+ write_disposition="WRITE_TRUNCATE",
)
# Declare victory
- success_alert = SlackAPIPostOperator(
- task_id="post_success",
- token=slack_webhook.password,
- text="ROR update succeeded!",
- channel=slack_webhook.login,
- username="airflow"
- )
+ success_alert = get_post_success("ROR update succeeded!", dag)
- clear_tmp_dir >> fetch >> load_staging >> checks >> load_production >> pop_descriptions >> backup >> success_alert
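+ # Run order: clear the tmp dir, fetch new ROR data, stage it, validate, promote to production, update table docs, snapshot a backup, then report success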
+ (
+ clear_tmp_dir
+ >> download_data
+ >> load_staging
+ >> checks
+ >> load_production
+ >> pop_descriptions
+ >> backup
+ >> success_alert
+ )
diff --git a/ror_scripts/fetch.py b/ror_scripts/fetch.py
index 2d8e18f..d7f4638 100644
--- a/ror_scripts/fetch.py
+++ b/ror_scripts/fetch.py
@@ -1,10 +1,11 @@
+import argparse
import json
import os
-import requests
import tempfile
+from zipfile import ZipFile
+import requests
from google.cloud import storage
-from zipfile import ZipFile
def fetch(output_bucket: str, output_loc: str) -> None:
@@ -15,7 +16,9 @@ def fetch(output_bucket: str, output_loc: str) -> None:
:param output_loc: blob name where data should be written on GCS
:return: None
"""
- resp = requests.get("https://zenodo.org/api/records/?communities=ror-data&sort=mostrecent")
+ resp = requests.get(
+ "https://zenodo.org/api/records/?communities=ror-data&sort=mostrecent"
+ )
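+ # The newest record in Zenodo's ror-data community is the latest ROR data dump; its first file is downloaded as a zip below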
dataset_js = resp.json()
latest_delivery = dataset_js["hits"]["hits"][0]["files"][0]["links"]["self"]
zip_resp = requests.get(latest_delivery)
@@ -33,8 +36,21 @@ def fetch(output_bucket: str, output_loc: str) -> None:
js = json.loads(f.read())
with open(output_file, mode="w") as out:
for elt in js:
- out.write(json.dumps(elt)+"\n")
+ out.write(json.dumps(elt) + "\n")
storage_client = storage.Client()
bucket = storage_client.bucket(output_bucket)
blob = bucket.blob(output_loc)
blob.upload_from_filename(output_file)
+
+
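+# Example invocation (mirrors the DAG's GKE pod command):
+#   python3 fetch.py --output_bucket <bucket> --output_loc ror/tmp/ror.jsonl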
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--output_bucket", help="GCS bucket where data should be written", required=True
+ )
+ parser.add_argument(
+ "--output_loc", help="Blob name where data should be written", required=True
+ )
+ args = parser.parse_args()
+
+ fetch(args.output_bucket, args.output_loc)
diff --git a/schemas/ror.json b/schemas/ror.json
index 621b564..614c256 100644
--- a/schemas/ror.json
+++ b/schemas/ror.json
@@ -461,4 +461,4 @@
"name": "external_ids",
"type": "RECORD"
}
-]
\ No newline at end of file
+]
diff --git a/schemas/table_descriptions.json b/schemas/table_descriptions.json
index 1ae8ef7..79be848 100644
--- a/schemas/table_descriptions.json
+++ b/schemas/table_descriptions.json
@@ -1,3 +1,3 @@
{
"ror": "https://ror.org"
-}
\ No newline at end of file
+}