From e08c8cd2caf2a4c498775d88efe0bb47d6f0f62f Mon Sep 17 00:00:00 2001 From: JurgenZach-NOAA Date: Thu, 8 Aug 2024 07:50:14 -0700 Subject: [PATCH 1/3] More time-efficient order in timeslice file creation --- src/troute-network/troute/nhd_io.py | 115 +++++++++++++++++++++++----- test/LowerColorado_TX/test_AnA.yaml | 2 +- 2 files changed, 95 insertions(+), 22 deletions(-) diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py index b948b379f..76fdcf46f 100644 --- a/src/troute-network/troute/nhd_io.py +++ b/src/troute-network/troute/nhd_io.py @@ -1085,6 +1085,63 @@ def _read_timeslice_file(f): observation_quality = pd.DataFrame() return timeslice_observations, observation_quality + +def _read_timeslice_file_numpy(f): + + with netCDF4.Dataset( + filename = f, + mode = 'r', + format = "NETCDF4" + ) as ds: + + #discharge = ds.variables['discharge'][:].data + discharge = ds.variables['discharge'][:].filled(fill_value = np.nan) + stns = ds.variables['stationId'][:].filled(fill_value = np.nan) + t = ds.variables['time'][:].filled(fill_value = np.nan) + qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan) + + return discharge, stns, t, qual + + +def _read_timeslice_file(f): + + with netCDF4.Dataset( + filename = f, + mode = 'r', + format = "NETCDF4" + ) as ds: + + discharge = ds.variables['discharge'][:].filled(fill_value = np.nan) + stns = ds.variables['stationId'][:].filled(fill_value = np.nan) + t = ds.variables['time'][:].filled(fill_value = np.nan) + qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan) + + if discharge.size != 0 and stns.size != 0 and t.size != 0: + stationId = np.apply_along_axis(''.join, 1, stns.astype(str)) + time_str = np.apply_along_axis(''.join, 1, t.astype(str)) + stationId = np.char.strip(stationId) + + timeslice_observations = (pd.DataFrame({ + 'stationId' : stationId, + 'datetime' : time_str, + 'discharge' : discharge + }). + set_index(['stationId', 'datetime']). + unstack(1, fill_value = np.nan)['discharge']) + + observation_quality = (pd.DataFrame({ + 'stationId' : stationId, + 'datetime' : time_str, + 'quality' : qual/100 + }). + set_index(['stationId', 'datetime']). + unstack(1, fill_value = np.nan)['quality']) + else: + timeslice_observations = pd.DataFrame() + observation_quality = pd.DataFrame() + return timeslice_observations, observation_quality + + def _interpolate_one(df, interpolation_limit, frequency): interp_out = (df.resample('min'). @@ -1098,6 +1155,7 @@ def _interpolate_one(df, interpolation_limit, frequency): ) return interp_out + def get_obs_from_timeslices( crosswalk_df, crosswalk_gage_field, @@ -1109,6 +1167,7 @@ def get_obs_from_timeslices( t0, cpu_pool, ): + """ Read observations from TimeSlice files, interpolate available observations and organize into a Pandas DataFrame @@ -1153,31 +1212,44 @@ def get_obs_from_timeslices( # - Parallelize this reading for speedup using netCDF4 # - Generecize the function to read USACE or USGS data # - only return gages that are in the model domain (consider mask application) - + # open TimeSlce files, organize data into dataframes with Parallel(n_jobs=cpu_pool) as parallel: jobs = [] for f in timeslice_files: - jobs.append(delayed(_read_timeslice_file)(f)) - timeslice_dataframes = parallel(jobs) + jobs.append(delayed(_read_timeslice_file_numpy)(f)) + result = parallel(jobs) + discharge = [out[0] for out in result] + stns = [out[1] for out in result] + t = [out[2] for out in result] + qual = [out[3] for out in result] + + if len(discharge) != 0 and len(stns) != 0 and len(t) != 0: + + stationIds = [np.apply_along_axis(''.join, 1, stn.astype(str)) for stn in stns] + stationIds = [np.char.strip(stn) for stn in stationIds] + time_strs = [np.apply_along_axis(''.join, 1, t1.astype(str)) for t1 in t] + timeAxis2 = [time_strs[i][0] for i in range(len(time_strs))] + timeAxis = [datetime.strptime(t,"%Y-%m-%d_%H:%M:%S") for t in timeAxis2] + + dfList = [None]*len(timeAxis) + qList = [None]*len(timeAxis) + + for i in range(len(stationIds)): + dfList[i]=(pd.DataFrame({'stationId': stationIds[i],'discharge':discharge[i]}).set_index(['stationId'])) + qList[i]=(pd.DataFrame({'stationId': stationIds[i],'quality':qual[i]/100}).set_index(['stationId'])) + + # concatenate dataframes + timeslice_obs_df = pd.concat(dfList, axis = 1) + timeslice_qual_df = pd.concat(qList, axis = 1) + + timeslice_obs_df.columns = timeAxis + timeslice_qual_df.columns = timeAxis + + else: + timeslice_obs_df = pd.DataFrame() + timeslice_qual_df = pd.DataFrame() - all_empty = all(df.empty for tuple in timeslice_dataframes for df in tuple) - if all_empty: - LOG.debug(f'{crosswalk_gage_field} DataFrames is empty, check timeslice files.') - return pd.DataFrame() - - # create lists of observations and obs quality dataframes returned - # from _read_timeslice_file - timeslice_obs_frames = [] - timeslice_qual_frames = [] - for d in timeslice_dataframes: - timeslice_obs_frames.append(d[0]) # TimeSlice gage observation - timeslice_qual_frames.append(d[1]) # TimeSlice observation qual - - # concatenate dataframes - timeslice_obs_df = pd.concat(timeslice_obs_frames, axis = 1) - timeslice_qual_df = pd.concat(timeslice_qual_frames, axis = 1) - # Link <> gage crosswalk data df = crosswalk_df.reset_index() df[crosswalk_gage_field] = np.asarray(df[crosswalk_gage_field]).astype(' Date: Thu, 8 Aug 2024 10:17:22 -0700 Subject: [PATCH 2/3] Changed cpu_pool back to default --- test/LowerColorado_TX/test_AnA.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/LowerColorado_TX/test_AnA.yaml b/test/LowerColorado_TX/test_AnA.yaml index 5d025002d..7a6d5d069 100644 --- a/test/LowerColorado_TX/test_AnA.yaml +++ b/test/LowerColorado_TX/test_AnA.yaml @@ -31,7 +31,7 @@ compute_parameters: compute_kernel : V02-structured assume_short_ts : True subnetwork_target_size : 10000 - cpu_pool : 1 + cpu_pool : 36 restart_parameters: #---------- start_datetime: 2021-08-23_13:00 From 4d0af2b637acbe6a5aeccc69f12f0bae26cfa3f0 Mon Sep 17 00:00:00 2001 From: JurgenZach-NOAA Date: Mon, 12 Aug 2024 07:44:31 -0700 Subject: [PATCH 3/3] Fixed situation with empty dataframes --- src/troute-network/troute/nhd_io.py | 8 +++++++- test/LowerColorado_TX/test_AnA.yaml | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py index 76fdcf46f..736b6c024 100644 --- a/src/troute-network/troute/nhd_io.py +++ b/src/troute-network/troute/nhd_io.py @@ -1224,7 +1224,13 @@ def get_obs_from_timeslices( t = [out[2] for out in result] qual = [out[3] for out in result] - if len(discharge) != 0 and len(stns) != 0 and len(t) != 0: + all_empty = all(len(s)==0 for s in stns) + + if (all_empty): + LOG.debug(f'{crosswalk_gage_field} DataFrames is empty, check timeslice files.') + return pd.DataFrame() + + if (not all_empty): stationIds = [np.apply_along_axis(''.join, 1, stn.astype(str)) for stn in stns] stationIds = [np.char.strip(stn) for stn in stationIds] diff --git a/test/LowerColorado_TX/test_AnA.yaml b/test/LowerColorado_TX/test_AnA.yaml index 7a6d5d069..5d025002d 100644 --- a/test/LowerColorado_TX/test_AnA.yaml +++ b/test/LowerColorado_TX/test_AnA.yaml @@ -31,7 +31,7 @@ compute_parameters: compute_kernel : V02-structured assume_short_ts : True subnetwork_target_size : 10000 - cpu_pool : 36 + cpu_pool : 1 restart_parameters: #---------- start_datetime: 2021-08-23_13:00