Skip to content

Commit

Permalink
Fully implements db-centric rfc_max_forecast (#621)
Browse files Browse the repository at this point in the history
This PR, if merged, will replace the existing "rfc_max_forecast" service
with a new version of the service whose backend data is now produced by
direct access to the WRDS RFC Forecast database as opposed to the WRDS
API. As a result:

* The `every_five_minute` EventBridge rule that triggered the
`viz-wrds-api-handler` Lambda has been modified to instead trigger the
`initialize-viz-pipeline` Lambda directly. The viz-wrds-api-handler code
is now deprecated and can be eventually removed.
* The `initialize-viz-pipeline` Lambda code had to be modified to allow
the "rfc" pipeline to be kicked off without referencing any input files
- as there are none. This allows it to act on more of a "cron" basis and
less of a "file-driven-event" basis.
* The new `products/rfc/rfc_max_forecast.sql` query relies upon a few
new database structures:
* Two new ENUM types - `flood_status` and `forecast_ts` - are used for
assigning trend and prioritizing "duplicate" forecasts. These are now
created in the RDS Bastion `postgresql_setup.sh.tftpl` script.
* Two new database views - `rnr.stage_thresholds` and
`rnr.flow_thresholds` - reorganize data in the external/foreign WRDS
Ingest DB threshold table (external.threshold) for more efficient use
here and eventually in other places (e.g. RnR). The SQL for these views
was committed for the record in the
`Core/LAMBDA/rnr_functions/rnr_domain_generator/sql/dba_stuff.sql` file,
but I believe this view should get copied over with the dump of the RnR
schema on deployments and thus should not need to be recreated manually.
* Forecasts that are distributed as flow-only (i.e. have no associated
rating curve to produce stage) are now also included as a value-added
win (addressing issue #312). As a result of this:
* The DB table produced by `rfc_max_forecast.sql` has new/modified
column names, replacing every occurrence of "stage" with "value" (i.e.
`max_stage_timestamp` to `max_value_timestamp`).
* The `rfc_max_forecast.mapx` file was thus also modified to replace
every occurrence of "stage" with "value" in both the "fieldName" and
"alias" fields (i.e. `max_stage_timestamp` to `max_value_timestamp` and
"Forecast Min Stage Timestamp" to "Forecast Min Value Timestamp")

This work will also allow the Replace and Route to be somewhat
redesigned to be completely in-sync with this new rfc_max_forecast
service - likely using the underlying `publish.rfc_max_forecast` table
as its starting point.
  • Loading branch information
shawncrawley authored and nickchadwick-noaa committed Feb 21, 2024
1 parent 2a729fa commit 1ce8290
Show file tree
Hide file tree
Showing 6 changed files with 552 additions and 268 deletions.
6 changes: 5 additions & 1 deletion Core/EC2/RDSBastion/scripts/viz/postgresql_setup.sh.tftpl
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_

# Add permissions for aws_s3 extension to viz user.
echo "Adding permissions to aws_s3 extension for viz user..."
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "GRANT USAGE ON schema aws_s3 TO ${viz_proc_admin_rw_username}; GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA aws_s3 TO ${viz_proc_admin_rw_username};"
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "GRANT USAGE ON schema aws_s3 TO ${viz_proc_admin_rw_username}; GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA aws_s3 TO ${viz_proc_admin_rw_username};"

echo "Creating enum types for use in rfc-based services"
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "CREATE TYPE flood_status AS ENUM ('no_flooding', 'action', 'minor', 'moderate', 'major');"
psql -h "${viz_db_host}" -U "${viz_db_username}" -p ${viz_db_port} -d "${viz_db_name}" -qtAc "CREATE TYPE forecast_ts AS ENUM ('FE', 'FF', 'FM', 'FN', 'FP', 'FQ', 'FU', 'FV', 'FW', 'FA', 'FB', 'FC', 'FD', 'FX', 'FG', 'FL', 'FZ');"
100 changes: 99 additions & 1 deletion Core/LAMBDA/rnr_functions/rnr_domain_generator/sql/dba_stuff.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,102 @@ ORDER BY
nwm_feature_id,
nws_station_id,
nws_usgs_crosswalk_dataset_id DESC NULLS LAST,
location_nwm_crosswalk_dataset_id DESC NULLS LAST;
location_nwm_crosswalk_dataset_id DESC NULLS LAST;

-- CREATE FLOW_THRESHOLDS VIEW
DROP VIEW IF EXISTS rnr.flow_thresholds;
CREATE VIEW rnr.flow_thresholds AS

WITH

main AS (
SELECT
station.location_id as nws_station_id,
COALESCE(native.action_flow, usgs.action_flow_calc, nrldb.action_flow_calc) as action,
CASE
WHEN native.action_flow IS NOT NULL
THEN 'Native'
WHEN usgs.action_flow_calc IS NOT NULL
THEN 'USGS'
WHEN nrldb.action_flow_calc IS NOT NULL
THEN 'NRLDB'
END as action_source,
COALESCE(native.minor_flow, usgs.minor_flow_calc, nrldb.minor_flow_calc) as minor,
CASE
WHEN native.minor_flow IS NOT NULL
THEN 'Native'
WHEN usgs.minor_flow_calc IS NOT NULL
THEN 'USGS'
WHEN nrldb.minor_flow_calc IS NOT NULL
THEN 'NRLDB'
END as minor_source,
COALESCE(native.moderate_flow, usgs.moderate_flow_calc, nrldb.moderate_flow_calc) as moderate,
CASE
WHEN native.moderate_flow IS NOT NULL
THEN 'Native'
WHEN usgs.moderate_flow_calc IS NOT NULL
THEN 'USGS'
WHEN nrldb.moderate_flow_calc IS NOT NULL
THEN 'NRLDB'
END as moderate_source,
COALESCE(native.major_flow, usgs.major_flow_calc, nrldb.major_flow_calc) as major,
CASE
WHEN native.major_flow IS NOT NULL
THEN 'Native'
WHEN usgs.major_flow_calc IS NOT NULL
THEN 'USGS'
WHEN nrldb.major_flow_calc IS NOT NULL
THEN 'NRLDB'
END as major_source,
COALESCE(native.record_flow, usgs.record_flow_calc, nrldb.record_flow_calc) as record,
CASE
WHEN native.record_flow IS NOT NULL
THEN 'Native'
WHEN usgs.record_flow_calc IS NOT NULL
THEN 'USGS'
WHEN nrldb.record_flow_calc IS NOT NULL
THEN 'NRLDB'
END as record_source
FROM (SELECT DISTINCT location_id FROM external.threshold) AS station
LEFT JOIN external.threshold native
ON native.location_id = station.location_id
AND native.rating_source = 'NONE'
LEFT JOIN external.threshold usgs
ON usgs.location_id = station.location_id
AND usgs.rating_source = 'USGS Rating Depot'
LEFT JOIN external.threshold nrldb
ON nrldb.location_id = station.location_id
AND nrldb.rating_source = 'NRLDB'
)

SELECT * FROM main
WHERE COALESCE(action, minor, moderate, major, record) IS NOT NULL;

-- CREATE STAGE THRESHOLDS VIEW
DROP VIEW IF EXISTS rnr.stage_thresholds;
CREATE VIEW rnr.stage_thresholds AS

WITH

native_stage_thresholds AS (
SELECT
location_id,
action_stage,
minor_stage,
moderate_stage,
major_stage,
record_stage
FROM external.threshold
WHERE rating_source = 'NONE'
)

SELECT
location_id AS nws_station_id,
action_stage as action,
minor_stage as minor,
moderate_stage as moderate,
major_stage as major,
record_stage as record
FROM external.threshold station
WHERE rating_source = 'NONE'
AND COALESCE(action_stage, minor_stage, moderate_stage, major_stage, record_stage) IS NOT NULL;
5 changes: 3 additions & 2 deletions Core/LAMBDA/viz_functions/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ resource "aws_lambda_function" "viz_wrds_api_handler" {

resource "aws_cloudwatch_event_target" "check_lambda_every_five_minutes" {
rule = var.five_minute_trigger.name
target_id = aws_lambda_function.viz_wrds_api_handler.function_name
arn = aws_lambda_function.viz_wrds_api_handler.arn
target_id = aws_lambda_function.viz_initialize_pipeline.function_name
arn = aws_lambda_function.viz_initialize_pipeline.arn
input = "{\"configuration\":\"rfc\"}"
}

resource "aws_lambda_permission" "allow_cloudwatch_to_call_check_lambda" {
Expand Down
Loading

0 comments on commit 1ce8290

Please sign in to comment.