Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Second round of updates #85

Merged
merged 8 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions glhe/aggregation/base_agg.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
import os
from abc import ABC, abstractmethod
from pathlib import Path

import numpy as np

from glhe.utilities.functions import Interpolator1D, Interpolator1DFromFile
from glhe.utilities.functions import Interpolator2DFromFile

join = os.path.join
norm = os.path.normpath
cwd = os.getcwd()
from glhe.utilities.functions import InterpolatorBase, Interpolator1D, Interpolator1DFromFile, Interpolator2DFromFile


class BaseAgg(ABC):

def __init__(self, inputs: dict):
# g-function values
if 'g-function-path' in inputs:
path_g = norm(join(cwd, inputs['g-function-path']))
self.interp_g = Interpolator1DFromFile(path_g)
p = Path(inputs['g-function-path'])
if not p.is_absolute():
p = Path.cwd() / inputs['g-function-path']
self.interp_g = Interpolator1DFromFile(p)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is in the code pretty much anywhere we are reading a path from the input dict. I wonder if there is a better way to do it.

And after taking a quick look, it looks like the Path's resolve() method will handle this. If it's relative, it will apply cwd() to make it absolute. And if it's absolute it will leave it. And it will walk through links and stuff. I'll fix that up.

TIL.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I try to resolve() any pathlib object at instantiation, and then it all just works. This was all developed before I was a pathlib convert. 🙏

elif 'lntts' and 'g-values' in inputs:
data_g = np.transpose(np.array([inputs['lntts'], inputs['g-values']]))
self.interp_g = Interpolator1D(data_g[:, 0], data_g[:, 1])
self.interp_g: InterpolatorBase = Interpolator1D(data_g[:, 0], data_g[:, 1])
else:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mimicking some polymorphism here by trying to convince the IDE to treat this as a base class. No functional effect for Python itself...just trying to help with autocompletion.

raise KeyError('g-function data not found.')

# g_b-function values
self.interp_g_b = None
if 'g_b-function-path' in inputs:
p = Path(inputs['g_b-function-path'])
if not p.is_absolute():
p = Path.cwd() / inputs['g_b-function-path']
if 'g_b-flow-rates' in inputs:
self.interp_g_b = Interpolator2DFromFile(inputs['g_b-function-path'], inputs['g_b-flow-rates'])
self.interp_g_b: InterpolatorBase = Interpolator2DFromFile(p, inputs['g_b-flow-rates'])
else:
self.interp_g_b = Interpolator1DFromFile(inputs['g_b-function-path'])
self.interp_g_b: InterpolatorBase = Interpolator1DFromFile(p)
elif 'lntts_b' and 'g_b-values' in inputs:
data_g_b = np.transpose(np.array([inputs['lntts_b'], inputs['g_b-values']]))
self.interp_g_b = Interpolator1D(data_g_b[:, 0], data_g_b[:, 1])
Expand Down
28 changes: 10 additions & 18 deletions glhe/aggregation/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ def __init__(self, inputs: dict):
self.sub_hr = SubHour(inputs)

# set expansion rate. apply default if needed.
try:
self.exp_rate = 1.62
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor change, which I think should get rid of a few pragma: no covers

if 'expansion-rate' in inputs:
self.exp_rate = inputs['expansion-rate']
except KeyError: # pragma: no cover
self.exp_rate = 1.62 # pragma: no cover

# set the number of bins per level. apply default if needed.
try:
self.bins_per_level = 9
if 'number-bins-per-level' in inputs:
self.bins_per_level = inputs['number-bins-per-level']
except KeyError: # pragma: no cover
self.bins_per_level = 9 # pragma: no cover

# total simulation runtime to make available for method
run_time = inputs['runtime']
Expand All @@ -47,17 +45,13 @@ def __init__(self, inputs: dict):
t = SEC_IN_HOUR

# initialize the dynamic method
initialized = False
while not initialized:
while True:
for _ in range(self.bins_per_level):
t += dt
self.energy = np.insert(self.energy, 0, 0)
self.dts = np.insert(self.dts, 0, dt)

if t >= run_time:
initialized = True
break

return
dt *= self.exp_rate

def aggregate(self, time: int, energy: float):
Expand Down Expand Up @@ -111,22 +105,20 @@ def calc_temporal_superposition(self, time_step: int, flow_rate: float = None) -
if not flow_rate:
g_b = self.interp_g_b.interpolate(lntts)
else:
g_b = np.flipud(self.interp_g_b(lntts, flow_rate))
# TODO: This is not covered by tests, and may need adjusting the interpolator class to work properly
g_b = np.flipud(self.interp_g_b.interpolate(lntts, flow_rate))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still leaving this for now. I think the best thing would be to work up a unit test that exercises it, see it fail, and then fix it :)

return float(np.dot(dq, g)), float(np.dot(dq, g_b))
else:
# convolution for "g" g-functions only
return float(np.dot(dq, g))

def get_g_value(self, time_step: int) -> float:
lntts = np.log(time_step / self.ts)
return float(self.interp_g.interpolate(lntts))
return self.interp_g.interpolate(lntts)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, interpolate is returning a scalar float, so the float cast isn't needed. However, depending on what happens with the interp_g_b stuff above, the interpolate method might need to be more flexible and return an array. So we'll see!


def get_g_b_value(self, time_step: int, flow_rate: float = None) -> float:
lntts = np.log(time_step / self.ts)
if not flow_rate:
return float(self.interp_g_b.interpolate(lntts))
else:
return float(self.interp_g_b.interpolate(lntts, flow_rate))
return self.interp_g_b.interpolate(lntts, flow_rate)

def get_q_prev(self) -> float:
return float(self.sub_hr.energy[-1] / self.sub_hr.dts[-1])
1 change: 1 addition & 0 deletions glhe/aggregation/no_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def calc_temporal_superposition(self, time_step: int, _: float = None) -> float:
# convolution for "g" g-functions only
return float(np.dot(dq, g))

# TODO: Should these abstract methods be populated to do anything? If not, should they actually be abstract methods?
def get_g_value(self, time_step: int) -> float:
pass # pragma: no cover

Expand Down
3 changes: 2 additions & 1 deletion glhe/aggregation/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ def aggregate(self, time: int, energy: float):
# store whatever rolls off of the sub-hourly method

# need to think about this
# TODO: If this is returning early, what about the prev_update_time assignments below?
self.energy[-1] += e_1
return
else:
# aggregate
# TODO: The next two lines are showing as unused in Pycharm, is this IF block needed then?
# dts_flip = np.flipud(self.dts)
# vals, idxs, cnts = np.unique(dts_flip, return_counts=True, return_index=True)
# vals, indexes, counts = np.unique(dts_flip, return_counts=True, return_index=True)
self.energy[-1] += e_1
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This little block of code could use a bit more quality time. I had already commented out the two unused lines, but now I'm not sure if we should collapse it down further.


# numpy split will do a lot of work too
Expand Down
4 changes: 2 additions & 2 deletions glhe/ground_temps/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ class BaseGroundTemp(ABC):
"""

@abstractmethod
def get_temp(self, time: int, depth: float):
def get_temp(self, time: int, depth: float) -> float:
"""
Getter method for ground temperatures

:param time: time for ground temperature [s]
:param time: time for ground temperature [s] # TODO: Should we use float instead of int for time?
:param depth: depth for ground temperature [m]
:return: ground temperature [C]
"""
Expand Down
10 changes: 6 additions & 4 deletions glhe/input_processor/input_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from json import loads
import os
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like another spot to get rid of os. It's just being used to join paths in one function below.

from pathlib import Path

from jsonschema import SchemaError, ValidationError, validate

Expand All @@ -8,7 +10,7 @@

class InputProcessor:

def __init__(self, json_input_path: str):
def __init__(self, json_input_path: Path):
"""
Initialize the input processor, process input file, and store the input information.

Expand All @@ -18,11 +20,11 @@ def __init__(self, json_input_path: str):
"""

# check if file exists
if not os.path.exists(json_input_path):
raise FileNotFoundError("Input file: '{}' does not exist.".format(json_input_path))
if not json_input_path.exists():
raise FileNotFoundError(f"Input file: '{json_input_path}' does not exist.")

# load the input file
self.input_dict = lower_obj(load_json(json_input_path))
self.input_dict: dict = lower_obj(loads(json_input_path.read_text()))

# validate the inputs
self.validate_inputs(self.input_dict)
Expand Down
48 changes: 28 additions & 20 deletions glhe/output_processor/output_processor.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
import datetime as dt
import os
from os.path import join, normpath

import pandas as pd
from pathlib import Path


class OutputProcessor:

def __init__(self, output_dir: str, output_name: str):
def __init__(self, output_dir: Path, output_name: str):
"""
Output processor manages output data
"""

self.output_dir = output_dir
self.output_file = output_name
self.write_path = normpath(join(output_dir, output_name))
self.df = pd.DataFrame()
self.write_path = output_dir / output_name
# self.df = pd.DataFrame()
self.output_data = []
self.idx_count = 0

def collect_output(self, data_dict: dict) -> None:
Expand All @@ -24,30 +22,40 @@ def collect_output(self, data_dict: dict) -> None:

:param data_dict: dictionary of data to be logged
"""

df_temp = pd.DataFrame(data_dict, index=[self.idx_count])
self.df = pd.concat([self.df, df_temp], axis=0, sort=True)
self.idx_count += 1
self.output_data.append(data_dict)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output processor could use some love. It needs a round of efficiency upgrading, but it does seem to be working happily without pandas, which is the right step.

# df_temp = pd.DataFrame(data_dict, index=[self.idx_count])
# self.df = pd.concat([self.df, df_temp], axis=0, sort=True)
# self.idx_count += 1

def write_to_file(self) -> None:
"""
Write the DataFrame holding the simulation data to a file.
"""
if os.path.exists(self.write_path):
os.remove(self.write_path)
if self.write_path.exists():
self.write_path.unlink()

header_row = ['Date/Time']
header_row.extend([key for key in sorted(self.output_data[0].keys()) if key != 'Elapsed Time [s]'])

self.convert_time_to_timestamp()
self.df.to_csv(self.write_path)
time_stamps = self.convert_time_to_timestamp()
with self.write_path.open('w') as f:
for i, d in enumerate(self.output_data):
if i == 0:
f.write(','.join(header_row) + '\n')
row = [time_stamps[i]]
sorted_values_without_time = [str(d[key]) for key in sorted(d.keys()) if key != 'Elapsed Time [s]']
row.extend(sorted_values_without_time)
f.write(','.join(row) + '\n')

def convert_time_to_timestamp(self) -> None:
def convert_time_to_timestamp(self) -> list[str]:
"""
Convert the 'Elapsed Time' column to a standardized date/time format.
"""
try:
dts = [dt.timedelta(seconds=x) for x in self.df['Elapsed Time [s]'].values.tolist()]
raw_dts = [d['Elapsed Time [s]'] for d in self.output_data]
dts = [dt.timedelta(seconds=x) for x in raw_dts]
start_time = dt.datetime(year=dt.datetime.now().year, month=1, day=1, hour=0, minute=0)
time_stamps = [start_time + x for x in dts]
self.df['Date/Time'] = time_stamps
self.df.set_index('Date/Time', inplace=True)
time_stamps = [str(start_time + x) for x in dts]
return time_stamps
except KeyError:
pass
73 changes: 59 additions & 14 deletions glhe/profiles/external_base.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,69 @@
import pandas as pd
from scipy.interpolate import interp1d
from pathlib import Path

import csv
from datetime import datetime
from glhe.utilities.functions import Interpolator1D

class ExternalBase(object):

def __init__(self, path, col_num):

df = pd.read_csv(path, index_col=0, parse_dates=True)
df['delta t'] = df.index.to_series().diff().dt.total_seconds()
df['delta t'].iat[0] = 0
x_range = df['delta t'].cumsum().tolist()
y_range = df.iloc[:, col_num].tolist()
class ExternalBase(object):

def __init__(self, path: Path, col_num: int):
x_range, y_range = self.read_csv_and_compute_delta_t(path, col_num)
# added to allow multi-year simulations
self.max_time = 0
x_range.append(x_range[-1] + (x_range[-1] - x_range[-2]))
y_range.append(y_range[0])
self.max_time = x_range[-1]
self._interp_values = Interpolator1D(x_range, y_range)

@staticmethod
def read_csv_and_compute_delta_t(file_path: Path, col_num: int) -> [list, list]:
delta_t_values = []
y_values = []
last_timestamp = None

# Open CSV file and read data
with open(file_path, 'r') as csvfile:
csvreader = csv.reader(csvfile)
next(csvreader) # Read header

for row in csvreader:
timestamp_str = row[0]

# Convert timestamp string to datetime object
current_timestamp = None
time_stamp_formats = ['%Y-%m-%d %H:%M:%S', '%m/%d/%Y %H:%M', '%m/%d/%Y %H:%M:%S']
for t in time_stamp_formats:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pandas was happy to try some different time stamp formats it seems, because the actual run output was different from the unit test, and they both passed. I just had to add a few options here. I think it would be sufficient to modify the unit test to match the run output, and then we just have our specific format. But this is also fine.

try:
current_timestamp = datetime.strptime(timestamp_str, t)
break
except ValueError:
continue
if not current_timestamp:
raise ValueError(f"Bad timestamp format for timestamp: {timestamp_str}")

if last_timestamp is not None:
# Calculate delta t in seconds
delta_t = (current_timestamp - last_timestamp).total_seconds()
delta_t_values.append(delta_t)

# Append value to y_values
y_values.append(row[col_num + 1])

last_timestamp = current_timestamp

# Initialize x_range with cumulative sum of delta_t_values
x_range = [0]
cumulative_sum = 0
for delta_t in delta_t_values:
cumulative_sum += delta_t
x_range.append(cumulative_sum)

# Append an extrapolated value to x_range
x_range.append(x_range[-1] + (x_range[-1] - x_range[-2]))

# Append the first y_value to y_range
y_values.append(y_values[0])

self._interp_values = interp1d(x_range, y_range)
return x_range, y_values

def get_value(self, time) -> float:
return float(self._interp_values(time % self.max_time))
return float(self._interp_values.interpolate(time % self.max_time))
Loading
Loading