Skip to content

Commit

Permalink
More time-efficient order in timeslice file creation
Browse files Browse the repository at this point in the history
  • Loading branch information
JurgenZach-NOAA committed Aug 8, 2024
1 parent 5865910 commit e08c8cd
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 22 deletions.
115 changes: 94 additions & 21 deletions src/troute-network/troute/nhd_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,63 @@ def _read_timeslice_file(f):
observation_quality = pd.DataFrame()
return timeslice_observations, observation_quality


def _read_timeslice_file_numpy(f):

with netCDF4.Dataset(
filename = f,
mode = 'r',
format = "NETCDF4"
) as ds:

#discharge = ds.variables['discharge'][:].data
discharge = ds.variables['discharge'][:].filled(fill_value = np.nan)
stns = ds.variables['stationId'][:].filled(fill_value = np.nan)
t = ds.variables['time'][:].filled(fill_value = np.nan)
qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan)

return discharge, stns, t, qual


def _read_timeslice_file(f):

with netCDF4.Dataset(
filename = f,
mode = 'r',
format = "NETCDF4"
) as ds:

discharge = ds.variables['discharge'][:].filled(fill_value = np.nan)
stns = ds.variables['stationId'][:].filled(fill_value = np.nan)
t = ds.variables['time'][:].filled(fill_value = np.nan)
qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan)

if discharge.size != 0 and stns.size != 0 and t.size != 0:
stationId = np.apply_along_axis(''.join, 1, stns.astype(str))
time_str = np.apply_along_axis(''.join, 1, t.astype(str))
stationId = np.char.strip(stationId)

timeslice_observations = (pd.DataFrame({
'stationId' : stationId,
'datetime' : time_str,
'discharge' : discharge
}).
set_index(['stationId', 'datetime']).
unstack(1, fill_value = np.nan)['discharge'])

observation_quality = (pd.DataFrame({
'stationId' : stationId,
'datetime' : time_str,
'quality' : qual/100
}).
set_index(['stationId', 'datetime']).
unstack(1, fill_value = np.nan)['quality'])
else:
timeslice_observations = pd.DataFrame()
observation_quality = pd.DataFrame()
return timeslice_observations, observation_quality


def _interpolate_one(df, interpolation_limit, frequency):

interp_out = (df.resample('min').
Expand All @@ -1098,6 +1155,7 @@ def _interpolate_one(df, interpolation_limit, frequency):
)
return interp_out


def get_obs_from_timeslices(
crosswalk_df,
crosswalk_gage_field,
Expand All @@ -1109,6 +1167,7 @@ def get_obs_from_timeslices(
t0,
cpu_pool,
):

"""
Read observations from TimeSlice files, interpolate available observations
and organize into a Pandas DataFrame
Expand Down Expand Up @@ -1153,31 +1212,44 @@ def get_obs_from_timeslices(
# - Parallelize this reading for speedup using netCDF4
# - Generecize the function to read USACE or USGS data
# - only return gages that are in the model domain (consider mask application)

# open TimeSlce files, organize data into dataframes
with Parallel(n_jobs=cpu_pool) as parallel:
jobs = []
for f in timeslice_files:
jobs.append(delayed(_read_timeslice_file)(f))
timeslice_dataframes = parallel(jobs)
jobs.append(delayed(_read_timeslice_file_numpy)(f))
result = parallel(jobs)
discharge = [out[0] for out in result]
stns = [out[1] for out in result]
t = [out[2] for out in result]
qual = [out[3] for out in result]

if len(discharge) != 0 and len(stns) != 0 and len(t) != 0:

stationIds = [np.apply_along_axis(''.join, 1, stn.astype(str)) for stn in stns]
stationIds = [np.char.strip(stn) for stn in stationIds]
time_strs = [np.apply_along_axis(''.join, 1, t1.astype(str)) for t1 in t]
timeAxis2 = [time_strs[i][0] for i in range(len(time_strs))]
timeAxis = [datetime.strptime(t,"%Y-%m-%d_%H:%M:%S") for t in timeAxis2]

dfList = [None]*len(timeAxis)
qList = [None]*len(timeAxis)

for i in range(len(stationIds)):
dfList[i]=(pd.DataFrame({'stationId': stationIds[i],'discharge':discharge[i]}).set_index(['stationId']))
qList[i]=(pd.DataFrame({'stationId': stationIds[i],'quality':qual[i]/100}).set_index(['stationId']))

# concatenate dataframes
timeslice_obs_df = pd.concat(dfList, axis = 1)
timeslice_qual_df = pd.concat(qList, axis = 1)

timeslice_obs_df.columns = timeAxis
timeslice_qual_df.columns = timeAxis

else:
timeslice_obs_df = pd.DataFrame()
timeslice_qual_df = pd.DataFrame()

all_empty = all(df.empty for tuple in timeslice_dataframes for df in tuple)
if all_empty:
LOG.debug(f'{crosswalk_gage_field} DataFrames is empty, check timeslice files.')
return pd.DataFrame()

# create lists of observations and obs quality dataframes returned
# from _read_timeslice_file
timeslice_obs_frames = []
timeslice_qual_frames = []
for d in timeslice_dataframes:
timeslice_obs_frames.append(d[0]) # TimeSlice gage observation
timeslice_qual_frames.append(d[1]) # TimeSlice observation qual

# concatenate dataframes
timeslice_obs_df = pd.concat(timeslice_obs_frames, axis = 1)
timeslice_qual_df = pd.concat(timeslice_qual_frames, axis = 1)

# Link <> gage crosswalk data
df = crosswalk_df.reset_index()
df[crosswalk_gage_field] = np.asarray(df[crosswalk_gage_field]).astype('<U15')
Expand Down Expand Up @@ -1209,10 +1281,11 @@ def get_obs_from_timeslices(

# ---- Interpolate USGS observations to the input frequency (frequency_secs)
observation_df_T = observation_df.transpose() # transpose, making time the index

observation_df_T.index = pd.to_datetime(
observation_df_T.index, format = "%Y-%m-%d_%H:%M:%S" # index variable as type datetime
)

# specify resampling frequency
frequency = str(int(frequency_secs/60))+"min"

Expand Down
2 changes: 1 addition & 1 deletion test/LowerColorado_TX/test_AnA.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ compute_parameters:
compute_kernel : V02-structured
assume_short_ts : True
subnetwork_target_size : 10000
cpu_pool : 36
cpu_pool : 1
restart_parameters:
#----------
start_datetime: 2021-08-23_13:00
Expand Down

0 comments on commit e08c8cd

Please sign in to comment.