Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 5 new feature road classes to centreline #1157

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions dags/gcc_layers_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def create_gcc_puller_dag(dag_id, default_args, name, conn_id):
title="is_audited",
description="Is the layer audited?",
),
"include_additional_feature": Param(
default=False,
type="boolean",
title="Include additional feature",
description="Flag to include additional feature type",
),
"layer_id": Param(
default=0,
type="integer",
Expand All @@ -50,7 +56,8 @@ def create_gcc_puller_dag(dag_id, default_args, name, conn_id):
type="integer",
title="mapserver",
description="mapserver for custom pull. Example: 0",
),"pk": Param(
),
"pk": Param(
default='',
type="string",
title="Primary Key.",
Expand Down Expand Up @@ -81,7 +88,8 @@ def get_layers(name, **context):
'layer_id': context["params"]["layer_id"],
'mapserver': context["params"]["mapserver"],
'pk': context["params"]["pk"],
'schema_name': context["params"]["schema_name"]
'schema_name': context["params"]["schema_name"],
'include_additional_feature':context["params"]["include_additional_feature"],
})
return dict({layer_name:layer})

Expand All @@ -93,17 +101,17 @@ def pull_layer(layer, conn_id):
context["table_name"] = layer[0]
#get db connection
conn = PostgresHook(conn_id).get_conn()

print(layer[1].get("include_additional_feature"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

#pull and insert layer
get_layer(
mapserver_n = layer[1].get("mapserver"),
layer_id = layer[1].get("layer_id"),
schema_name = layer[1].get("schema_name"),
is_audited = layer[1].get("is_audited"),
include_additional_feature = layer[1].get("include_additional_feature"),
primary_key = layer[1].get("pk"),
con = conn
)

#refresh mat views as necessary
agg_sql = layer[1].get("agg")
if agg_sql is not None:
Expand Down
39 changes: 26 additions & 13 deletions gis/centreline/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ The centreline data are used by many other groups in the City and it's often imp

## How It's Structured

* The `gis_core.centreline_latest` Materialized View contains the latest set of lines with unique id `centreline_id`. These lines are undirected. All edges have _from_ and _to_ nodes, though this should not be taken to indicate that edges are directed. For a directed centreline layer, check out `gis_core.routing_centreline_directional` ([see more](#centreline-segments-edges)) which has the necessary schema to be used in pg_routing.
* The `gis_core.centreline_latest` Materialized View contains the latest set of lines for road classes that are relevant to transportation. It includes all road classification but **excludes** `Trail` and `Buslane`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busway

* The `gis_core.centreline_latest_all_feature` Materialized View contains the latest set of lines, **including all features**.
* The `centreline_intersection_point_latest` Materialized View contains the latest set of unique intersections with unique id `intersection_id`. These are any location where two lines intersect, not strictly intersections in the transportation sense ([see more](#intersections-nodes))

## Where It's Stored
Expand All @@ -23,21 +24,33 @@ Centreline data are stored in the `gis_core` schema in the `bigdata` database. B

### Centreline Segments (edges)

Segments are stored in the partitioned table `gis_core.centreline`. The latest version of centreline can be access through this materialized view `gis_core.centreline_latest`.
Segments are stored in the partitioned table `gis_core.centreline`. These lines are undirected. All edges have _from_ and _to_ nodes, though this should not be taken to indicate that edges are directed. For a directed centreline layer, check out `gis_core.routing_centreline_directional` ([see more](#centreline-segments-edges)) which has the necessary schema to be used in pg_routing.

Currently we are including only the following types:

* 'Expressway'
* 'Expressway Ramp'
* 'Major Arterial'
* 'Major Arterial Ramp'
* 'Minor Arterial'
* 'Minor Arterial Ramp'
* 'Collector'
* 'Collector Ramp'
* 'Local'
* 'Pending'
* 'Other' (version >= `2024-02-19`)
> [!IMPORTANT]
> **2025-02-24**: Added `Buslane`, `Trail`, `Access Road`, `Other Ramp`, and `Laneway`, in order to ensure consistency with MOVE.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busway

>
> **2024-02-19**: Added `Other`.

| Feature Type | Included in `centreline_latest` | Included in `centreline_latest_all_feature` |
|----------------------|--------------------------------|----------------------------------|
| Expressway | ✅ | ✅ |
| Expressway Ramp | ✅ | ✅ |
| Major Arterial | ✅ | ✅ |
| Major Arterial Ramp | ✅ | ✅ |
| Minor Arterial | ✅ | ✅ |
| Minor Arterial Ramp | ✅ | ✅ |
| Collector | ✅ | ✅ |
| Collector Ramp | ✅ | ✅ |
| Local | ✅ | ✅ |
| Pending | ✅ | ✅ |
| Other (added `2024-02-19`) | ✅ | ✅ |
| Buslane (added `2025-02-24`) | ❌ | ✅ |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busway

| Access Road (added `2025-02-24`) | ✅ | ✅ |
| Trail (added `2025-02-24`) | ❌ | ✅ |
| Other Ramp (added `2025-02-24`) | ✅ | ✅ |
| Laneway (added `2025-02-24`) | ✅ | ✅ |

#### Directionality

Expand Down
17 changes: 16 additions & 1 deletion gis/centreline/sql/create_matview_centreline_latest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,24 @@
FROM gis_core.centreline
WHERE
version_date = (
SELECT MAX(version_date)

Check failure on line 7 in gis/centreline/sql/create_matview_centreline_latest.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference 'version_date' found in select with more than one referenced table/view.
FROM gis_core.centreline
);
)
AND feature_code_desc IN (
'Expressway',
'Expressway Ramp',
'Major Arterial',
'Major Arterial Ramp',
'Minor Arterial',
'Minor Arterial Ramp',
'Collector',
'Collector Ramp',
'Local',
'Access Road',
'Other',
'Other Ramp',
'Laneway',
'Pending');

Check failure on line 24 in gis/centreline/sql/create_matview_centreline_latest.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

LT02: Expected line break and indent of 4 spaces before ')'.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a mat view comment specifying which layers are left out?


CREATE TRIGGER refresh_trigger
AFTER INSERT OR UPDATE OR DELETE
Expand All @@ -17,7 +32,7 @@
CREATE INDEX gis_core_centreline_latest_geom ON gis_core.centreline_latest USING gist (geom);

CREATE UNIQUE INDEX centreline_latest_unique
ON gis_core.centreline_latest USING btree(

Check notice on line 35 in gis/centreline/sql/create_matview_centreline_latest.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

LT01: Expected single whitespace between naked identifier and start bracket '('.
centreline_id ASC
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
CREATE MATERIALIZED VIEW gis_core.centreline_latest_all_feature AS

SELECT *
FROM gis_core.centreline
WHERE
version_date = (
SELECT MAX(version_date)

Check failure on line 7 in gis/centreline/sql/create_matview_centreline_latest_all_feature.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference 'version_date' found in select with more than one referenced table/view.
FROM gis_core.centreline
);


CREATE INDEX gis_core_centreline_latest_all_feature_geom ON gis_core.centreline_latest_all_feature USING gist (geom);

Check notice on line 12 in gis/centreline/sql/create_matview_centreline_latest_all_feature.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

LT05: Line is too long (117 > 100).

CREATE UNIQUE INDEX centreline_latest_all_feature_unique
ON gis_core.centreline_latest_all_feature USING btree(

Check notice on line 15 in gis/centreline/sql/create_matview_centreline_latest_all_feature.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

LT01: Expected single whitespace between naked identifier and start bracket '('.
centreline_id ASC
);

ALTER MATERIALIZED VIEW gis_core.centreline_latest_all_feature OWNER TO gis_admins;

GRANT SELECT ON gis_core.centreline_latest_all_feature TO bdit_humans, bdit_bots;

COMMENT ON MATERIALIZED VIEW gis_core.centreline_latest_all_feature IS E''
'Materialized view containing the latest version of centreline with all feature code, derived from gis_core.centreline.';

Check notice on line 24 in gis/centreline/sql/create_matview_centreline_latest_all_feature.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

LT05: Line is too long (121 > 100).
1 change: 1 addition & 0 deletions gis/centreline/sql/create_trigger_centreline_latest.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
CREATE OR REPLACE FUNCTION gis_core.centreline_latest_trigger()
RETURNS trigger
LANGUAGE 'plpgsql'

Check failure on line 3 in gis/centreline/sql/create_trigger_centreline_latest.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF06: Unnecessary quoted identifier 'plpgsql'.
COST 100
VOLATILE NOT LEAKPROOF SECURITY DEFINER
AS $BODY$
BEGIN

REFRESH MATERIALIZED VIEW gis_core.centreline_latest;
REFRESH MATERIALIZED VIEW gis_core.centreline_latest_all_feature;
RETURN NULL;

END;
Expand All @@ -14,3 +15,3 @@

ALTER FUNCTION gis_core.centreline_latest_trigger() OWNER TO gis_admins;

Expand Down
3 changes: 3 additions & 0 deletions gis/gccview/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The pipeline consists of two files, `gcc_puller_functions.py` for the functions
- con (used when manually pull): the path to the credential config file. Default is ~/db.cfg
- primary_key (used when pulling an audited table): primary key for this layer, returned from dictionary pk_dict when pulling for the Airflow DAG, set it manually when pulling a layer yourself.
- is_partitioned (Boolean): True if the layer will be inserted as a child table part of a parent table, False if the layer will be neither audited nor partitioned.
- include_additional_layers (Boolean): True if pulling additional layers for centreline.

In the DAG file, the arguments for each layer are stored in dictionaries called "bigdata_layers" and "ptc_layers", in the order above. The DAG will be executed once every 3 months, particularly on the 15th of every March, June, September, and December every year. The DAG will pull either audited table or partitioned table since the "is_partitioned" argument is not stored in dictionaries and are set to default value True.

Expand Down Expand Up @@ -140,6 +141,8 @@ There are 7 inputs that can be entered.

`is_partitioned`: Whether table will be a child table of a parent table or with no feature, specify the option on the command line will set this option to True; while not specifying will give the default False.

`include_additional_layers`: Whether additional layer should be pulled (only applicable for centreline, specify the option on the command line will set this option to True; while not specifying will give the default False.

Example of pulling the library layer (table with no feature) to the gis schema.


Expand Down
25 changes: 18 additions & 7 deletions gis/gccview/gcc_puller_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def to_time(input):
time = datetime.datetime.fromtimestamp(abs(input)/1000).strftime('%Y-%m-%d %H:%M:%S')
return time

def get_data(mapserver, layer_id, max_number = None, record_max = None):
def get_data(mapserver, layer_id, include_additional_feature, max_number = None, record_max = None):
"""
Function to retreive layer data from GCCView rest api

Expand All @@ -302,7 +302,10 @@ def get_data(mapserver, layer_id, max_number = None, record_max = None):

record_max : integer
Number for parameter `resultRecordCount` in the query, indicating the number of rows this query is going to fetch


include_additional_feature : bool
Boolean flag to include additional 5 feature codes (Trails, Busway, Laneway, Acess Road, and Other Ramp)

Returns
--------
return_json : json
Expand All @@ -312,7 +315,10 @@ def get_data(mapserver, layer_id, max_number = None, record_max = None):
base_url = f"https://insideto-gis.toronto.ca/arcgis/rest/services/{mapserver}/MapServer/{layer_id}/query"
# Exception if the data we want to get is centreline
if mapserver == 'cot_geospatial' and layer_id == 2:
where = "\"FEATURE_CODE_DESC\" IN ('Collector','Collector Ramp','Expressway','Expressway Ramp','Local','Major Arterial','Major Arterial Ramp','Minor Arterial','Minor Arterial Ramp','Pending', 'Other')"
if include_additional_feature:
where = "\"FEATURE_CODE_DESC\" IN ('Collector','Collector Ramp','Expressway','Expressway Ramp','Local','Major Arterial','Major Arterial Ramp','Minor Arterial','Minor Arterial Ramp','Pending', 'Other', 'Trail', 'Busway', 'Laneway', 'Other Ramp', 'Access Road')"
else:
where = "\"FEATURE_CODE_DESC\" IN ('Collector','Collector Ramp','Expressway','Expressway Ramp','Local','Major Arterial','Major Arterial Ramp','Minor Arterial','Minor Arterial Ramp','Pending', 'Other')"
Comment on lines +319 to +321
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this section a bit more clear about what's being added, could you just have the if include_additional_feature: clause append OR feature_code_desc IN (...) to the default?

elif mapserver == 'cot_geospatial27' and layer_id == 41:
where = "OBJECTID>0"
else:
Expand Down Expand Up @@ -536,7 +542,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche
return successful_execution
#-------------------------------------------------------------------------------------------------------
# base main function, also compatible with Airflow
def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = True):
def get_layer(mapserver_n, layer_id, schema_name, is_audited, include_additional_feature, cred = None, con = None, primary_key = None, is_partitioned = True):
"""
This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database.

Expand Down Expand Up @@ -590,7 +596,7 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
LOGGER.error("Non-audited tables do not use the primary key.")
#--------------------------------
#get first data pull (no offset), create tables.
return_json = get_data(mapserver, layer_id)
return_json = get_data(mapserver, layer_id, include_additional_feature)
if is_audited:
(insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con)
elif is_partitioned:
Expand All @@ -605,7 +611,7 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
keep_adding = find_limit(return_json) #checks if all records fetched
if keep_adding:
#get next batch using offset (max_number)
return_json = get_data(mapserver, layer_id, max_number = total, record_max = record_count)
return_json = get_data(mapserver, layer_id, include_additional_feature, max_number = total, record_max = record_count)
LOGGER.info('%s records from [mapserver: %s, layerID: %d] have been inserted into %s', total, mapserver, layer_id, output_table)

if is_audited:
Expand All @@ -630,7 +636,9 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
help = 'The path to the credential config file')
@click.option('--is-partitioned', '-p', is_flag=True, show_default=True, default=False,
help = 'Whether the table is supposed to be partitioned (T) or not partitioned (F)')
def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con, is_partitioned=True):
@click.option('--include_additional_feature', '-a', is_flag=True, show_default=True, default=False,
help = 'Whether additional layer should be pulled (only applicable for centreline')
def manual_get_layer(mapserver, layer_id, schema_name, is_audited, include_additional_feature, primary_key, con, is_partitioned=True):
"""
This script pulls a GIS layer from GCC servers into the databases of
the Data and Analytics Unit.
Expand All @@ -644,11 +652,14 @@ def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key,
dbset = CONFIG['DBSETTINGS']
connection_obj = connect(**dbset)
# get_layer function
LOGGER.info("include_additional_feature flag: %s", include_additional_feature)

Comment on lines +655 to +656
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

get_layer(
mapserver_n = mapserver,
layer_id = layer_id,
schema_name = schema_name,
is_audited = is_audited,
include_additional_feature = include_additional_feature,
primary_key = primary_key,
con=connection_obj,
is_partitioned = is_partitioned
Expand Down