-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Second round of updates #85
Changes from all commits
141ebdd
de819fa
f48ac14
a4521d5
0945bce
3384582
b4e25b7
0f246da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,36 @@ | ||
import os | ||
from abc import ABC, abstractmethod | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
|
||
from glhe.utilities.functions import Interpolator1D, Interpolator1DFromFile | ||
from glhe.utilities.functions import Interpolator2DFromFile | ||
|
||
join = os.path.join | ||
norm = os.path.normpath | ||
cwd = os.getcwd() | ||
from glhe.utilities.functions import InterpolatorBase, Interpolator1D, Interpolator1DFromFile, Interpolator2DFromFile | ||
|
||
|
||
class BaseAgg(ABC): | ||
|
||
def __init__(self, inputs: dict): | ||
# g-function values | ||
if 'g-function-path' in inputs: | ||
path_g = norm(join(cwd, inputs['g-function-path'])) | ||
self.interp_g = Interpolator1DFromFile(path_g) | ||
p = Path(inputs['g-function-path']) | ||
if not p.is_absolute(): | ||
p = Path.cwd() / inputs['g-function-path'] | ||
self.interp_g = Interpolator1DFromFile(p) | ||
elif 'lntts' and 'g-values' in inputs: | ||
data_g = np.transpose(np.array([inputs['lntts'], inputs['g-values']])) | ||
self.interp_g = Interpolator1D(data_g[:, 0], data_g[:, 1]) | ||
self.interp_g: InterpolatorBase = Interpolator1D(data_g[:, 0], data_g[:, 1]) | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mimicking some polymorphism here by trying to convince the IDE to treat this as a base class. No functional effect for Python itself...just trying to help with autocompletion. |
||
raise KeyError('g-function data not found.') | ||
|
||
# g_b-function values | ||
self.interp_g_b = None | ||
if 'g_b-function-path' in inputs: | ||
p = Path(inputs['g_b-function-path']) | ||
if not p.is_absolute(): | ||
p = Path.cwd() / inputs['g_b-function-path'] | ||
if 'g_b-flow-rates' in inputs: | ||
self.interp_g_b = Interpolator2DFromFile(inputs['g_b-function-path'], inputs['g_b-flow-rates']) | ||
self.interp_g_b: InterpolatorBase = Interpolator2DFromFile(p, inputs['g_b-flow-rates']) | ||
else: | ||
self.interp_g_b = Interpolator1DFromFile(inputs['g_b-function-path']) | ||
self.interp_g_b: InterpolatorBase = Interpolator1DFromFile(p) | ||
elif 'lntts_b' and 'g_b-values' in inputs: | ||
data_g_b = np.transpose(np.array([inputs['lntts_b'], inputs['g_b-values']])) | ||
self.interp_g_b = Interpolator1D(data_g_b[:, 0], data_g_b[:, 1]) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,16 +23,14 @@ def __init__(self, inputs: dict): | |
self.sub_hr = SubHour(inputs) | ||
|
||
# set expansion rate. apply default if needed. | ||
try: | ||
self.exp_rate = 1.62 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor change, which I think should get rid of a few |
||
if 'expansion-rate' in inputs: | ||
self.exp_rate = inputs['expansion-rate'] | ||
except KeyError: # pragma: no cover | ||
self.exp_rate = 1.62 # pragma: no cover | ||
|
||
# set the number of bins per level. apply default if needed. | ||
try: | ||
self.bins_per_level = 9 | ||
if 'number-bins-per-level' in inputs: | ||
self.bins_per_level = inputs['number-bins-per-level'] | ||
except KeyError: # pragma: no cover | ||
self.bins_per_level = 9 # pragma: no cover | ||
|
||
# total simulation runtime to make available for method | ||
run_time = inputs['runtime'] | ||
|
@@ -47,17 +45,13 @@ def __init__(self, inputs: dict): | |
t = SEC_IN_HOUR | ||
|
||
# initialize the dynamic method | ||
initialized = False | ||
while not initialized: | ||
while True: | ||
for _ in range(self.bins_per_level): | ||
t += dt | ||
self.energy = np.insert(self.energy, 0, 0) | ||
self.dts = np.insert(self.dts, 0, dt) | ||
|
||
if t >= run_time: | ||
initialized = True | ||
break | ||
|
||
return | ||
dt *= self.exp_rate | ||
|
||
def aggregate(self, time: int, energy: float): | ||
|
@@ -111,22 +105,20 @@ def calc_temporal_superposition(self, time_step: int, flow_rate: float = None) - | |
if not flow_rate: | ||
g_b = self.interp_g_b.interpolate(lntts) | ||
else: | ||
g_b = np.flipud(self.interp_g_b(lntts, flow_rate)) | ||
# TODO: This is not covered by tests, and may need adjusting the interpolator class to work properly | ||
g_b = np.flipud(self.interp_g_b.interpolate(lntts, flow_rate)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still leaving this for now. I think the best thing would be to work up a unit test that exercises it, see it fail, and then fix it :) |
||
return float(np.dot(dq, g)), float(np.dot(dq, g_b)) | ||
else: | ||
# convolution for "g" g-functions only | ||
return float(np.dot(dq, g)) | ||
|
||
def get_g_value(self, time_step: int) -> float: | ||
lntts = np.log(time_step / self.ts) | ||
return float(self.interp_g.interpolate(lntts)) | ||
return self.interp_g.interpolate(lntts) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now, |
||
|
||
def get_g_b_value(self, time_step: int, flow_rate: float = None) -> float: | ||
lntts = np.log(time_step / self.ts) | ||
if not flow_rate: | ||
return float(self.interp_g_b.interpolate(lntts)) | ||
else: | ||
return float(self.interp_g_b.interpolate(lntts, flow_rate)) | ||
return self.interp_g_b.interpolate(lntts, flow_rate) | ||
|
||
def get_q_prev(self) -> float: | ||
return float(self.sub_hr.energy[-1] / self.sub_hr.dts[-1]) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,13 +63,14 @@ def aggregate(self, time: int, energy: float): | |
# store whatever rolls off of the sub-hourly method | ||
|
||
# need to think about this | ||
# TODO: If this is returning early, what about the prev_update_time assignments below? | ||
self.energy[-1] += e_1 | ||
return | ||
else: | ||
# aggregate | ||
# TODO: The next two lines are showing as unused in Pycharm, is this IF block needed then? | ||
# dts_flip = np.flipud(self.dts) | ||
# vals, idxs, cnts = np.unique(dts_flip, return_counts=True, return_index=True) | ||
# vals, indexes, counts = np.unique(dts_flip, return_counts=True, return_index=True) | ||
self.energy[-1] += e_1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This little block of code could use a bit more quality time. I had already commented out the two unused lines, but now I'm not sure if we should collapse it down further. |
||
|
||
# numpy split will do a lot of work too | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
from json import loads | ||
import os | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like another spot to get rid of |
||
from pathlib import Path | ||
|
||
from jsonschema import SchemaError, ValidationError, validate | ||
|
||
|
@@ -8,7 +10,7 @@ | |
|
||
class InputProcessor: | ||
|
||
def __init__(self, json_input_path: str): | ||
def __init__(self, json_input_path: Path): | ||
""" | ||
Initialize the input processor, process input file, and store the input information. | ||
|
||
|
@@ -18,11 +20,11 @@ def __init__(self, json_input_path: str): | |
""" | ||
|
||
# check if file exists | ||
if not os.path.exists(json_input_path): | ||
raise FileNotFoundError("Input file: '{}' does not exist.".format(json_input_path)) | ||
if not json_input_path.exists(): | ||
raise FileNotFoundError(f"Input file: '{json_input_path}' does not exist.") | ||
|
||
# load the input file | ||
self.input_dict = lower_obj(load_json(json_input_path)) | ||
self.input_dict: dict = lower_obj(loads(json_input_path.read_text())) | ||
|
||
# validate the inputs | ||
self.validate_inputs(self.input_dict) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,19 @@ | ||
import datetime as dt | ||
import os | ||
from os.path import join, normpath | ||
|
||
import pandas as pd | ||
from pathlib import Path | ||
|
||
|
||
class OutputProcessor: | ||
|
||
def __init__(self, output_dir: str, output_name: str): | ||
def __init__(self, output_dir: Path, output_name: str): | ||
""" | ||
Output processor manages output data | ||
""" | ||
|
||
self.output_dir = output_dir | ||
self.output_file = output_name | ||
self.write_path = normpath(join(output_dir, output_name)) | ||
self.df = pd.DataFrame() | ||
self.write_path = output_dir / output_name | ||
# self.df = pd.DataFrame() | ||
self.output_data = [] | ||
self.idx_count = 0 | ||
|
||
def collect_output(self, data_dict: dict) -> None: | ||
|
@@ -24,30 +22,40 @@ def collect_output(self, data_dict: dict) -> None: | |
|
||
:param data_dict: dictionary of data to be logged | ||
""" | ||
|
||
df_temp = pd.DataFrame(data_dict, index=[self.idx_count]) | ||
self.df = pd.concat([self.df, df_temp], axis=0, sort=True) | ||
self.idx_count += 1 | ||
self.output_data.append(data_dict) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The output processor could use some love. It needs a round of efficiency upgrading, but it does seem to be working happily without pandas, which is the right step. |
||
# df_temp = pd.DataFrame(data_dict, index=[self.idx_count]) | ||
# self.df = pd.concat([self.df, df_temp], axis=0, sort=True) | ||
# self.idx_count += 1 | ||
|
||
def write_to_file(self) -> None: | ||
""" | ||
Write the DataFrame holding the simulation data to a file. | ||
""" | ||
if os.path.exists(self.write_path): | ||
os.remove(self.write_path) | ||
if self.write_path.exists(): | ||
self.write_path.unlink() | ||
|
||
header_row = ['Date/Time'] | ||
header_row.extend([key for key in sorted(self.output_data[0].keys()) if key != 'Elapsed Time [s]']) | ||
|
||
self.convert_time_to_timestamp() | ||
self.df.to_csv(self.write_path) | ||
time_stamps = self.convert_time_to_timestamp() | ||
with self.write_path.open('w') as f: | ||
for i, d in enumerate(self.output_data): | ||
if i == 0: | ||
f.write(','.join(header_row) + '\n') | ||
row = [time_stamps[i]] | ||
sorted_values_without_time = [str(d[key]) for key in sorted(d.keys()) if key != 'Elapsed Time [s]'] | ||
row.extend(sorted_values_without_time) | ||
f.write(','.join(row) + '\n') | ||
|
||
def convert_time_to_timestamp(self) -> None: | ||
def convert_time_to_timestamp(self) -> list[str]: | ||
""" | ||
Convert the 'Elapsed Time' column to a standardized date/time format. | ||
""" | ||
try: | ||
dts = [dt.timedelta(seconds=x) for x in self.df['Elapsed Time [s]'].values.tolist()] | ||
raw_dts = [d['Elapsed Time [s]'] for d in self.output_data] | ||
dts = [dt.timedelta(seconds=x) for x in raw_dts] | ||
start_time = dt.datetime(year=dt.datetime.now().year, month=1, day=1, hour=0, minute=0) | ||
time_stamps = [start_time + x for x in dts] | ||
self.df['Date/Time'] = time_stamps | ||
self.df.set_index('Date/Time', inplace=True) | ||
time_stamps = [str(start_time + x) for x in dts] | ||
return time_stamps | ||
except KeyError: | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,69 @@ | ||
import pandas as pd | ||
from scipy.interpolate import interp1d | ||
from pathlib import Path | ||
|
||
import csv | ||
from datetime import datetime | ||
from glhe.utilities.functions import Interpolator1D | ||
|
||
class ExternalBase(object): | ||
|
||
def __init__(self, path, col_num): | ||
|
||
df = pd.read_csv(path, index_col=0, parse_dates=True) | ||
df['delta t'] = df.index.to_series().diff().dt.total_seconds() | ||
df['delta t'].iat[0] = 0 | ||
x_range = df['delta t'].cumsum().tolist() | ||
y_range = df.iloc[:, col_num].tolist() | ||
class ExternalBase(object): | ||
|
||
def __init__(self, path: Path, col_num: int): | ||
x_range, y_range = self.read_csv_and_compute_delta_t(path, col_num) | ||
# added to allow multi-year simulations | ||
self.max_time = 0 | ||
x_range.append(x_range[-1] + (x_range[-1] - x_range[-2])) | ||
y_range.append(y_range[0]) | ||
self.max_time = x_range[-1] | ||
self._interp_values = Interpolator1D(x_range, y_range) | ||
|
||
@staticmethod | ||
def read_csv_and_compute_delta_t(file_path: Path, col_num: int) -> [list, list]: | ||
delta_t_values = [] | ||
y_values = [] | ||
last_timestamp = None | ||
|
||
# Open CSV file and read data | ||
with open(file_path, 'r') as csvfile: | ||
csvreader = csv.reader(csvfile) | ||
next(csvreader) # Read header | ||
|
||
for row in csvreader: | ||
timestamp_str = row[0] | ||
|
||
# Convert timestamp string to datetime object | ||
current_timestamp = None | ||
time_stamp_formats = ['%Y-%m-%d %H:%M:%S', '%m/%d/%Y %H:%M', '%m/%d/%Y %H:%M:%S'] | ||
for t in time_stamp_formats: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pandas was happy to try some different time stamp formats it seems, because the actual run output was different from the unit test, and they both passed. I just had to add a few options here. I think it would be sufficient to modify the unit test to match the run output, and then we just have our specific format. But this is also fine. |
||
try: | ||
current_timestamp = datetime.strptime(timestamp_str, t) | ||
break | ||
except ValueError: | ||
continue | ||
if not current_timestamp: | ||
raise ValueError(f"Bad timestamp format for timestamp: {timestamp_str}") | ||
|
||
if last_timestamp is not None: | ||
# Calculate delta t in seconds | ||
delta_t = (current_timestamp - last_timestamp).total_seconds() | ||
delta_t_values.append(delta_t) | ||
|
||
# Append value to y_values | ||
y_values.append(row[col_num + 1]) | ||
|
||
last_timestamp = current_timestamp | ||
|
||
# Initialize x_range with cumulative sum of delta_t_values | ||
x_range = [0] | ||
cumulative_sum = 0 | ||
for delta_t in delta_t_values: | ||
cumulative_sum += delta_t | ||
x_range.append(cumulative_sum) | ||
|
||
# Append an extrapolated value to x_range | ||
x_range.append(x_range[-1] + (x_range[-1] - x_range[-2])) | ||
|
||
# Append the first y_value to y_range | ||
y_values.append(y_values[0]) | ||
|
||
self._interp_values = interp1d(x_range, y_range) | ||
return x_range, y_values | ||
|
||
def get_value(self, time) -> float: | ||
return float(self._interp_values(time % self.max_time)) | ||
return float(self._interp_values.interpolate(time % self.max_time)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern is in the code pretty much anywhere we are reading a path from the input dict. I wonder if there is a better way to do it.
And after taking a quick look, it looks like the Path's
resolve()
method will handle this. If it's relative, it will applycwd()
to make it absolute. And if it's absolute it will leave it. And it will walk through links and stuff. I'll fix that up.TIL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I try to
resolve()
any pathlib object at instantiation, and then it all just works. This was all developed before I was a pathlib convert. 🙏