From e08c8cd2caf2a4c498775d88efe0bb47d6f0f62f Mon Sep 17 00:00:00 2001 From: JurgenZach-NOAA Date: Thu, 8 Aug 2024 07:50:14 -0700 Subject: [PATCH] More time-efficient order in timeslice file creation --- src/troute-network/troute/nhd_io.py | 115 +++++++++++++++++++++++----- test/LowerColorado_TX/test_AnA.yaml | 2 +- 2 files changed, 95 insertions(+), 22 deletions(-) diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py index b948b379f..76fdcf46f 100644 --- a/src/troute-network/troute/nhd_io.py +++ b/src/troute-network/troute/nhd_io.py @@ -1085,6 +1085,63 @@ def _read_timeslice_file(f): observation_quality = pd.DataFrame() return timeslice_observations, observation_quality + +def _read_timeslice_file_numpy(f): + + with netCDF4.Dataset( + filename = f, + mode = 'r', + format = "NETCDF4" + ) as ds: + + #discharge = ds.variables['discharge'][:].data + discharge = ds.variables['discharge'][:].filled(fill_value = np.nan) + stns = ds.variables['stationId'][:].filled(fill_value = np.nan) + t = ds.variables['time'][:].filled(fill_value = np.nan) + qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan) + + return discharge, stns, t, qual + + +def _read_timeslice_file(f): + + with netCDF4.Dataset( + filename = f, + mode = 'r', + format = "NETCDF4" + ) as ds: + + discharge = ds.variables['discharge'][:].filled(fill_value = np.nan) + stns = ds.variables['stationId'][:].filled(fill_value = np.nan) + t = ds.variables['time'][:].filled(fill_value = np.nan) + qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan) + + if discharge.size != 0 and stns.size != 0 and t.size != 0: + stationId = np.apply_along_axis(''.join, 1, stns.astype(str)) + time_str = np.apply_along_axis(''.join, 1, t.astype(str)) + stationId = np.char.strip(stationId) + + timeslice_observations = (pd.DataFrame({ + 'stationId' : stationId, + 'datetime' : time_str, + 'discharge' : discharge + }). + set_index(['stationId', 'datetime']). + unstack(1, fill_value = np.nan)['discharge']) + + observation_quality = (pd.DataFrame({ + 'stationId' : stationId, + 'datetime' : time_str, + 'quality' : qual/100 + }). + set_index(['stationId', 'datetime']). + unstack(1, fill_value = np.nan)['quality']) + else: + timeslice_observations = pd.DataFrame() + observation_quality = pd.DataFrame() + return timeslice_observations, observation_quality + + def _interpolate_one(df, interpolation_limit, frequency): interp_out = (df.resample('min'). @@ -1098,6 +1155,7 @@ def _interpolate_one(df, interpolation_limit, frequency): ) return interp_out + def get_obs_from_timeslices( crosswalk_df, crosswalk_gage_field, @@ -1109,6 +1167,7 @@ def get_obs_from_timeslices( t0, cpu_pool, ): + """ Read observations from TimeSlice files, interpolate available observations and organize into a Pandas DataFrame @@ -1153,31 +1212,44 @@ def get_obs_from_timeslices( # - Parallelize this reading for speedup using netCDF4 # - Generecize the function to read USACE or USGS data # - only return gages that are in the model domain (consider mask application) - + # open TimeSlce files, organize data into dataframes with Parallel(n_jobs=cpu_pool) as parallel: jobs = [] for f in timeslice_files: - jobs.append(delayed(_read_timeslice_file)(f)) - timeslice_dataframes = parallel(jobs) + jobs.append(delayed(_read_timeslice_file_numpy)(f)) + result = parallel(jobs) + discharge = [out[0] for out in result] + stns = [out[1] for out in result] + t = [out[2] for out in result] + qual = [out[3] for out in result] + + if len(discharge) != 0 and len(stns) != 0 and len(t) != 0: + + stationIds = [np.apply_along_axis(''.join, 1, stn.astype(str)) for stn in stns] + stationIds = [np.char.strip(stn) for stn in stationIds] + time_strs = [np.apply_along_axis(''.join, 1, t1.astype(str)) for t1 in t] + timeAxis2 = [time_strs[i][0] for i in range(len(time_strs))] + timeAxis = [datetime.strptime(t,"%Y-%m-%d_%H:%M:%S") for t in timeAxis2] + + dfList = [None]*len(timeAxis) + qList = [None]*len(timeAxis) + + for i in range(len(stationIds)): + dfList[i]=(pd.DataFrame({'stationId': stationIds[i],'discharge':discharge[i]}).set_index(['stationId'])) + qList[i]=(pd.DataFrame({'stationId': stationIds[i],'quality':qual[i]/100}).set_index(['stationId'])) + + # concatenate dataframes + timeslice_obs_df = pd.concat(dfList, axis = 1) + timeslice_qual_df = pd.concat(qList, axis = 1) + + timeslice_obs_df.columns = timeAxis + timeslice_qual_df.columns = timeAxis + + else: + timeslice_obs_df = pd.DataFrame() + timeslice_qual_df = pd.DataFrame() - all_empty = all(df.empty for tuple in timeslice_dataframes for df in tuple) - if all_empty: - LOG.debug(f'{crosswalk_gage_field} DataFrames is empty, check timeslice files.') - return pd.DataFrame() - - # create lists of observations and obs quality dataframes returned - # from _read_timeslice_file - timeslice_obs_frames = [] - timeslice_qual_frames = [] - for d in timeslice_dataframes: - timeslice_obs_frames.append(d[0]) # TimeSlice gage observation - timeslice_qual_frames.append(d[1]) # TimeSlice observation qual - - # concatenate dataframes - timeslice_obs_df = pd.concat(timeslice_obs_frames, axis = 1) - timeslice_qual_df = pd.concat(timeslice_qual_frames, axis = 1) - # Link <> gage crosswalk data df = crosswalk_df.reset_index() df[crosswalk_gage_field] = np.asarray(df[crosswalk_gage_field]).astype('