Merge branch 'master' into 1116-allow-manual-gcc_pull_layers-using-airflow-params

gabrielwol authored Feb 6, 2025
2 parents 04efe58 + b3fa447 commit b7cf220
Showing 48 changed files with 1,918 additions and 132 deletions.
1 change: 1 addition & 0 deletions dags/assets_pull.py
@@ -437,6 +437,7 @@ def pull_traffic_signal():
default_args=DEFAULT_ARGS,
max_active_runs=1,
template_searchpath=[os.path.join(AIRFLOW_ROOT, 'assets/rlc/airflow/tasks')],
tags=["bdit_data-sources", "data_pull", "traffic_signals"],
schedule='0 4 * * 1-5')
# minutes past each hour | Hours (0-23) | Days of the month (1-31) | Months (1-12) | Days of the week (0-7, Sunday represented as either/both 0 and 7)

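Read against the field-order comment above, the new schedule works out as follows (an illustrative annotation, not a line from the changed file):

# schedule='0 4 * * 1-5'
#   minute        0    -> at minute 0
#   hour          4    -> at 04:00
#   day of month  *    -> every day of the month
#   month         *    -> every month
#   day of week   1-5  -> Monday through Friday
# i.e. the DAG runs at 04:00 on weekdays, in the DAG's configured timezone.
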
2 changes: 1 addition & 1 deletion dags/citywide_tti_aggregate.py
@@ -48,7 +48,7 @@
default_args=default_args,
schedule=None, # gets triggered by HERE dag
doc_md = doc_md,
tags=["HERE"],
tags=["HERE", "aggregation"],
catchup=False
)

2 changes: 1 addition & 1 deletion dags/ecocounter_check.py
@@ -47,7 +47,7 @@
schedule='5 10 * * MON', # Run at 10:05 AM on Monday
catchup=True,
template_searchpath=os.path.join(repo_path, 'volumes/ecocounter/data_checks'),
tags=["ecocounter", "data_checks"],
tags=["ecocounter", "data_checks", "weekly"],
doc_md=DOC_MD
)
def ecocounter_check_dag():
228 changes: 228 additions & 0 deletions dags/ecocounter_open_data.py
@@ -0,0 +1,228 @@
r"""### Monthly ecocounter Open Data DAG
Pipeline to run monthly ecocounter aggregations for Open Data.
"""
import sys
import os
from datetime import timedelta
import logging
import pendulum
from functools import partial

from airflow.decorators import dag, task, task_group
from airflow.models import Variable
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.sensors.date_time import DateTimeSensor
from airflow.macros import ds_format
from airflow.operators.python import get_current_context

try:
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
from dags.dag_functions import task_fail_slack_alert, send_slack_msg, get_readme_docmd
from dags.custom_operators import SQLCheckOperatorWithReturnValue
except ModuleNotFoundError:
raise ImportError("Cannot import DAG helper functions.")
except ImportError:
raise ImportError("Cannot import DAG helper functions.")

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

DAG_NAME = 'ecocounter_open_data'
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])

README_PATH = os.path.join(repo_path, 'volumes/ecocounter/readme.md')
DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)
EXPORT_PATH = '/home/airflow/open_data/permanent-bike-counters' #'/data/open_data/permanent-bike-counters'

default_args = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past':False,
#set earlier start_date + catchup when ready?
'start_date': pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
'email_on_failure': False,
'email_on_success': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': partial(task_fail_slack_alert, use_proxy = True),
}

@dag(
dag_id=DAG_NAME,
default_args=default_args,
schedule='0 12 1 * *', # 12pm, 1st day of each month
template_searchpath=os.path.join(repo_path,'volumes/ecocounter'),
catchup=False,
max_active_runs=1,
tags=["bdit_data-sources", "ecocounter", "open_data"],
doc_md=DOC_MD
)
def ecocounter_open_data_dag():

check_data_availability = SQLCheckOperatorWithReturnValue(
task_id="check_data_availability",
sql="data_checks/select-data-availability.sql",
conn_id="ecocounter_bot"
)

@task(retries=0, doc_md="""A reminder message.""")
def reminder_message(ds = None, **context):
mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m')
slack_ids = Variable.get("slack_member_id", deserialize_json=True)
list_names = " ".join([slack_ids.get(name, name) for name in DAG_OWNERS])

send_slack_msg(
context=context,
msg=f"{list_names} Remember to check Ecocounter :open_data_to: for {mnth} and label any sites pending validation in anomalous_ranges. :meow_detective:",
use_proxy=True
)

wait_till_10th = DateTimeSensor(
task_id="wait_till_10th",
timeout=10*86400,
mode="reschedule",
poke_interval=3600*24,
target_time="{{ next_execution_date.replace(day=10) }}",
)
wait_till_10th.doc_md = """
Wait until the 10th day of the month to export data. Alternatively mark task as success to proceed immediately.
"""

@task()
def get_years(ds=None):
mnth = pendulum.from_format(ds, 'YYYY-MM-DD')
prev_mnth = mnth.subtract(months=1)
yrs = [mnth.year, prev_mnth.year]
return list(set(yrs)) #unique

update_locations = SQLExecuteQueryOperator(
sql=f"SELECT ecocounter.open_data_locations_insert()",
task_id='update_locations',
conn_id='ecocounter_bot',
autocommit=True,
retries = 0
)

@task_group()
def insert_and_download_data(yr):
@task(map_index_template="{{ yr }}")
def insert_daily(yr):
context = get_current_context()
context["yr"] = yr
t = SQLExecuteQueryOperator(
sql=f"SELECT ecocounter.open_data_daily_counts_insert({yr}::int)",
task_id='insert_daily_open_data',
conn_id='ecocounter_bot',
autocommit=True,
retries = 0
)
return t.execute(context=context)

@task(map_index_template="{{ yr }}")
def insert_15min(yr):
context = get_current_context()
context["yr"] = yr
t = SQLExecuteQueryOperator(
sql=f"SELECT ecocounter.open_data_15min_counts_insert({yr}::int)",
task_id='insert_15min_open_data',
conn_id='ecocounter_bot',
autocommit=True,
retries = 0
)
return t.execute(context=context)

@task.bash(
env = {
'HOST': '{{ conn.ecocounter_bot.host }}',
'LOGIN': '{{ conn.ecocounter_bot.login }}',
'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
'EXPORT_PATH': EXPORT_PATH
}
)
def download_daily_open_data()->str:
return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
"SELECT
location_dir_id, location_name, direction, linear_name_full,
side_street, dt, daily_volume
FROM open_data.cycling_permanent_counts_daily
WHERE dt < LEAST(date_trunc('month', now()))
ORDER BY location_dir_id, dt;" \
--csv -o "$EXPORT_PATH/cycling_permanent_counts_daily.csv"'''

@task.bash(
env = {
'HOST': '{{ conn.ecocounter_bot.host }}',
'LOGIN': '{{ conn.ecocounter_bot.login }}',
'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
'EXPORT_PATH': EXPORT_PATH
},
map_index_template="{{ yr }}"
)
def download_15min_open_data(yr)->str:
context = get_current_context()
context["yr"] = yr
return f'''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
"SELECT location_dir_id, datetime_bin, bin_volume
FROM open_data.cycling_permanent_counts_15min
WHERE
datetime_bin >= to_date({yr}::text, 'yyyy')
AND datetime_bin < LEAST(date_trunc('month', now()), to_date(({yr}+1)::text, 'yyyy'))
ORDER BY location_dir_id, datetime_bin;" \
--csv -o "$EXPORT_PATH/cycling_permanent_counts_15min_{yr}_{yr+1}.csv"'''

#insert only latest year data, but download everything (single file)
insert_daily(yr) >> download_daily_open_data()
insert_15min(yr) >> download_15min_open_data(yr)

@task.bash(
env = {
'HOST': '{{ conn.ecocounter_bot.host }}',
'LOGIN': '{{ conn.ecocounter_bot.login }}',
'PGPASSWORD': '{{ conn.ecocounter_bot.password }}',
'EXPORT_PATH': EXPORT_PATH
}
)
def download_locations_open_data()->str:
return '''/usr/bin/psql -h $HOST -U $LOGIN -d bigdata -c \
"SELECT location_dir_id, location_name, direction, linear_name_full, side_street,
longitude, latitude, centreline_id, bin_size, latest_calibration_study,
first_active, last_active, date_decommissioned, technology
FROM open_data.cycling_permanent_counts_locations
ORDER BY location_dir_id;" \
--csv -o "$EXPORT_PATH/cycling_permanent_counts_locations.csv"'''

@task.bash()
def output_readme()->str:
source='/home/airflow/data_scripts/volumes/open_data/sql/cycling_permanent_counts_readme.md'
dest='cycling_permanent_counts_readme.pdf'
return f'''pandoc -V geometry:margin=1in -o "{EXPORT_PATH}/{dest}" "{source}"'''

@task(
retries=0,
trigger_rule='all_success',
doc_md="""A status message to report DAG success."""
)
def status_message(ds = None, **context):
mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m-01')
send_slack_msg(
context=context,
msg=f"Ecocounter :open_data_to: DAG ran successfully for {mnth} :white_check_mark:. "
f"Remember to `cp {EXPORT_PATH}/* /data/open_data/permanent-bike-counters` as bigdata.",
use_proxy=True
)

yrs = get_years()
(
check_data_availability >>
reminder_message() >>
wait_till_10th >>
update_locations >> [
insert_and_download_data.expand(yr = yrs),
download_locations_open_data(),
output_readme()
] >>
status_message()
)

ecocounter_open_data_dag()
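
The new DAG above maps the insert_and_download_data task group over the years returned by get_years() via .expand(). A minimal, self-contained sketch of that dynamic task-group mapping pattern, with hypothetical DAG and task names standing in for the commit's SQL and psql work (assumes Airflow 2.5+, where task groups support .expand()):

import pendulum
from airflow.decorators import dag, task, task_group

@dag(
    dag_id="task_group_mapping_sketch",  # hypothetical example DAG, not in the repo
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
    catchup=False,
)
def task_group_mapping_sketch():

    @task
    def get_years():
        # stand-in for the real get_years(), which derives years from `ds`
        return [2024, 2025]

    @task_group
    def per_year(yr):
        @task
        def insert(yr):
            # placeholder for the per-year SQL inserts done above
            print(f"inserting open data rows for {yr}")
            return yr

        @task
        def download(yr):
            # placeholder for the per-year psql CSV export done above
            print(f"exporting open data for {yr}")
            return yr

        insert(yr) >> download(yr)

    # one mapped task-group instance per element of the returned year list
    per_year.expand(yr=get_years())

task_group_mapping_sketch()
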
6 changes: 3 additions & 3 deletions dags/ecocounter_pull.py
@@ -103,9 +103,9 @@ def update_sites_and_flows(**context):
new_sites, new_flows = [], []
with eco_postgres.get_conn() as conn:
for site in getSites(token):
site_id, site_name = site['id'], site['name']
site_id, site_name, counter = site['id'], site['name'], site['counter']
if not siteIsKnownToUs(site_id, conn):
insertSite(conn, site_id, site_name, site['longitude'], site['latitude'])
insertSite(conn, site_id, site_name, counter, site['longitude'], site['latitude'])
new_sites.append({
'site_id': site_id,
'site_name': site_name
@@ -178,7 +178,7 @@ def pull_recent_outages():
)
def data_checks():
data_check_params = {
"table": "ecocounter.counts",
"table": "ecocounter.counts_unfiltered",
"lookback": '60 days',
"dt_col": 'datetime_bin',
"threshold": 0.7
2 changes: 1 addition & 1 deletion dags/gcc_layers_pull.py
@@ -25,7 +25,6 @@ def create_gcc_puller_dag(dag_id, default_args, name, conn_id):
dag_id=dag_id,
default_args=default_args,
catchup=False,
tags=['gcc', name],
params={
"layer_name": Param(
default='',
@@ -65,6 +64,7 @@ def create_gcc_puller_dag(dag_id, default_args, name, conn_id):
examples=['gis', 'gis_core'],
)
},
tags=["bdit_data-sources", "gcc", name, "quarterly"],
schedule='0 7 1 */3 *' #'@quarterly'
)
def gcc_layers_dag():
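
The params block above is what enables the manually-triggered layer pulls referenced in the branch name. A minimal sketch of declaring a comparable layer_name Param and reading it at run time; the dag_id and task are hypothetical and do not reproduce the repository's create_gcc_puller_dag:

import pendulum
from airflow.decorators import dag, task
from airflow.models.param import Param

@dag(
    dag_id="manual_layer_pull_sketch",  # hypothetical example DAG, not in the repo
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
    catchup=False,
    params={
        # empty default means "pull every layer"; a manual run can name one layer
        "layer_name": Param(default="", type="string"),
    },
)
def manual_layer_pull_sketch():

    @task
    def pull(**context):
        layer = context["params"]["layer_name"]
        if layer:
            print(f"pulling only layer {layer}")
        else:
            print("pulling all layers")

    pull()

manual_layer_pull_sketch()

# A manual run supplies the parameter as dag-run conf, e.g.:
#   airflow dags trigger manual_layer_pull_sketch --conf '{"layer_name": "some_layer"}'
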
1 change: 1 addition & 0 deletions dags/log_cleanup.py
@@ -62,6 +62,7 @@ def create_dag(filepath, doc, start_date, schedule_interval):
# Prevent the same DAG from running concurrently more than once.
max_active_runs=1,
schedule=schedule_interval,
tags=['bdit_data-sources', 'maintenance'],
# This allows us to simplify `create_bash_task` below.
template_searchpath=AIRFLOW_TASKS
)
2 changes: 1 addition & 1 deletion dags/miovision_check.py
@@ -47,7 +47,7 @@
schedule='0 4 * * MON', # Run at 4 AM on Monday
catchup=False,
template_searchpath=os.path.join(repo_path,'volumes/miovision/sql/data_checks'),
tags=["miovision", "data_checks"],
tags=["miovision", "data_checks", "weekly"],
doc_md=DOC_MD
)
def miovision_check_dag():
6 changes: 2 additions & 4 deletions dags/miovision_pull.py
@@ -8,8 +8,6 @@
import os
import pendulum
from datetime import timedelta
import configparser
import dateutil.parser

from airflow.decorators import dag, task, task_group
from airflow.models.param import Param
@@ -91,7 +89,7 @@ def check_partitions():
sql="""SELECT miovision_api.create_mm_nested_volumes_partitions('volumes'::text, '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, '{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}'::int)""",
conn_id='miovision_api_bot',
autocommit=True,
trigger_rule='none_failed_min_one_success'
trigger_rule='none_failed'
)

create_annual_partition >> create_month_partition
@@ -150,7 +148,7 @@ def aggregate_15_min_mvt_task(ds = None, **context):
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
aggregate_15_min_mvt(conn, time_period=time_period, intersections=intersections)

@task
@task(depends_on_past=True)
def zero_volume_anomalous_ranges_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
time_period = (ds, ds_add(ds, 1))
2 changes: 1 addition & 1 deletion dags/tti_aggregate.py
@@ -45,7 +45,7 @@
default_args=default_args,
schedule=None, # gets triggered by HERE dag
doc_md = doc_md,
tags=["HERE"],
tags=["HERE", "aggregation"],
catchup=False
)

2 changes: 1 addition & 1 deletion dags/vds_check.py
@@ -48,7 +48,7 @@
schedule='0 5 * * FRI', # Run at 5 AM on Friday
catchup=False,
template_searchpath=os.path.join(repo_path,'volumes/vds/sql/select'),
tags=["vds", "data_checks"],
tags=["bdit_data-sources", "vds", "data_checks", "weekly"],
doc_md=DOC_MD
)
def vds_check_dag():
2 changes: 1 addition & 1 deletion dags/vds_pull_vdsdata.py
@@ -46,7 +46,7 @@
os.path.join(repo_path,'dags/sql')
],
doc_md=DOC_MD,
tags=['vds', 'vdsdata', 'data_checks', 'pull', 'detector_inventory'],
tags=["bdit_data-sources", 'vds', 'vdsdata', 'data_checks', 'data_pull', 'detector_inventory'],
schedule='0 4 * * *' #daily at 4am
)
def vdsdata_dag():
2 changes: 1 addition & 1 deletion dags/vds_pull_vdsvehicledata.py
@@ -44,7 +44,7 @@
os.path.join(repo_path,'dags/sql')
],
doc_md=DOC_MD,
tags=['vds', 'vdsvehicledata', 'data_checks', 'pull'],
tags=["bdit_data-sources", 'vds', 'vdsvehicledata', 'data_checks', 'data_pull'],
schedule='5 4 * * *' #daily at 4:05am
)
def vdsvehicledata_dag():