More time-efficient order in timeslice file creation #823

Closed · wants to merge 3 commits
119 changes: 99 additions & 20 deletions src/troute-network/troute/nhd_io.py
@@ -1085,6 +1085,63 @@ def _read_timeslice_file(f):
        observation_quality = pd.DataFrame()
    return timeslice_observations, observation_quality


def _read_timeslice_file_numpy(f):
    # Read one TimeSlice file and return the raw numpy arrays;
    # DataFrame assembly is deferred to the caller so the per-file
    # work done in parallel stays cheap.
    with netCDF4.Dataset(
        filename = f,
        mode = 'r',
        format = "NETCDF4"
    ) as ds:

        discharge = ds.variables['discharge'][:].filled(fill_value = np.nan)
        stns = ds.variables['stationId'][:].filled(fill_value = np.nan)
        t = ds.variables['time'][:].filled(fill_value = np.nan)
        qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan)

    return discharge, stns, t, qual
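For context, netCDF4 returns masked arrays, and `.filled(fill_value = np.nan)` is what converts masked (fill-value) cells into NaN that pandas treats as missing. A minimal sketch with a synthetic masked array, independent of any TimeSlice file:

```python
import numpy as np
import numpy.ma as ma

# cells masked in the netCDF variable (e.g. _FillValue) become NaN
discharge = ma.masked_array([1.2, 3.4, 5.6], mask=[False, True, False])
print(discharge.filled(fill_value=np.nan))   # [1.2 nan 5.6]
```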


def _read_timeslice_file(f):

    with netCDF4.Dataset(
        filename = f,
        mode = 'r',
        format = "NETCDF4"
    ) as ds:

        discharge = ds.variables['discharge'][:].filled(fill_value = np.nan)
        stns = ds.variables['stationId'][:].filled(fill_value = np.nan)
        t = ds.variables['time'][:].filled(fill_value = np.nan)
        qual = ds.variables['discharge_quality'][:].filled(fill_value = np.nan)

    if discharge.size != 0 and stns.size != 0 and t.size != 0:
        stationId = np.apply_along_axis(''.join, 1, stns.astype(str))
        time_str = np.apply_along_axis(''.join, 1, t.astype(str))
        stationId = np.char.strip(stationId)

        timeslice_observations = (pd.DataFrame({
            'stationId' : stationId,
            'datetime'  : time_str,
            'discharge' : discharge
        }).
        set_index(['stationId', 'datetime']).
        unstack(1, fill_value = np.nan)['discharge'])

        observation_quality = (pd.DataFrame({
            'stationId' : stationId,
            'datetime'  : time_str,
            'quality'   : qual/100
        }).
        set_index(['stationId', 'datetime']).
        unstack(1, fill_value = np.nan)['quality'])
    else:
        timeslice_observations = pd.DataFrame()
        observation_quality = pd.DataFrame()
    return timeslice_observations, observation_quality
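`_read_timeslice_file` pivots the long station/time/discharge records into a wide station-by-time table via `set_index` plus `unstack`. A small sketch of that pivot, with hypothetical station IDs and timestamps:

```python
import numpy as np
import pandas as pd

obs = pd.DataFrame({
    'stationId': ['08158000', '08158000', '08176500'],
    'datetime' : ['2021-08-23_13:00:00', '2021-08-23_13:15:00', '2021-08-23_13:00:00'],
    'discharge': [10.1, 10.4, 7.9],
})

# long -> wide: one row per station, one column per timestamp
wide = (obs.set_index(['stationId', 'datetime'])
           .unstack(1, fill_value=np.nan)['discharge'])
print(wide)
```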


def _interpolate_one(df, interpolation_limit, frequency):

    interp_out = (df.resample('min').
@@ -1098,6 +1155,7 @@ def _interpolate_one(df, interpolation_limit, frequency):
    )
    return interp_out
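The body of `_interpolate_one` is partially collapsed in this view; the visible lines suggest a resample-then-interpolate pattern. A sketch of that pattern under that assumption, with `limit` standing in for `interpolation_limit`:

```python
import pandas as pd

s = pd.Series(
    [1.0, 4.0],
    index=pd.to_datetime(['2021-08-23 13:00', '2021-08-23 13:03']),
)
# upsample to 1-minute steps, then fill at most 2 consecutive missing values
print(s.resample('min').asfreq().interpolate(limit=2))
```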


def get_obs_from_timeslices(
    crosswalk_df,
    crosswalk_gage_field,
@@ -1109,6 +1167,7 @@ def get_obs_from_timeslices(
    t0,
    cpu_pool,
):

"""
Read observations from TimeSlice files, interpolate available observations
and organize into a Pandas DataFrame
Expand Down Expand Up @@ -1153,31 +1212,50 @@ def get_obs_from_timeslices(
    # - Parallelize this reading for speedup using netCDF4
    # - Genericize the function to read USACE or USGS data
    # - only return gages that are in the model domain (consider mask application)
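The parallel read below uses joblib's `Parallel`/`delayed`. A minimal sketch of that pattern, with a stand-in worker and hypothetical file names:

```python
from joblib import Parallel, delayed

def _read_one(f):            # stand-in for _read_timeslice_file_numpy
    return len(f)

files = ['slice_a.nc', 'slice_b.nc']   # hypothetical file names
with Parallel(n_jobs=2) as parallel:
    jobs = [delayed(_read_one)(f) for f in files]
    print(parallel(jobs))    # [10, 10]
```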

    # open TimeSlice files, organize data into dataframes
    with Parallel(n_jobs=cpu_pool) as parallel:
        jobs = []
        for f in timeslice_files:
            jobs.append(delayed(_read_timeslice_file_numpy)(f))
        result = parallel(jobs)

    # unpack the raw arrays returned for each TimeSlice file
    discharge = [out[0] for out in result]
    stns = [out[1] for out in result]
    t = [out[2] for out in result]
    qual = [out[3] for out in result]

    all_empty = all(len(s) == 0 for s in stns)

    if all_empty:
        LOG.debug(f'{crosswalk_gage_field} DataFrames are empty, check TimeSlice files.')
        return pd.DataFrame()

    # decode station IDs and timestamps from fixed-width character arrays
    stationIds = [np.apply_along_axis(''.join, 1, stn.astype(str)) for stn in stns]
    stationIds = [np.char.strip(stn) for stn in stationIds]
    time_strs = [np.apply_along_axis(''.join, 1, t1.astype(str)) for t1 in t]

    # each TimeSlice file carries a single timestamp; parse one per file
    timeAxis = [
        datetime.strptime(ts[0], "%Y-%m-%d_%H:%M:%S") for ts in time_strs
    ]

    # build one single-column frame per file, indexed by station
    dfList = [None] * len(timeAxis)
    qList = [None] * len(timeAxis)
    for i in range(len(stationIds)):
        dfList[i] = (pd.DataFrame({
            'stationId' : stationIds[i],
            'discharge' : discharge[i]
        }).set_index(['stationId']))
        qList[i] = (pd.DataFrame({
            'stationId' : stationIds[i],
            'quality'   : qual[i]/100
        }).set_index(['stationId']))

    # concatenate dataframes: stations align on the index,
    # one column per TimeSlice file
    timeslice_obs_df = pd.concat(dfList, axis = 1)
    timeslice_qual_df = pd.concat(qList, axis = 1)

    timeslice_obs_df.columns = timeAxis
    timeslice_qual_df.columns = timeAxis
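The new assembly relies on `pd.concat(..., axis = 1)` aligning the per-file frames on their station index (an outer join), after which the columns are relabeled with one timestamp per file. A toy sketch with hypothetical stations A, B, C:

```python
import pandas as pd
from datetime import datetime

# one single-column frame per (hypothetical) TimeSlice file, indexed by station
f1 = pd.DataFrame({'stationId': ['A', 'B'], 'discharge': [1.0, 2.0]}).set_index('stationId')
f2 = pd.DataFrame({'stationId': ['B', 'C'], 'discharge': [2.5, 3.0]}).set_index('stationId')

obs = pd.concat([f1, f2], axis=1)             # outer join on stationId
obs.columns = [datetime(2021, 8, 23, 13, 0),  # one timestamp per file
               datetime(2021, 8, 23, 13, 15)]
print(obs)   # stations missing from a file show NaN in that column
```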

    # Link <> gage crosswalk data
    df = crosswalk_df.reset_index()
    df[crosswalk_gage_field] = np.asarray(df[crosswalk_gage_field]).astype('<U15')
@@ -1209,10 +1287,11 @@ def get_obs_from_timeslices(

    # ---- Interpolate USGS observations to the input frequency (frequency_secs)
    observation_df_T = observation_df.transpose() # transpose, making time the index

    observation_df_T.index = pd.to_datetime(
        observation_df_T.index, format = "%Y-%m-%d_%H:%M:%S" # index variable as type datetime
    )

    # specify resampling frequency
    frequency = str(int(frequency_secs/60))+"min"
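For reference, the TimeSlice timestamp format uses an underscore between date and time, and the resampling frequency string is derived from `frequency_secs`; e.g. 300 seconds becomes "5min". A quick sketch:

```python
import pandas as pd

# parse the underscore-separated TimeSlice timestamp format
idx = pd.to_datetime(['2021-08-23_13:00:00'], format='%Y-%m-%d_%H:%M:%S')

frequency_secs = 300
frequency = str(int(frequency_secs / 60)) + "min"   # "5min"
print(idx, frequency)
```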

2 changes: 1 addition & 1 deletion test/LowerColorado_TX/test_AnA.yaml
@@ -31,7 +31,7 @@ compute_parameters:
    compute_kernel : V02-structured
    assume_short_ts : True
    subnetwork_target_size : 10000
    cpu_pool : 36
    cpu_pool : 1
    restart_parameters:
        #----------
        start_datetime: 2021-08-23_13:00
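Assuming the nesting shown in this hunk, the setting could be read back with PyYAML as below; `cpu_pool` is the value handed to joblib's `n_jobs` in `get_obs_from_timeslices`:

```python
import yaml  # PyYAML

with open('test/LowerColorado_TX/test_AnA.yaml') as f:
    cfg = yaml.safe_load(f)

# assumed nesting: cpu_pool under compute_parameters
print(cfg['compute_parameters']['cpu_pool'])   # 1 after this change
```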