From fa3b1337419d827091af81d0466f6e869cfac9db Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:40:10 +0000 Subject: [PATCH] #1106 only notify on Mondays --- dags/wys_pull.py | 17 +++++++---------- wys/api/readme.md | 6 +++--- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/dags/wys_pull.py b/dags/wys_pull.py index 71aa73c50..720dc55ae 100644 --- a/dags/wys_pull.py +++ b/dags/wys_pull.py @@ -159,11 +159,7 @@ def pull_schedules(): @task_group() def read_google_sheets_tg(): - @task.short_circuit(ignore_downstream_trigger_rules=False, retries=0) #only skip immediately downstream task - def check_if_monday(ds=None): - check_if_dow(1, ds) - - @task(pre_execute = lambda context: check_if_dow(1, context['ds'])) + @task def read_masterlist(**context): wys_postgres = PostgresHook("wys_bot") ward_list = [] @@ -176,7 +172,7 @@ def read_masterlist(**context): @task( retries = 1, - on_failure_callback = None, + on_failure_callback = None, #downstream tasks report failures map_index_template="{{ ward_no }}", doc_md="Reads an individual google sheet and inserts signs into the database. Failures from the mapped tasks are consolidated in follow-up tasks." ) @@ -195,7 +191,8 @@ def read_google_sheet(ward, **context): with wys_postgres.get_conn() as conn: if not pull_from_sheet(conn, service, ward, context): return ward[3] - + + @task.run_if(lambda context: check_if_dow(1, context['ds'])) #only notify on Mondays @task( retries=0, trigger_rule='all_done', @@ -215,6 +212,7 @@ def status_msg_rows(wards, **context): ti.xcom_push(key="extra_msg", value=extra_msg) raise AirflowFailException('Failed to pull some rows.') + @task.run_if(lambda context: check_if_dow(1, context['ds'])) #only notify on Mondays @task( retries=0, trigger_rule='all_done', @@ -235,13 +233,12 @@ def status_msg_sheets(**context): wards=read_masterlist() [ - check_if_monday() >> - read_google_sheet.expand(ward=wards) >> [ + read_google_sheet.expand(ward=wards) >> + [ status_msg_rows(wards=wards), status_msg_sheets() ] ] - check_partitions() >> api_pull() >> agg_speed_counts_hr() >> t_done >> data_checks() pull_schedules() read_google_sheets_tg() diff --git a/wys/api/readme.md b/wys/api/readme.md index aa3548f7e..2eeaab9b7 100644 --- a/wys/api/readme.md +++ b/wys/api/readme.md @@ -392,9 +392,9 @@ This task group contains red card data checks that may require the pipeline to b **`read_google_sheets_tg`** This task group reads data from the mobile sign installation google sheets. - `read_masterlist`: pulls the list of google sheets from the database `wys.ward_masterlist` table. - - `read_google_sheet`: mapped over the output of `read_masterlist`; each task reads an individual google sheet. - - `status_msg_rows`: reports any row failures from `read_google_sheet` mapped tasks. - - `status_msg_sheets`: reports any sheet failures from `read_google_sheet` mapped tasks. + - `read_google_sheet`: mapped over the output of `read_masterlist`; each task reads an individual google sheet. + - `status_msg_rows`: reports any row failures from `read_google_sheet` mapped tasks. Runs only on Mondays. + - `status_msg_sheets`: reports any sheet failures from `read_google_sheet` mapped tasks. Runs only on Mondays. `read_google_sheets`: Pulls mobile sign details from the Google Sheets. See more details under [`wys.mobile_sign_installations`](#wysmobile_sign_installations)