Commit
Merge pull request #691 from AIStream-Peelout/finish_series_id_9
Finish series_id
isaacmg authored Nov 1, 2023
2 parents b35e808 + b898578 commit 8dd3826
Showing 12 changed files with 112 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -155,6 +155,7 @@ jobs:
name: Model basic tests
when: always
command: |
coverage run -m unittest -v tests/test_series_id.py
coverage run -m unittest -v tests/test_squashed.py
bash <(curl -s https://codecov.io/bash) -cF python
coverage run -m unittest -v tests/test_attn.py
@@ -175,7 +176,6 @@
bash <(curl -s https://codecov.io/bash) -cF python
coverage run -m unittest -v tests/pytorc_train_tests.py
bash <(curl -s https://codecov.io/bash) -cF python
coverage run -m unittest -v tests/test_series_id.py
coverage run -m unittest -v tests/test_classification2_loader.py
- run:
name: upload_stuff
3 changes: 2 additions & 1 deletion README.md
@@ -1,6 +1,6 @@
# Deep learning for time series forecasting
![Example image](https://raw.githubusercontent.com/CoronaWhy/task-ts/master/images/Picture1.png)
Flow Forecast (FF) is an open-source deep learning framework for time series forecasting. It provides the latest state-of-the-art models (transformers, attention models, GRUs, ODEs) and cutting-edge concepts with easy-to-understand interpretability metrics, cloud provider integration, and model serving capabilities. Flow Forecast was the first time series framework to feature support for transformer-based models, and it remains the only true end-to-end deep learning framework for time series. Currently, [Task-TS from CoronaWhy](https://github.com/CoronaWhy/task-ts/wiki) primarily maintains this repository. Pull requests are welcome. Historically, this repository provided open-source benchmarks and code for flash flood and river flow forecasting.

For additional tutorials and examples please see our [tutorials repository](https://github.com/AIStream-Peelout/flow_tutorials).

@@ -34,6 +34,7 @@ Using the library
12. Vanilla GRU with optional probabilistic output layer. Good for multivariate time series forecasting and classification.
13. DLinear and NLinear from the AAAI paper [Are Transformers Effective for Time Series Forecasting](https://arxiv.org/abs/2205.13504)
14. [Crossformer](https://openreview.net/forum?id=vSVLM2j9eie) from ICLR 2023
15. Anomaly Transformer

**Forthcoming Models**

13 changes: 10 additions & 3 deletions flood_forecast/evaluator.py
@@ -12,7 +12,7 @@
)
from flood_forecast.model_dict_function import decoding_functions
from flood_forecast.custom.custom_opt import MASELoss, GaussianLoss
from flood_forecast.preprocessing.pytorch_loaders import CSVTestLoader, TemporalTestLoader
from flood_forecast.preprocessing.pytorch_loaders import CSVTestLoader, TemporalTestLoader, SeriesIDTestLoader
from flood_forecast.time_model import TimeSeriesModel
from flood_forecast.utils import flatten_list_function
from flood_forecast.temporal_decoding import decoding_function
@@ -240,8 +240,11 @@ def infer_on_torch_model(
if "label_len" in model.params["model_params"]:
test_idx = model.params["model_params"]["label_len"] - model.params["dataset_params"]["forecast_length"]
csv_test_loader = TemporalTestLoader(model.params["dataset_params"]["temporal_feats"], input_dict, test_idx)
elif model.params["dataset_params"]["class"] == "CSVSeriesIDLoader":
print("CSVSeriesIDLoader not yet supported for inference. But is coming very soon.")
elif model.params["dataset_params"]["class"] == "SeriesIDLoader":
print("CSVSeriesIDLoader not yet supported for inference, but is coming very soon.")
series_id_col = model.params["dataset_params"]["series_id_col"]
csv_series_id_loader = SeriesIDTestLoader(series_id_col, dataset_params, "all")
handle_evaluation_series_loader(csv_series_id_loader, model, device, hours_to_forecast)
exit()
else:
csv_test_loader = CSVTestLoader(
@@ -328,6 +331,10 @@ def infer_on_torch_model(
)


def handle_evaluation_series_loader(csv_series_id_loader: SeriesIDTestLoader, model, device, hours_to_forecast):
    """Placeholder added in this commit; the body is not yet implemented."""
    pass
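Since the new handler is only a stub, here is a minimal sketch of what it might eventually do, assuming each wrapped CSVTestLoader yields (historical_rows, all_rows_orig, targ_idx) tuples mirroring the CSVResultsHolder fields added in this commit. Names and behavior are illustrative, not the committed API:

```python
def handle_evaluation_series_loader_sketch(csv_series_id_loader, model, device, hours_to_forecast):
    """Hypothetical sketch only -- the committed body is simply ``pass``."""
    results = []
    for test_loader in csv_series_id_loader.csv_test_loaders:
        # Each wrapped CSVTestLoader covers a single series; evaluate it independently.
        historical_rows, all_rows_orig, targ_idx = test_loader[0]
        results.append((historical_rows.to(device), all_rows_orig, targ_idx))
    return results
```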


def handle_ci_multi(prediction_samples: torch.Tensor, csv_test_loader: CSVTestLoader, multi_params: int,
df_pred, decoder_param: bool, history_length: int, num_samples: int) -> List[pd.DataFrame]:
"""Handles the CI confidence interval
1 change: 1 addition & 0 deletions flood_forecast/pre_dict.py
@@ -10,3 +10,4 @@

interpolate_dict = {"back_forward": interpolate_missing_values, "back_forward_generic": back_forward_generic,
"forward_back_generic": forward_back_generic}
print("loaded dicts")
2 changes: 1 addition & 1 deletion flood_forecast/preprocessing/process_usgs.py
@@ -8,8 +8,8 @@

def make_usgs_data(start_date: datetime, end_date: datetime, site_number: str) -> pd.DataFrame:
"""
"""
print('yes')
base_url = "https://nwis.waterdata.usgs.gov/usa/nwis/uv/?cb_00060=on&cb_00065&format=rdb&"
full_url = base_url + "site_no=" + site_number + "&period=&begin_date=" + \
start_date.strftime("%Y-%m-%d") + "&end_date=" + end_date.strftime("%Y-%m-%d")
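For reference, the URL construction shown above can be exercised on its own; the site number below is an illustrative placeholder:

```python
from datetime import datetime

start_date, end_date = datetime(2020, 1, 1), datetime(2020, 2, 1)
site_number = "01646500"  # illustrative USGS gauge id

base_url = "https://nwis.waterdata.usgs.gov/usa/nwis/uv/?cb_00060=on&cb_00065&format=rdb&"
full_url = base_url + "site_no=" + site_number + "&period=&begin_date=" + \
    start_date.strftime("%Y-%m-%d") + "&end_date=" + end_date.strftime("%Y-%m-%d")
print(full_url)
```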
63 changes: 55 additions & 8 deletions flood_forecast/preprocessing/pytorch_loaders.py
@@ -26,7 +26,8 @@ def __init__(
sort_column=None,
scaled_cols=None,
feature_params=None,
no_scale=False
no_scale=False,
preformatted_df=False

):
"""
@@ -56,6 +57,7 @@ def __init__(
self.forecast_length = forecast_length
print("interpolate should be below")
df = get_data(file_path)
print(df.columns)
relevant_cols3 = []
if sort_column:
df[sort_column] = df[sort_column].astype("datetime64[ns]")
@@ -183,11 +185,13 @@ def __init__(self, series_id_col: str, main_params: dict, return_method: str, re
self.return_all_series = return_all
self.unique_cols = self.original_df[series_id_col].dropna().unique().tolist()
df_list = []
self.df_orig_list = []
self.df = self.df.reset_index()
self.unique_dict = {}
print("The series id column is below:")
print(self.series_id_col)
for col in self.unique_cols:
self.df_orig_list.append(self.original_df[self.original_df[self.series_id_col] == col])
new_df = self.df[self.df[self.series_id_col] == col]
df_list.append(new_df)
print(new_df.columns)
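The loop above splits the frame into one sub-frame per unique series id. The same pattern in isolation (column names and values are illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "series_id": [0, 0, 1, 1, 2],
    "value": [1.0, 2.0, 3.0, 4.0, 5.0],
})
# One sub-frame per unique series id, preserving row order within each series.
df_list = [df[df["series_id"] == col] for col in df["series_id"].dropna().unique()]
print([len(d) for d in df_list])  # [2, 2, 1]
```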
@@ -276,19 +280,24 @@ def __init__(
print("CSV Path below")
print(df_path)
self.forecast_total = forecast_total
# TODO: these are antiquated; delete them
self.use_real_temp = use_real_temp
self.use_real_precip = use_real_precip
self.target_supplied = target_supplied
# Convert back to datetime and save index
sort_col1 = sort_column_clone if sort_column_clone else "datetime"
print("columns are: ")
print(self.original_df)
self.original_df[sort_col1] = self.original_df["datetime"].astype("datetime64[ns]")
self.original_df["original_index"] = self.original_df.index
if len(self.relevant_cols3) > 0:
self.original_df[self.relevant_cols3] = self.df[self.relevant_cols3]

def get_from_start_date(self, forecast_start: datetime):
dt_row = self.original_df[
self.original_df["datetime"] == forecast_start
def get_from_start_date(self, forecast_start: datetime, original_df=None):
if original_df is None:
original_df = self.original_df
dt_row = original_df[
original_df["datetime"] == forecast_start
]
revised_index = dt_row.index[0]
return self.__getitem__(revised_index - self.forecast_history)
@@ -310,7 +319,7 @@ def __getitem__(self, idx):
def convert_real_batches(self, the_col: str, rows_to_convert):
"""
A helper function to return properly divided precip and temp
values to be stacked with forecasted cfs.
"""
the_column = torch.from_numpy(rows_to_convert[the_col].to_numpy())
chunks = [
@@ -347,6 +356,10 @@ def __len__(self) -> int:
)


class TestLoaderABC(CSVTestLoader):
pass


class AEDataloader(CSVDataLoader):
def __init__(
self,
@@ -384,7 +397,7 @@ def __init__(
:param forecast_history: [description], defaults to 1
:type forecast_history: int, optional
:param no_scale: [description], defaults to True
:type no_scale: bool, optional
:param sort_column: [description], defaults to None
:type sort_column: [type], optional
"""
@@ -582,11 +595,11 @@ def __getitem__(self, idx):
class VariableSequenceLength(CSVDataLoader):
def __init__(self, series_marker_column: str, csv_loader_params: Dict, pad_length=None, task="classification",
n_classes=9 + 90):
"""Enables easy loading of time-series with variable length data
"""Enables eas(ier) loading of time-series with variable length data
:param series_marker_column: The column that delineates when an example begins and ends
:type series_marker_column: str
:param pad_length: If specified, the length to truncate sequences at or pad them to
:type pad_length: int
:param task: The specific task (e.g. classification, forecasting, auto_encode)
:type task: str
@@ -641,3 +654,37 @@ def pad_input_data(self, sequence: int):
def __getitem__(self, idx: int):
tasks = {"auto": self.get_item_auto_encoder, "classification": self.get_item_classification}
return tasks[self.task](idx)
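pad_length above controls truncation or padding of each example; a minimal standalone sketch of that behavior, assuming zero-padding on the time dimension (one common choice, not necessarily the committed one):

```python
import torch
import torch.nn.functional as F

def pad_or_truncate(sequence: torch.Tensor, pad_length: int) -> torch.Tensor:
    """Truncate to pad_length, or zero-pad the time dimension up to it."""
    if sequence.shape[0] >= pad_length:
        return sequence[:pad_length]
    return F.pad(sequence, (0, 0, 0, pad_length - sequence.shape[0]))

x = torch.ones(5, 3)                 # 5 time steps, 3 features
print(pad_or_truncate(x, 8).shape)   # torch.Size([8, 3])
print(pad_or_truncate(x, 4).shape)   # torch.Size([4, 3])
```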


class CSVResultsHolder(object):
def __init__(self, historical_rows, all_rows_orig, targ_idx) -> None:
self.historical_rows = historical_rows
self.all_rows_orig = all_rows_orig
self.targ_idx = targ_idx


class SeriesIDTestLoader(CSVSeriesIDLoader):
def __init__(self, series_id_col: str, main_params: dict, return_method: str, return_all=True, forecast_total=336):
"""_summary_
:param series_id_col: _de
:type series_id_col: str
:param main_params: _description_
:type main_params: dict
:param return_method: _description_
:type return_method: str
:param return_all: _description_, defaults to True
:type return_all: bool, optional
:param forecast_total: _description_, defaults to 336
:type forecast_total: int, optional
"""
super().__init__(series_id_col, main_params, return_method, return_all)
self.forecast_total = forecast_total
self.csv_test_loaders = [CSVTestLoader(loader_1, self.forecast_total, **main_params) for loader_1 in self.df_orig_list]

def get_from_start_date_all(self, forecast_start: datetime, series_id: int = None):
res = []
for test_loader in self.csv_test_loaders:
test_loader.get_from_start_date(forecast_start, series_id)
res.append(test_loader)
return res
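A hedged usage sketch for the new test loader; the file path, columns, and parameter set are illustrative and must match whatever CSVSeriesIDLoader actually expects:

```python
from datetime import datetime

# Illustrative parameters only -- adjust to your dataset and loader signature.
main_params = {
    "file_path": "data/multi_series.csv",
    "forecast_history": 30,
    "forecast_length": 5,
    "target_col": ["value"],
    "relevant_cols": ["value", "feature_1"],
}
loader = SeriesIDTestLoader("series_id", main_params, "all", forecast_total=336)
# Returns one prepared CSVTestLoader per unique series id:
loaders = loader.get_from_start_date_all(datetime(2023, 5, 1))
```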
8 changes: 4 additions & 4 deletions flood_forecast/preprocessing/temporal_feats.py
@@ -6,15 +6,15 @@
def create_feature(key: str, value: str, df: pd.DataFrame, dt_column: str):
"""Function to create temporal feature. Uses dict to make val.
:param key: The datetime feature you would like to create
:type key: str 2
:param key: The datetime feature you would like to create from the datetime column
:type key: str
:param value: The type of feature you would like to create (cyclical or numerical)
:type value: str
:param df: The Pandas dataframe with the datetime
:param df: The Pandas dataframe with the datetime.
:type df: pd.DataFrame
:param dt_column: The name of the datetime column
:type dt_column: str
:return: The dataframe with the newly added column
:return: The dataframe with the newly added column.
:rtype: pd.DataFrame
"""
if key == "day_of_week":
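For the cyclical option, a common construction maps the feature onto a sine/cosine pair so that values adjacent across the period boundary (e.g., Sunday and Monday) stay close. A sketch of that idea, not necessarily the committed implementation:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"datetime": pd.date_range("2023-01-01", periods=14, freq="D")})
day_of_week = df["datetime"].dt.dayofweek  # 0..6
# Cyclical encoding: adjacent days stay adjacent across the week boundary.
df["day_of_week_sin"] = np.sin(2 * np.pi * day_of_week / 7)
df["day_of_week_cos"] = np.cos(2 * np.pi * day_of_week / 7)
```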
14 changes: 13 additions & 1 deletion flood_forecast/pytorch_training.py
@@ -16,6 +16,19 @@


def multi_crit(crit_multi: List, output, labels, valid=None):
"""_summary_
:param crit_multi: _description_
:type crit_multi: List
:param output: _description_
:type output: _type_
:param labels: _description_
:type labels: _type_
:param valid: _description_, defaults to None
:type valid: _type_, optional
:return: _description_
:rtype: _type_
"""
i = 0
loss = 0.0
for crit in crit_multi:
@@ -573,7 +586,6 @@ def compute_validation(validation_loader: DataLoader,
validation_dataset = validation_loader.dataset
for crit in criterion:
if validation_dataset.scale:
# Should this also do loss.item() stuff?
if len(src.shape) == 2:
src = src.unsqueeze(0)
src1 = src[:, :, 0:multi_targets]
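A standalone sketch of the multi-criterion pattern that multi_crit above implements (its committed body is folded out of this diff), assuming each criterion is a standard PyTorch loss:

```python
import torch
import torch.nn as nn

def multi_crit_sketch(crit_multi, output, labels):
    """Sum each criterion's loss over the same output/label pair."""
    loss = 0.0
    for crit in crit_multi:
        loss = loss + crit(output, labels)
    return loss

output = torch.randn(8, 5)
labels = torch.randn(8, 5)
total = multi_crit_sketch([nn.MSELoss(), nn.L1Loss()], output, labels)
print(total.item())
```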
6 changes: 2 additions & 4 deletions flood_forecast/series_id_helper.py
@@ -4,7 +4,7 @@

def handle_csv_id_output(src: Dict[int, torch.Tensor], trg: Dict[int, torch.Tensor], model, criterion, opt,
random_sample: bool = False, n_targs: int = 1):
"""A helper function to better handle the output of models with a series_id and compute full loss,
"""A helper function to better handle the output of models with a series_id and compute full loss.
:param src: A dictionary of src sequences (partitioned by series_id)
:type src: Dict[int, torch.Tensor]
@@ -15,8 +15,6 @@ def handle_csv_id_output(src: Dict[int, torch.Tensor], trg: Dict[int, torch.Tens
"""
total_loss = 0.00
for (k, v), (k2, v2) in zip(src.items(), trg.items()):
print("Shape of v below")
print(v.shape)
output = model.model(v, k)
loss = criterion(output, v2[:, :, :n_targs])
total_loss += loss.item()
@@ -31,7 +29,7 @@ def handle_csv_id_validation(src: Dict[int, torch.Tensor], trg: Dict[int, torch.
"""Function handles
:param src: _description_
:type src: Dict[int, torch.Tensor]
:type src: Dict[int, torchd
:param trg: _description_
:type trg: Dict[int, torch.Tensor]
:param model: _description_
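An isolated illustration of the per-series pairing used by handle_csv_id_output above; the tensors and the stand-in forecast replace the real model call:

```python
import torch

n_targs = 1
src = {0: torch.randn(4, 10, 3), 1: torch.randn(4, 10, 3)}  # per-series input batches
trg = {0: torch.randn(4, 5, 3), 1: torch.randn(4, 5, 3)}    # per-series target batches
criterion = torch.nn.MSELoss()

total_loss = 0.0
for (k, v), (k2, v2) in zip(src.items(), trg.items()):
    # Stand-in forecast; the committed code calls model.model(v, k) here.
    output = torch.randn(4, 5, n_targs)
    total_loss += criterion(output, v2[:, :, :n_targs]).item()
print(total_loss)
```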
2 changes: 1 addition & 1 deletion tests/config.json
@@ -1 +1 @@
{"model_name": "CustomTransformerDecoder", "model_type": "PyTorch", "model_params": {"seq_length": 11, "n_time_series": 9, "output_seq_length": 2, "n_layers_encoder": 3, "use_mask": true}, "dataset_params": {"class": "default", "training_path": "United_States__Florida__Palm_Beach_County.csv", "validation_path": "United_States__Florida__Palm_Beach_County.csv", "test_path": "United_States__Florida__Palm_Beach_County.csv", "forecast_test_len": 15, "batch_size": 20, "forecast_history": 11, "forecast_length": 2, "train_end": 61, "valid_start": 62, "valid_end": 88, "test_start": 61, "test_end": 90, "target_col": ["new_cases"], "relevant_cols": ["new_cases", "month", "weekday", "mobility_retail_recreation", "mobility_grocery_pharmacy", "mobility_parks", "mobility_transit_stations", "mobility_workplaces", "mobility_residential"], "scaler": "StandardScaler", "interpolate": false}, "training_params": {"criterion": "MSE", "optimizer": "Adam", "optim_params": {}, "lr": 0.0001, "epochs": 10, "batch_size": 20}, "GCS": true, "early_stopping": {"patience": 3}, "sweep": true, "wandb": false, "forward_params": {}, "metrics": ["MSE"], "inference_params": {"datetime_start": "2020-06-10", "hours_to_forecast": 15, "num_prediction_samples": 100, "test_csv_path": "United_States__Florida__Palm_Beach_County.csv", "decoder_params": {"decoder_function": "simple_decode", "unsqueeze_dim": 1}, "dataset_params": {"file_path": "United_States__Florida__Palm_Beach_County.csv", "forecast_history": 11, "forecast_length": 2, "relevant_cols": ["new_cases", "month", "weekday", "mobility_retail_recreation", "mobility_grocery_pharmacy", "mobility_parks", "mobility_transit_stations", "mobility_workplaces", "mobility_residential"], "target_col": ["new_cases"], "scaling": "StandardScaler", "interpolate_param": false}}, "weight_path": "/content/github_aistream-peelout_flow-forecast/29_June_202009_26AM_model.pth", "run": [{"epoch": 0, "train_loss": "0.6654510299364725", "validation_loss": "1.1439250165765935"}, {"epoch": 1, "train_loss": "0.7072166502475739", "validation_loss": "1.095255504954945"}, {"epoch": 2, "train_loss": "0.5650965571403503", "validation_loss": "1.2630093747919255"}, {"epoch": 3, "train_loss": "0.504930337270101", "validation_loss": "1.8901219367980957"}, {"epoch": 4, "train_loss": "0.49202097455660504", "validation_loss": "2.055953329259699"}], "epochs": 0}
{"model_name": "CustomTransformerDecoder", "model_type": "PyTorch", "model_params": {"seq_length": 11, "n_time_series": 9, "output_seq_length": 2, "n_layers_encoder": 3, "use_mask": true}, "dataset_params": {"class": "default", "training_path": "United_States__Florida__Palm_Beach_County.csv", "validation_path": "United_States__Florida__Palm_Beach_County.csv", "test_path": "United_States__Florida__Palm_Beach_County.csv", "forecast_test_len": 15, "batch_size": 20, "forecast_history": 11, "forecast_length": 2, "train_end": 61, "valid_start": 62, "valid_end": 88, "test_start": 61, "test_end": 90, "target_col": ["new_cases"], "relevant_cols": ["new_cases", "month", "weekday", "mobility_retail_recreation", "mobility_grocery_pharmacy", "mobility_parks", "mobility_transit_stations", "mobility_workplaces", "mobility_residential"], "scaler": "StandardScaler", "interpolate": false}, "training_params": {"criterion": "MSE", "optimizer": "Adam", "optim_params": {}, "lr": 0.0001, "epochs": 10, "batch_size": 19}, "GCS": true, "early_stopping": {"patience": 3}, "sweep": true, "wandb": false, "forward_params": {}, "metrics": ["MSE"], "inference_params": {"datetime_start": "2020-06-10", "hours_to_forecast": 15, "num_prediction_samples": 100, "test_csv_path": "United_States__Florida__Palm_Beach_County.csv", "decoder_params": {"decoder_function": "simple_decode", "unsqueeze_dim": 1}, "dataset_params": {"file_path": "United_States__Florida__Palm_Beach_County.csv", "forecast_history": 11, "forecast_length": 2, "relevant_cols": ["new_cases", "month", "weekday", "mobility_retail_recreation", "mobility_grocery_pharmacy", "mobility_parks", "mobility_transit_stations", "mobility_workplaces", "mobility_residential"], "target_col": ["new_cases"], "scaling": "StandardScaler", "interpolate_param": false}}, "weight_path": "/content/github_aistream-peelout_flow-forecast/29_June_202009_26AM_model.pth", "run": [{"epoch": 0, "train_loss": "0.6654510299364725", "validation_loss": "1.1439250165765935"}, {"epoch": 1, "train_loss": "0.7072166502475739", "validation_loss": "1.095255504954945"}, {"epoch": 2, "train_loss": "0.5650965571403503", "validation_loss": "1.2630093747919255"}, {"epoch": 3, "train_loss": "0.504930337270101", "validation_loss": "1.8901219367980957"}, {"epoch": 4, "train_loss": "0.49202097455660504", "validation_loss": "2.055953329259699"}], "epochs": 0}