From 525ae1c13d13b285b25e615380b659c9a86633a2 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Tue, 27 Feb 2024 11:07:23 -0500
Subject: [PATCH 1/4] Initial attempt to migrate to cc2, untested

---
 push_to_airflow.sh   |   6 +--
 ror_dag.py           | 112 +++++++++++++++++++++++++++----------
 ror_scripts/fetch.py |  10 ++++
 3 files changed, 85 insertions(+), 43 deletions(-)

diff --git a/push_to_airflow.sh b/push_to_airflow.sh
index 0f5c4c0..a48d964 100755
--- a/push_to_airflow.sh
+++ b/push_to_airflow.sh
@@ -1,4 +1,4 @@
-gsutil cp ror_dag.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
-gsutil -m cp -r ror_scripts gs://us-east1-production2023-cc1-01d75926-bucket/dags/
-gsutil -m cp -r schemas/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/ror/
+gsutil cp ror_dag.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
+gsutil -m cp -r ror_scripts/* gs://airflow-data-exchange/ror/scripts/
+gsutil -m cp -r schemas/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/ror/
 gsutil -m cp -r schemas gs://airflow-data-exchange/ror/
diff --git a/ror_dag.py b/ror_dag.py
index 81342dc..3d9f524 100644
--- a/ror_dag.py
+++ b/ror_dag.py
@@ -1,42 +1,39 @@
 import json
-import os
 
 from airflow import DAG
 from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryCheckOperator
 from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator
 from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
+from airflow.providers.google.cloud.operators.kubernetes_engine import (
+    GKEStartPodOperator,
+)
 from airflow.operators.python import PythonOperator
-from airflow.hooks.base_hook import BaseHook
-from airflow.providers.slack.operators.slack import SlackAPIPostOperator
-from datetime import timedelta, datetime
+from datetime import datetime
 
-from dataloader.airflow_utils.slack import task_fail_slack_alert
+from dataloader.airflow_utils.defaults import (
+    DATA_BUCKET,
+    DAGS_DIR,
+    GCP_ZONE,
+    PROJECT_ID,
+    get_default_args,
+    get_post_success,
+)
 from dataloader.scripts.populate_documentation import update_table_descriptions
 from ror_scripts.fetch import fetch
 
 
-default_args = {
-    "owner": "airflow",
-    "depends_on_past": False,
-    "start_date": datetime(2022, 1, 7),
-    "email": ["jennifer.melot@georgetown.edu"],
-    "email_on_failure": True,
-    "email_on_retry": True,
-    "retries": 1,
-    "retry_delay": timedelta(minutes=5),
-    "on_failure_callback": task_fail_slack_alert
-}
+args = get_default_args(pocs=["Jennifer"])
+args["retries"] = 1
+args["on_failure_callback"] = None
 
 with DAG("ror_updater",
-        default_args=default_args,
+        default_args=args,
         description="Links articles across our scholarly lit holdings.",
         schedule_interval="0 0 * * 5",
         catchup=False
         ) as dag:
 
-    slack_webhook = BaseHook.get_connection("slack")
-    bucket = "airflow-data-exchange"
     gcs_folder = "ror"
     tmp_dir = f"{gcs_folder}/tmp"
     raw_data_dir = f"{gcs_folder}/data"
@@ -45,33 +42,73 @@
     production_dataset = "gcp_cset_ror"
     staging_dataset = "staging_"+production_dataset
    backup_dataset = production_dataset+"_backups"
-    project_id = "gcp-cset-projects"
-    gce_zone = "us-east1-c"
-    dags_dir = os.environ.get("DAGS_FOLDER")
 
     # We keep several intermediate outputs in a tmp dir on gcs, so clean it out at the start of each run. We clean at
     # the start of the run so if the run fails we can examine the failed data
     clear_tmp_dir = GCSDeleteObjectsOperator(
         task_id="clear_tmp_gcs_dir",
-        bucket_name=bucket,
+        bucket_name=DATA_BUCKET,
         prefix=tmp_dir + "/"
     )
 
     # Retrieve and expand the data
     json_loc = tmp_dir+"/ror.jsonl"
-    fetch = PythonOperator(
-        task_id="fetch",
-        op_kwargs={
-            "output_bucket": bucket,
-            "output_loc": json_loc,
+    working_dir = "ror_working_dir"
+    setup_commands = f"rm -rf {working_dir};" + " && ".join(
+        [
+            f"mkdir {working_dir}",
+            f"cd {working_dir}",
+            f"gsutil -m cp -r gs://{DATA_BUCKET}/{gcs_folder}/scripts/* .",
+            "virtualenv venv",
+            ". venv/bin/activate",
+            "python3 -m pip install google-cloud-storage zipfile",
+        ]
+    )
+    download_data = GKEStartPodOperator(
+        task_id="download_data",
+        name="1790_er_download_data",
+        project_id=PROJECT_ID,
+        location=GCP_ZONE,
+        cluster_name="cc2-task-pool",
+        do_xcom_push=True,
+        cmds=["/bin/bash"],
+        arguments=[
+            "-c",
+            (
+                setup_commands
+                + f" && python3 fetch.py --output_bucket '{DATA_BUCKET}' --output_loc '{json_loc}'"
+            ),
+        ],
+        namespace="default",
+        image=f"gcr.io/{PROJECT_ID}/cc2-task-pool",
+        get_logs=True,
+        startup_timeout_seconds=300,
+        on_finish_action="delete_pod",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": [
+                                        "default-pool",
+                                    ],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
         },
-        python_callable=fetch
     )
 
     # Load into GCS
     load_staging = GCSToBigQueryOperator(
         task_id="load_staging",
-        bucket=bucket,
+        bucket=DATA_BUCKET,
         source_objects=[json_loc],
         schema_object=f"{schema_dir}/ror.json",
         destination_project_dataset_table=f"{staging_dataset}.ror",
@@ -105,12 +142,12 @@
     )
 
     # Update descriptions
-    with open(f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json") as f:
+    with open(f"{DAGS_DIR}/schemas/{gcs_folder}/table_descriptions.json") as f:
         table_desc = json.loads(f.read())
     pop_descriptions = PythonOperator(
         task_id="populate_column_documentation",
         op_kwargs={
-            "input_schema": f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/ror.json",
+            "input_schema": f"{DAGS_DIR}/schemas/{gcs_folder}/ror.json",
             "table_name": f"{production_dataset}.ror",
             "table_description": table_desc["ror"]
         },
@@ -128,12 +165,7 @@
     )
 
     # Declare victory
-    success_alert = SlackAPIPostOperator(
-        task_id="post_success",
-        token=slack_webhook.password,
-        text="ROR update succeeded!",
-        channel=slack_webhook.login,
-        username="airflow"
-    )
+    success_alert = get_post_success("ROR update succeeded!", dag)
 
-    clear_tmp_dir >> fetch >> load_staging >> checks >> load_production >> pop_descriptions >> backup >> success_alert
+    (clear_tmp_dir >> download_data >> load_staging >> checks >> load_production >> pop_descriptions >> backup >>
+     success_alert)
diff --git a/ror_scripts/fetch.py b/ror_scripts/fetch.py
index 2d8e18f..19ad21f 100644
--- a/ror_scripts/fetch.py
+++ b/ror_scripts/fetch.py
@@ -1,3 +1,4 @@
+import argparse
 import json
 import os
 import requests
@@ -38,3 +39,12 @@ def fetch(output_bucket: str, output_loc: str) -> None:
     bucket = storage_client.bucket(output_bucket)
     blob = bucket.blob(output_loc)
     blob.upload_from_filename(output_file)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--output_bucket", help="GCS bucket where data should be written", required=True)
+    parser.add_argument("--output_loc", help="Blob name where data should be written", required=True)
+    args = parser.parse_args()
+
+    fetch(args.output_bucket, args.output_loc)

From d6f41c7a29ce39178428e113f2a5cb30d1846275 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Tue, 27 Feb 2024 12:04:28 -0500
Subject: [PATCH 2/4] Remove unused imports

---
 ror_dag.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/ror_dag.py b/ror_dag.py
index 3d9f524..dc8e502 100644
--- a/ror_dag.py
+++ b/ror_dag.py
@@ -20,7 +20,6 @@
     get_post_success,
 )
 from dataloader.scripts.populate_documentation import update_table_descriptions
-from ror_scripts.fetch import fetch
 
 
 args = get_default_args(pocs=["Jennifer"])
@@ -61,7 +60,7 @@
             f"gsutil -m cp -r gs://{DATA_BUCKET}/{gcs_folder}/scripts/* .",
             "virtualenv venv",
             ". venv/bin/activate",
-            "python3 -m pip install google-cloud-storage zipfile",
+            "python3 -m pip install google-cloud-storage",
         ]
     )

From 6e18184bdcb53e6f7681af2729dae99e7a9ba8f0 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Tue, 27 Feb 2024 12:11:22 -0500
Subject: [PATCH 3/4] Add linters and run linting

---
 .flake8                               |  5 ++
 .github/workflows/main.yml            | 76 +++++++++++++++++++++++++
 .github/workflows/rebase-reminder.yml | 42 ++++++++++++++
 .pre-commit-config.yaml               | 37 ++++++++++++
 pyproject.toml                        | 14 +++++
 ror_dag.py                            | 82 ++++++++++++++++----------
 ror_scripts/fetch.py                  | 18 ++++--
 schemas/ror.json                      |  2 +-
 schemas/table_descriptions.json       |  2 +-
 9 files changed, 236 insertions(+), 42 deletions(-)
 create mode 100644 .flake8
 create mode 100644 .github/workflows/main.yml
 create mode 100644 .github/workflows/rebase-reminder.yml
 create mode 100644 .pre-commit-config.yaml
 create mode 100644 pyproject.toml

diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..619c1b7
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,5 @@
+[flake8]
+ignore = E203, E266, E501, W503, F403, F401
+max-line-length = 120
+max-complexity = 20
+select = B,C,E,F,W,T4,B9
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
new file mode 100644
index 0000000..1b82d87
--- /dev/null
+++ b/.github/workflows/main.yml
@@ -0,0 +1,76 @@
+name: Python application
+
+on: [pull_request]
+
+jobs:
+  build:
+    name: tests-pass
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up Python 3.7
+        uses: actions/setup-python@v1
+        with:
+          python-version: 3.7
+#      - name: Install dependencies
+#        run: |
+#          python -m pip install --upgrade pip
+#          pip install -r requirements.txt
+#      - name: Test with pytest
+#        run: |
+#          coverage run -m pytest tests
+#          coverage xml -o coverage/python.xml
+#      - name: Report python coverage
+#        uses: orgoro/coverage@v3
+#        with:
+#          coverageFile: coverage/python.xml
+#          token: ${{ secrets.GITHUB_TOKEN }}
+      # The next few steps only apply if you have javascript files
+      # - name: Setup node
+      #   uses: actions/setup-node@v3
+      #   with:
+      #     node-version: '18'
+      # - name: Test with jest
+      #   shell: bash
+      #   run: |
+      #     npm install
+      #     npm test -- --coverage --coverageReporters="json-summary" --coverageReporters="text" | tee ./coverage.txt
+      #   shell: bash
+      # - name: Report javascript coverage
+      #   uses: MishaKav/jest-coverage-comment@v1.0.20
+      #   with:
+      #     title: "JavaScript Coverage"
+      #     summary-title: "Summary"
+      #     coverage-title: "Modified Files"
+      #     github-token: ${{ secrets.GITHUB_TOKEN }}
+      #     report-only-changed-files: true
+      #     coverage-path: ./JS-FOLDER-NAME/coverage.txt
+      #     coverage-summary-path: ./JS-FOLDER-NAME/coverage/coverage-summary.json
+      #     coverage-path-prefix: JS-FOLDER-NAME/src/
+      # - name: Build output files
+      #   run: |
+      #     npm run build
+      # - name: Check links in built files
+      #   id: link_check
+      #   run: |
+      #     find public -name "*.js" -exec grep -Eo "(http|https):\/\/[^]\{\}\"'\\\(\)\> ]+" {} \; | sort -u > linklist.txt
+      #     printf '%s\n%s\n%s\n' "# LinkChecker URL list" "# " "$(cat linklist.txt)" > linklist.txt
+      #     linkchecker linklist.txt --check-extern --ignore-url="https://.*\.fastly\.net/.*" --ignore-url="https://.*\.mapbox\..*" --ignore-url=".*//a\W.*" --ignore-url="http://(a|x|тест)" -o failures > output.txt || true
+      #     cat output.txt
+      #     echo "num_links=$(wc -l < output.txt | sed 's/^ *//g')" >> $GITHUB_OUTPUT
+      #     echo "links<<EOFdelimiter" >> $GITHUB_OUTPUT
+      #     echo "$(cat output.txt)" >> $GITHUB_OUTPUT
+      #     echo "EOFdelimiter" >> $GITHUB_OUTPUT
+      # - name: Edit PR comment about link checking
+      #   if: steps.link_check.outputs.num_links > 0
+      #   uses: thollander/actions-comment-pull-request@v2
+      #   with:
+      #     message: |
+      #       There are ${{ steps.link_check.outputs.num_links }} broken links. Check the code for these links:
+      #       ${{ steps.link_check.outputs.links }}
+      #     comment_tag: link_check_msg
+      - name: Run linting
+        run: |
+          pip install pre-commit
+          pre-commit run --all-files
diff --git a/.github/workflows/rebase-reminder.yml b/.github/workflows/rebase-reminder.yml
new file mode 100644
index 0000000..0c30392
--- /dev/null
+++ b/.github/workflows/rebase-reminder.yml
@@ -0,0 +1,42 @@
+name: Rebase reminder
+on: [pull_request, pull_request_review]
+
+jobs:
+  build:
+    name: rebuild-reminder
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+      - name: Find behind count
+        id: behind_count
+        run: |
+          echo "behind_count=$(git rev-list --count ${{ github.event.pull_request.head.sha }}..${{ github.event.pull_request.base.sha }})" >> $GITHUB_OUTPUT
+      - name: Find ahead count
+        id: ahead_count
+        run: |
+          echo "ahead_count=$(git rev-list --count ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }})" >> $GITHUB_OUTPUT
+      - name: Find combined count
+        id: combined_count
+        run: |
+          echo "combined_count=$(expr ${{steps.behind_count.outputs.behind_count}} + ${{steps.ahead_count.outputs.ahead_count}})" >> $GITHUB_OUTPUT
+      - name: Edit PR comment - rebasing
+        if: steps.behind_count.outputs.behind_count > 0 && steps.combined_count.outputs.combined_count > 3
+        uses: thollander/actions-comment-pull-request@v1
+        with:
+          message: |
+            Needs rebasing :bangbang:
+            behind_count is ${{ steps.behind_count.outputs.behind_count }}
+            ahead_count is ${{ steps.ahead_count.outputs.ahead_count }}
+          comment_includes: 'rebasing'
+      - name: Edit PR comment - no rebasing
+        if: steps.behind_count.outputs.behind_count == 0 || steps.combined_count.outputs.combined_count <= 3
+        uses: thollander/actions-comment-pull-request@v1
+        with:
+          message: |
+            No need for rebasing :+1:
+            behind_count is ${{ steps.behind_count.outputs.behind_count }}
+            ahead_count is ${{ steps.ahead_count.outputs.ahead_count }}
+          comment_includes: 'rebasing'
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..2b0e452
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,37 @@
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  rev: v4.1.0
+  hooks:
+  - id: trailing-whitespace
+    exclude: "__snapshots__"
+  - id: end-of-file-fixer
+  - id: check-yaml
+  - id: check-added-large-files
+  - id: check-json
+# The next step only applies if you have javascript files.
+# There should be a package.json that installs eslint
+# (or eslint-config-react-app if you are using gatsby).
+# - repo: https://github.com/pre-commit/mirrors-eslint
+#   rev: v8.24.0
+#   hooks:
+#   - id: eslint
+- repo: https://github.com/PyCQA/isort
+  rev: 5.11.5
+  hooks:
+  - id: isort
+- repo: https://github.com/ambv/black
+  rev: 22.3.0
+  hooks:
+  - id: black
+    language_version: python3
+- repo: https://github.com/PyCQA/flake8
+  rev: 4.0.1
+  hooks:
+  - id: flake8
+#- repo: https://github.com/sqlfluff/sqlfluff
+#  rev: 0.10.1
+#  hooks:
+#  - id: sqlfluff-lint
+#  - id: sqlfluff-fix
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..43139b7
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,14 @@
+[tool.black]
+py36 = true
+include = '\.pyi?$'
+exclude = '''
+/(
+\.git
+| \.venv
+| build
+| dist
+)/
+'''
+
+[tool.isort]
+profile = "black"
diff --git a/ror_dag.py b/ror_dag.py
index dc8e502..7f45b8f 100644
--- a/ror_dag.py
+++ b/ror_dag.py
@@ -1,19 +1,25 @@
 import json
+from datetime import datetime
 
 from airflow import DAG
-from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryCheckOperator
-from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCheckOperator,
+    BigQueryInsertJobOperator,
+)
 from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
-from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.providers.google.cloud.operators.kubernetes_engine import (
     GKEStartPodOperator,
 )
-from airflow.operators.python import PythonOperator
-from datetime import datetime
-
+from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
+    BigQueryToBigQueryOperator,
+)
+from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
+    GCSToBigQueryOperator,
+)
 from dataloader.airflow_utils.defaults import (
-    DATA_BUCKET,
     DAGS_DIR,
+    DATA_BUCKET,
     GCP_ZONE,
     PROJECT_ID,
     get_default_args,
@@ -21,37 +27,35 @@
 )
 from dataloader.scripts.populate_documentation import update_table_descriptions
-
 
 args = get_default_args(pocs=["Jennifer"])
 args["retries"] = 1
 args["on_failure_callback"] = None
 
-with DAG("ror_updater",
-        default_args=args,
-        description="Links articles across our scholarly lit holdings.",
-        schedule_interval="0 0 * * 5",
-        catchup=False
-        ) as dag:
+with DAG(
+    "ror_updater",
+    default_args=args,
+    description="Links articles across our scholarly lit holdings.",
+    schedule_interval="0 0 * * 5",
+    catchup=False,
+) as dag:
 
     gcs_folder = "ror"
     tmp_dir = f"{gcs_folder}/tmp"
     raw_data_dir = f"{gcs_folder}/data"
     schema_dir = f"{gcs_folder}/schemas"
     sql_dir = f"sql/{gcs_folder}"
     production_dataset = "gcp_cset_ror"
-    staging_dataset = "staging_"+production_dataset
-    backup_dataset = production_dataset+"_backups"
+    staging_dataset = "staging_" + production_dataset
+    backup_dataset = production_dataset + "_backups"
 
     # We keep several intermediate outputs in a tmp dir on gcs, so clean it out at the start of each run. We clean at
     # the start of the run so if the run fails we can examine the failed data
     clear_tmp_dir = GCSDeleteObjectsOperator(
-        task_id="clear_tmp_gcs_dir",
-        bucket_name=DATA_BUCKET,
-        prefix=tmp_dir + "/"
+        task_id="clear_tmp_gcs_dir", bucket_name=DATA_BUCKET, prefix=tmp_dir + "/"
     )
 
     # Retrieve and expand the data
-    json_loc = tmp_dir+"/ror.jsonl"
+    json_loc = tmp_dir + "/ror.jsonl"
     working_dir = "ror_working_dir"
     setup_commands = f"rm -rf {working_dir};" + " && ".join(
         [
@@ -113,7 +117,7 @@
         destination_project_dataset_table=f"{staging_dataset}.ror",
         source_format="NEWLINE_DELIMITED_JSON",
         create_disposition="CREATE_IF_NEEDED",
-        write_disposition="WRITE_TRUNCATE"
+        write_disposition="WRITE_TRUNCATE",
     )
 
     # Check that the number of ids is >= what we have in production and that the ids are unique
@@ -121,23 +125,25 @@
         BigQueryCheckOperator(
             task_id="check_unique_ids",
             sql=(f"select count(distinct(id)) = count(id) from {staging_dataset}.ror"),
-            use_legacy_sql=False
+            use_legacy_sql=False,
         ),
         BigQueryCheckOperator(
             task_id="check_monotonic_increase",
-            sql=(f"select (select count(0) from {staging_dataset}.ror) >= "
-                 f"(select count(0) from {production_dataset}.ror)"),
-            use_legacy_sql=False
-        )
+            sql=(
+                f"select (select count(0) from {staging_dataset}.ror) >= "
+                f"(select count(0) from {production_dataset}.ror)"
+            ),
+            use_legacy_sql=False,
+        ),
     ]
 
     # Load into production
     load_production = BigQueryToBigQueryOperator(
-        task_id=f"load_production",
+        task_id="load_production",
         source_project_dataset_tables=[f"{staging_dataset}.ror"],
         destination_project_dataset_table=f"{production_dataset}.ror",
         create_disposition="CREATE_IF_NEEDED",
-        write_disposition="WRITE_TRUNCATE"
+        write_disposition="WRITE_TRUNCATE",
     )
 
     # Update descriptions
@@ -148,23 +154,31 @@
         op_kwargs={
             "input_schema": f"{DAGS_DIR}/schemas/{gcs_folder}/ror.json",
             "table_name": f"{production_dataset}.ror",
-            "table_description": table_desc["ror"]
+            "table_description": table_desc["ror"],
         },
-        python_callable=update_table_descriptions
+        python_callable=update_table_descriptions,
     )
 
     # Copy to backups
     curr_date = datetime.now().strftime("%Y%m%d")
     backup = BigQueryToBigQueryOperator(
-        task_id=f"snapshot_ror",
+        task_id="snapshot_ror",
        source_project_dataset_tables=[f"{production_dataset}.ror"],
         destination_project_dataset_table=f"{backup_dataset}.ror_{curr_date}",
         create_disposition="CREATE_IF_NEEDED",
-        write_disposition="WRITE_TRUNCATE"
+        write_disposition="WRITE_TRUNCATE",
     )
 
     # Declare victory
     success_alert = get_post_success("ROR update succeeded!", dag)
 
-    (clear_tmp_dir >> download_data >> load_staging >> checks >> load_production >> pop_descriptions >> backup >>
-     success_alert)
+    (
+        clear_tmp_dir
+        >> download_data
+        >> load_staging
+        >> checks
+        >> load_production
+        >> pop_descriptions
+        >> backup
+        >> success_alert
+    )
diff --git a/ror_scripts/fetch.py b/ror_scripts/fetch.py
index 19ad21f..d7f4638 100644
--- a/ror_scripts/fetch.py
+++ b/ror_scripts/fetch.py
@@ -1,11 +1,11 @@
 import argparse
 import json
 import os
-import requests
 import tempfile
+from zipfile import ZipFile
 
+import requests
 from google.cloud import storage
-from zipfile import ZipFile
 
 
 def fetch(output_bucket: str, output_loc: str) -> None:
@@ -16,7 +16,9 @@ def fetch(output_bucket: str, output_loc: str) -> None:
     :param output_loc: blob name where data should be written on GCS
     :return: None
     """
-    resp = requests.get("https://zenodo.org/api/records/?communities=ror-data&sort=mostrecent")
+    resp = requests.get(
+        "https://zenodo.org/api/records/?communities=ror-data&sort=mostrecent"
+    )
     dataset_js = resp.json()
     latest_delivery = dataset_js["hits"]["hits"][0]["files"][0]["links"]["self"]
     zip_resp = requests.get(latest_delivery)
@@ -34,7 +36,7 @@ def fetch(output_bucket: str, output_loc: str) -> None:
             js = json.loads(f.read())
         with open(output_file, mode="w") as out:
             for elt in js:
-                out.write(json.dumps(elt)+"\n")
+                out.write(json.dumps(elt) + "\n")
     storage_client = storage.Client()
     bucket = storage_client.bucket(output_bucket)
     blob = bucket.blob(output_loc)
@@ -43,8 +45,12 @@ def fetch(output_bucket: str, output_loc: str) -> None:
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--output_bucket", help="GCS bucket where data should be written", required=True)
-    parser.add_argument("--output_loc", help="Blob name where data should be written", required=True)
+    parser.add_argument(
+        "--output_bucket", help="GCS bucket where data should be written", required=True
+    )
+    parser.add_argument(
+        "--output_loc", help="Blob name where data should be written", required=True
+    )
     args = parser.parse_args()
 
     fetch(args.output_bucket, args.output_loc)
diff --git a/schemas/ror.json b/schemas/ror.json
index 621b564..614c256 100644
--- a/schemas/ror.json
+++ b/schemas/ror.json
@@ -461,4 +461,4 @@
     "name": "external_ids",
     "type": "RECORD"
   }
-]
\ No newline at end of file
+]
diff --git a/schemas/table_descriptions.json b/schemas/table_descriptions.json
index 1ae8ef7..79be848 100644
--- a/schemas/table_descriptions.json
+++ b/schemas/table_descriptions.json
@@ -1,3 +1,3 @@
 {
   "ror": "https://ror.org"
-}
\ No newline at end of file
+}

From 887515ce54dce81b5687c8c868590604293739d4 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Tue, 27 Feb 2024 12:13:12 -0500
Subject: [PATCH 4/4] Re-enable failure notifications

---
 ror_dag.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/ror_dag.py b/ror_dag.py
index 7f45b8f..938364c 100644
--- a/ror_dag.py
+++ b/ror_dag.py
@@ -29,7 +29,6 @@
 
 args = get_default_args(pocs=["Jennifer"])
 args["retries"] = 1
-args["on_failure_callback"] = None
 
 with DAG(