
Merge branch 'master' into add_config_notes
shorvath-noaa authored Sep 10, 2024
2 parents c17f237 + 470c767 commit 23cf241
Showing 3 changed files with 57 additions and 26 deletions.
4 changes: 0 additions & 4 deletions readme.md
@@ -92,10 +92,6 @@ git clone --progress --single-branch --branch master http://github.com/NOAA-OWP/
# compile and install
./compiler.sh

-# In the event that compilation results does not complete and throws a Cython compile error, rerun with a non-editable flag:
-./compiler.sh no-e
-# [ this is a known issue, and may happen in fresh t-route clones; please see https://github.com/NOAA-OWP/t-route/issues/675 for details ]
-
# execute a demonstration test with NHD network
cd test/LowerColorado_TX
python3 -m nwm_routing -f -V4 test_AnA_V4_NHD.yaml
17 changes: 16 additions & 1 deletion src/troute-config/troute/config/output_parameters.py
@@ -140,7 +140,7 @@ class StreamOutput(BaseModel):
simulation. If t-route is run with default dt (300 seconds/5 minutes) for 24 hours, the defaults here would produce 24 output files
(1 per hour of simulation), each containing 12 values for each variable (1 value every 5 minutes in the hour of simulation).
"""
-    stream_output_directory: Optional[DirectoryPath] = None
+    stream_output_directory: Optional[Path] = None
"""
Directory to save flowveldepth outputs. If this is not None, this form of output will be written.
"""
@@ -166,6 +166,21 @@ class StreamOutput(BaseModel):
is in minutes). So if dt=300(sec), this value cannot be smaller than 5(min) and should be a multiple of 5.
"""

+    @validator('stream_output_directory')
+    def validate_stream_output_directory(cls, value):
+        if value is None:
+            return None
+
+        # expand ~/output/dir -> /home/user/output/dir
+        value = value.expanduser()
+
+        if value.exists() and not value.is_dir():
+            raise ValueError(f"'stream_output_directory'={value!s} is a file, expected directory.")
+
+        # make directory (and intermediates) if they don't exist
+        value.mkdir(parents=True, exist_ok=True)
+        return value

    @validator('stream_output_internal_frequency')
    def validate_stream_output_internal_frequency(cls, value, values):
        if value is not None:
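A note on the type change above: pydantic's DirectoryPath rejects any path that does not already exist, so the annotation is relaxed to plain Path and existence is handled by the new validator, which expands ~, rejects regular files, and creates missing directories on demand. Below is a minimal, self-contained sketch of both validators on a hypothetical DemoOutput model (assuming pydantic v1, as the @validator decorator implies). The real stream_output_internal_frequency validator is truncated above, so its body here only illustrates the rule the docstring states, with DT_SECONDS as an assumed stand-in for the configured routing timestep.

from pathlib import Path
from typing import Optional

from pydantic import BaseModel, validator

DT_SECONDS = 300  # assumed routing timestep; the real value comes from the t-route config

class DemoOutput(BaseModel):  # hypothetical stand-in for StreamOutput
    stream_output_directory: Optional[Path] = None
    stream_output_internal_frequency: Optional[int] = 60

    @validator('stream_output_directory')
    def _ensure_directory(cls, value):
        if value is None:
            return None
        value = value.expanduser()                 # '~/out' -> '/home/user/out'
        if value.exists() and not value.is_dir():
            raise ValueError(f"{value} is a file, expected a directory.")
        value.mkdir(parents=True, exist_ok=True)   # create missing parents too
        return value

    @validator('stream_output_internal_frequency')
    def _check_frequency(cls, value):
        # docstring rule: minutes, no smaller than dt/60, and a multiple of dt/60
        dt_minutes = DT_SECONDS / 60
        if value is not None and (value < dt_minutes or value % dt_minutes != 0):
            raise ValueError(f"value must be a multiple of {dt_minutes:g} minutes")
        return value

# Unlike DirectoryPath, this accepts a directory that does not exist yet:
cfg = DemoOutput(stream_output_directory='~/demo_stream_output')
print(cfg.stream_output_directory)  # expanded path; the directory now exists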
62 changes: 41 additions & 21 deletions src/troute-network/troute/nhd_io.py
@@ -2289,31 +2289,51 @@ def create_mask(ids):

merge_flowveldepth_reset = merge_flowveldepth_reset.dropna(subset=['nex'])
merge_flowveldepth_reset['nex'] = merge_flowveldepth_reset['nex'].astype(int)
-# Define custom aggregation functions
-def sum_q_columns(group):
-    q_columns = [col for col in group.columns if col[1] == 'q']
-    return group[q_columns].sum()
-
-def custom_v(group):
-    v_columns = [col for col in group.columns if col[1] == 'v']
-    v_values = group[v_columns]
-    if len(v_values) > 1:
-        return pd.Series({col: np.nan for col in v_values.columns})
-    else:
-        return v_values.iloc[0]

+def optimized_v_data(merge_flowveldepth_reset, v_columns):
+    # Ensure v_columns is a list
+    if isinstance(v_columns, tuple):
+        v_columns = [v_columns]
+
+    # Calculate the size of each group by 'nex' to find the duplicates
+    group_sizes = merge_flowveldepth_reset.groupby('nex').size()
+
+    unique_nex = group_sizes[group_sizes == 1].index
+    non_unique_nex = group_sizes[group_sizes > 1].index
+    filtered_unique_df = merge_flowveldepth_reset[merge_flowveldepth_reset['nex'].isin(unique_nex)]
+    filtered_non_unique_df = merge_flowveldepth_reset[merge_flowveldepth_reset['nex'].isin(non_unique_nex)]
+    filtered_non_unique_df.loc[:, v_columns] = np.nan
+    filtered_non_unique_df = filtered_non_unique_df.drop_duplicates(subset=['nex'])
+
+    combined_df = pd.concat([filtered_unique_df, filtered_non_unique_df])
+    v_data = combined_df.set_index('nex')[v_columns]
+
+    return v_data


+def optimized_agg(merge_flowveldepth_reset):
+    # Identify columns for each operation
+    q_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'q']
+    v_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'v']
+    d_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'd']
+
+    # Sum of flowrate columns
+    sum_q = merge_flowveldepth_reset.groupby('nex')[q_columns].sum()
+
+    # Handle velocity: if two segments flow into one nexus, report NaN
+    v_data = optimized_v_data(merge_flowveldepth_reset, v_columns)
+
+    # Average depth columns
+    avg_d = merge_flowveldepth_reset.groupby('nex')[d_columns].mean()
+
-def avg_d_columns(group):
-    d_columns = [col for col in group.columns if col[1] == 'd']
-    return group[d_columns].mean()
+    # Combine all results into a single DataFrame
+    all_nex_data = pd.concat([sum_q, v_data, avg_d], axis=1)
+
-# Apply the groupby with the custom aggregation functions
-def custom_agg(group):
-    result = pd.concat([sum_q_columns(group), custom_v(group), avg_d_columns(group)])
-    return result
+    return all_nex_data
+
-all_nex_data = merge_flowveldepth_reset.groupby('nex').apply(custom_agg)
+all_nex_data = optimized_agg(merge_flowveldepth_reset)
all_nex_data.index = all_nex_data.index.rename('featureID')
all_nex_data['Type'] = 'nex'
# Set the new 'Type' column as an index
all_nex_data = all_nex_data.set_index('Type', append=True)

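For context on the rewrite above: the old code called groupby('nex').apply(custom_agg), which invokes a Python aggregation function once per nexus, while the new optimized_agg computes each statistic in a single vectorized pass. The aggregation semantics are unchanged: flow (q) columns are summed across the segments feeding a nexus, depth (d) columns are averaged, and velocity (v) is set to NaN whenever more than one segment feeds the same nexus, since no single representative velocity exists there. A toy illustration of those semantics with hypothetical numbers (flat columns for readability; the real frame uses (timestep, variable) column tuples):

import pandas as pd

seg = pd.DataFrame({'nex': [10, 10, 11],
                    'q': [1.0, 2.0, 5.0],
                    'v': [0.5, 0.7, 0.9],
                    'd': [1.0, 3.0, 2.0]})

grouped = seg.groupby('nex')
sizes = grouped['q'].size()                 # segments feeding each nexus
q = grouped['q'].sum()                      # flows add at a junction
d = grouped['d'].mean()                     # depths are averaged
v = grouped['v'].first().where(sizes == 1)  # NaN where >1 upstream segment

print(pd.concat([q, v, d], axis=1))
#        q    v    d
# nex
# 10   3.0  NaN  2.0
# 11   5.0  0.9  2.0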
