diff --git a/.gitignore b/.gitignore
index 108a4fa..834af2e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,4 @@
 app/output/*.txt
-app/_models/
 venv/
 __pycache__/
 
diff --git a/app/_models/__init__.py b/app/_models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/_models/abstract_model.py b/app/_models/abstract_model.py
new file mode 100644
index 0000000..ee29dfb
--- /dev/null
+++ b/app/_models/abstract_model.py
@@ -0,0 +1,28 @@
+from abc import ABC, abstractmethod
+from typing import ForwardRef, List
+
+import pandas as pd
+
+from app.data_formats import APIPrediction
+
+ModelFeatures = ForwardRef('ModelFeatures')
+
+
+class Model(ABC):
+    """An abstract base class for ML models."""
+
+    @abstractmethod
+    def __getstate__(self): ...
+
+    @abstractmethod
+    def __setstate__(self, state): ...
+
+    @property
+    @abstractmethod
+    def supported_zones(self): ...
+
+    @abstractmethod
+    def train(self, training_data: pd.DataFrame) -> None: ...
+
+    @abstractmethod
+    def predict(self, data: ModelFeatures) -> List[APIPrediction]: ...
\ No newline at end of file
diff --git a/app/_models/deep_hong.py b/app/_models/deep_hong.py
new file mode 100644
index 0000000..c1fd465
--- /dev/null
+++ b/app/_models/deep_hong.py
@@ -0,0 +1,166 @@
+import logging
+import sys
+from typing import ForwardRef, List, Mapping, MutableMapping
+
+import numpy as np
+import pandas as pd
+from pydantic import BaseModel, conlist, validate_arguments, validator
+from sklearn.neural_network import MLPRegressor
+
+from app._models.abstract_model import Model
+from app.constants import DAY_OF_WEEK, HOURS_END, HOURS_START, UNENFORCED_DAYS
+from app.data_formats import APIPredictionRequest
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.INFO)
+if sys.stdout.isatty():
+    LOGGER.addHandler(logging.StreamHandler(sys.stdout))
+
+
+TOTAL_SEMIHOURS = 2 * (HOURS_END - HOURS_START).hours + abs(HOURS_END.minute - HOURS_START.minute) // 30
+TOTAL_ENFORCEMENT_DAYS = 7 - len(UNENFORCED_DAYS)
+
+
+ModelFeaturesv0EarlyAccessPreRelease = ForwardRef('ModelFeaturesv0EarlyAccessPreRelease')
+
+
+class ModelFeaturesv0EarlyAccessPreRelease(BaseModel):
+    zone_id: str
+    semihour_onehot: conlist(int,
+                             min_items=TOTAL_SEMIHOURS - 1,
+                             max_items=TOTAL_SEMIHOURS - 1)
+    dayofweek_onehot: conlist(int,
+                              min_items=TOTAL_ENFORCEMENT_DAYS - 1,
+                              max_items=TOTAL_ENFORCEMENT_DAYS - 1)
+
+    @validator('semihour_onehot', 'dayofweek_onehot')
+    def one_hot_encoded(cls, feature):
+        number_of_ones = (np.array(feature) == 1).sum()
+        number_of_zeros = (np.array(feature) == 0).sum()
+        assert number_of_ones + number_of_zeros == len(feature), \
+            'Input should consist solely of 0s and 1s.'
+        assert number_of_ones <= 1, \
+            'Input must contain at most one 1.'
+        return feature
+
+    @staticmethod
+    @validate_arguments
+    def from_request(request: APIPredictionRequest) -> List[ModelFeaturesv0EarlyAccessPreRelease]:
+        """
+        Convert a prediction request to the input format expected by a parking
+        availability model.
+
+        Parameters
+        ----------
+        request : APIPredictionRequest
+            The prediction request to transform into model features
+
+        Returns
+        -------
+        list of ModelFeaturesv0EarlyAccessPreRelease
+            A set of features that can be passed into the `predict` method of a
+            `ParkingAvailabilityModelv0EarlyAccessPreRelease`.
+        """
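+        # NOTE: an illustrative walk-through, assuming HOURS_START is 08:00
+        # and HOURS_END is 22:00: a Tuesday 09:15 timestamp falls in hour
+        # slot 9, minute bin 0, i.e. the third of 28 semihour slots, so
+        # after the leading slot is dropped (drop-first one-hot encoding,
+        # mirroring pd.get_dummies(drop_first=True) in train), index 1 of
+        # semihour_onehot is 1 and every other entry is 0.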
+ """ + timestamp = request.timestamp + + hour_onehot = np.array([timestamp.hour == hour for hour in range(8, 22)]) + minute_bin_onehot = np.array([30 * minute_bin <= timestamp.minute < 30 * (minute_bin + 1) for minute_bin in range(2)]) + semihour_onehot = (hour_onehot[:, None] & np.array(minute_bin_onehot)).flatten()[1:] + + enforcement_days = [day.value for day in DAY_OF_WEEK if day not in UNENFORCED_DAYS] + dayofweek_onehot = np.array([ + enforcement_days.index(timestamp.weekday()) == day_index + for day_index in range(TOTAL_ENFORCEMENT_DAYS) + ])[1:] + + return [ModelFeaturesv0EarlyAccessPreRelease(zone_id=zone_id, + semihour_onehot=semihour_onehot.astype(int).tolist(), + dayofweek_onehot=dayofweek_onehot.astype(int).tolist()) + for zone_id in request.zone_ids] + + +ModelFeaturesv0EarlyAccessPreRelease.update_forward_refs() + + +class ParkingAvailabilityModelv0EarlyAccessPreRelease(Model): + def __init__(self): + super().__init__() + self._zone_models: MutableMapping[str, MLPRegressor] = {} + self._supported_zones = [] + + def __getstate__(self): + return { + 'zone_models': self._zone_models, + 'supported_zones': self.supported_zones + } + + def __setstate__(self, state): + self._zone_models = state['zone_models'] + self._supported_zones = state['supported_zones'] + + @property + def supported_zones(self): + return self._supported_zones + + def train(self, training_data: pd.DataFrame) -> None: + zone_id, X, y = ( + training_data + .assign( + available_rate=lambda df: 1 - df.occu_cnt_rate, + dayofweek=lambda df: df.semihour.dt.dayofweek.astype('category'), + semihour=lambda df: pd.Series( + zip(df.semihour.dt.hour, df.semihour.dt.minute), + dtype='category', index=df.index + ) + ) + .pipe( + lambda df: ( + df.zone_id, + pd.get_dummies(df.loc[:, ['semihour', 'dayofweek']], + drop_first=True), + df.available_rate + ) + ) + ) + + self._supported_zones = zone_id.unique() + for zone in self.supported_zones: + LOGGER.info(f'Processing zone {zone}') + X_cluster = X[zone_id == zone] + y_cluster = y[zone_id == zone] + + LOGGER.info(f'Total (row, col) counts: {X_cluster.shape}') + mlp = MLPRegressor(hidden_layer_sizes=(50, 50), activation='relu') + mlp.fit(X_cluster, y_cluster) + if np.issubdtype(type(zone), np.float64): + zone = str(int(zone)) + self._zone_models[zone] = mlp + + LOGGER.info(f'Successfully trained {len(self._zone_models)} models') + + @validate_arguments + def predict(self, samples_batch: List[ModelFeaturesv0EarlyAccessPreRelease]) -> Mapping[str, float]: + requested_zone_ids = [sample.zone_id for sample in samples_batch + if sample.zone_id in self.supported_zones] + + regressor_feature_array = np.asarray([ + sample.semihour_onehot + sample.dayofweek_onehot + for sample in samples_batch + ]) + return { + zone_id: self._zone_models[zone_id] + .predict(regressor_feature_array) + .clip(0, 1)[0] + for zone_id in requested_zone_ids + } \ No newline at end of file diff --git a/app/_models/moving_average.py b/app/_models/moving_average.py new file mode 100644 index 0000000..24ff6f2 --- /dev/null +++ b/app/_models/moving_average.py @@ -0,0 +1,175 @@ +import logging +import sys +from datetime import datetime +from typing import ForwardRef, List, Mapping + +import numpy as np +import pandas as pd +from pydantic import BaseModel, constr, validate_arguments + +from app._models.abstract_model import Model +from app.data_formats import APIPredictionRequest + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) +if sys.stdout.isatty(): + 
+        return {
+            zone_id: self._zone_models[zone_id]
+                         .predict(regressor_feature_array)
+                         .clip(0, 1)[0]
+            for zone_id in requested_zone_ids
+        }
\ No newline at end of file
diff --git a/app/_models/moving_average.py b/app/_models/moving_average.py
new file mode 100644
index 0000000..24ff6f2
--- /dev/null
+++ b/app/_models/moving_average.py
@@ -0,0 +1,184 @@
+import logging
+import sys
+from datetime import datetime
+from typing import ForwardRef, List, Mapping
+
+import numpy as np
+import pandas as pd
+from pydantic import BaseModel, constr, validate_arguments
+
+from app._models.abstract_model import Model
+from app.data_formats import APIPredictionRequest
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.INFO)
+if sys.stdout.isatty():
+    LOGGER.addHandler(logging.StreamHandler(sys.stdout))
+
+AverageFeatures = ForwardRef('AverageFeatures')
+
+
+class AverageFeatures(BaseModel):
+    zone_id: constr(min_length=1)
+    at: datetime
+
+    @staticmethod
+    @validate_arguments
+    def from_request(request: APIPredictionRequest) -> List[AverageFeatures]:
+        """
+        Convert a prediction request to the input format expected by a parking
+        availability model.
+
+        Parameters
+        ----------
+        request : APIPredictionRequest
+            The prediction request to transform into model features
+
+        Returns
+        -------
+        list of AverageFeatures
+            A set of features that can be passed into the `predict` method of
+            an `AvailabilityAverager`.
+        """
+        return [AverageFeatures(zone_id=zone_id, at=request.timestamp)
+                for zone_id in request.zone_ids]
+
+
+AverageFeatures.update_forward_refs()
+
+
+class AvailabilityAverager(Model):
+    def __init__(self, weeks_to_average=1):
+        super().__init__()
+        self._rolling_averages: Mapping[tuple, pd.DataFrame] = {}
+        self._supported_zones: List[str] = []
+        self._weeks_to_average = weeks_to_average
+
+    def __eq__(self, other):
+        if not isinstance(other, AvailabilityAverager):
+            return False
+        rolling_average_ids = set(self._rolling_averages.keys())
+        other_rolling_average_ids = set(other._rolling_averages.keys())
+        return (
+            (rolling_average_ids == other_rolling_average_ids) and
+            all(
+                (
+                    self._rolling_averages[key] == other._rolling_averages[key]
+                ).all().all()
+                for key in rolling_average_ids
+            ) and
+            (self.supported_zones == other.supported_zones) and
+            (self.weeks_to_average == other.weeks_to_average)
+        )
+
+    def __getstate__(self):
+        return {
+            'rolling_averages': self._rolling_averages,
+            'supported_zones': self.supported_zones,
+            'weeks_to_average': self.weeks_to_average
+        }
+
+    def __setstate__(self, state):
+        self._rolling_averages = state['rolling_averages']
+        self._supported_zones = state['supported_zones']
+        self._weeks_to_average = state['weeks_to_average']
+
+    @property
+    def supported_zones(self) -> List[str]:
+        return self._supported_zones
+
+    @property
+    def weeks_to_average(self) -> int:
+        return self._weeks_to_average
+
+    def train(self, training_data: pd.DataFrame) -> None:
+        training_data = training_data.assign(
+            available_rate=lambda df: 1 - df.occu_cnt_rate,
+            dayofweek=lambda df: df.semihour.dt.dayofweek.astype('category'),
+            semihour_tuples=lambda df: pd.Series(
+                zip(df.semihour.dt.hour, df.semihour.dt.minute),
+                dtype='category', index=df.index
+            )
+        )
+        LOGGER.info(f'Training rows at start: {len(training_data)}')
+        training_data = training_data.sort_values(
+            ['zone_id', 'dayofweek', 'semihour_tuples']
+        )
+        LOGGER.info(f'Rows after sorting: {len(training_data)}')
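+        # NOTE: the transform below assumes rows within each (zone,
+        # day-of-week, semihour) group are in chronological order: shift()
+        # excludes the current week's observation, then
+        # rolling(self.weeks_to_average, 1).mean() averages up to that many
+        # prior weeks. Worked example with weeks_to_average=2: availabilities
+        # [0.4, 0.6, 0.8] on three consecutive Mondays at 09:00 yield
+        # [NaN, 0.4, 0.5].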
+        training_data[f'available_rate_{self.weeks_to_average:0>2}w'] = (
+            training_data.groupby(
+                ['zone_id', 'dayofweek', 'semihour_tuples']
+            ).available_rate.transform(
+                lambda group: group.shift().rolling(self.weeks_to_average, 1).mean()
+            ).dropna().clip(0, 1)
+        )
+        LOGGER.info(f'Rows after rolling average: {len(training_data)}')
+        training_data = training_data[
+            [
+                'zone_id', 'semihour', 'semihour_tuples', 'dayofweek',
+                f'available_rate_{self.weeks_to_average:0>2}w'
+            ]
+        ].dropna()
+        LOGGER.info(f'Rows after dropping incomplete averages: {len(training_data)}')
+        self._rolling_averages = {
+            zone_day_time: data_in_zone_on_day_at_time
+            for zone_day_time, data_in_zone_on_day_at_time
+            in training_data.groupby(['zone_id', 'dayofweek',
+                                      'semihour_tuples'])
+        }
+
+        def _get_zone_from_key(key):
+            zone_id, _d, _s = key
+            return zone_id
+
+        zone_list = list(map(_get_zone_from_key, self._rolling_averages.keys()))
+        unique_zone_list = np.unique(np.array(zone_list)).tolist()
+        self._supported_zones = unique_zone_list
+
+    # @validate_arguments
+    def predict(self, samples_batch: List[AverageFeatures]) -> Mapping[str, float]:
+        valid_requests = [sample for sample in samples_batch
+                          if sample.zone_id in self.supported_zones]
+        predictions = {}
+        for sample in valid_requests:
+            sample_timestamp = pd.Timestamp(sample.at)
+            sample_semihour_tuple = (
+                sample_timestamp.hour,
+                30 * (sample_timestamp.minute // 30)
+            )
+            rolling_averages = self._rolling_averages.get((
+                sample.zone_id,
+                sample_timestamp.dayofweek,
+                sample_semihour_tuple
+            ), pd.DataFrame())
+
+            if rolling_averages.empty:
+                continue
+
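+            # NOTE: date_diff measures how far each stored observation's
+            # date precedes the requested date; assuming observations lie
+            # before the request, keeping the rows that minimize it selects
+            # the most recent rolling average for this zone/day/semihour.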
+ """ + return [ProphetableFeatures(zone_id=zone_id, at=request.timestamp) + for zone_id in request.zone_ids] + + +ProphetableFeatures.update_forward_refs() + + +class ParkingProphet(Model): + def __init__(self): + super().__init__() + self._zone_clusterer: Mapping[str, str] = lambda zone_id: zone_id + self._cluster_models: MutableMapping[str, Prophet] = {} + self._zone_models: MutableMapping[str, Prophet] = {} + self._supported_zones: List[str] = [] + + def __getstate__(self): + def _to_json(name, model): + return name, model_to_json(model) + + serializable_cluster_models = dict(starmap(_to_json, self._cluster_models.items())) + serializable_zone_models = dict(starmap(_to_json, self._zone_models.items())) + + return { + 'zone_clusterer': self._zone_clusterer, + 'cluster_models': serializable_cluster_models, + 'zone_models': serializable_zone_models, + 'supported_zones': self.supported_zones + } + + def __setstate__(self, state): + def _from_json(name, jsonny): + return name, model_from_json(jsonny) + + deserialized_cluster_models = dict(starmap(_from_json, state['cluster_models'].items())) + deserialized_zone_models = dict(starmap(_from_json, state['zone_models'].items())) + + self._zone_clusterer = state['zone_clusterer'] + self._cluster_models = deserialized_cluster_models + self._zone_models = deserialized_zone_models + self._supported_zones = state['supported_zones'] + + @property + def supported_zones(self): + return self._supported_zones + + def train(self, training_data: pd.DataFrame) -> None: + self._supported_zones = list(training_data.zone_id.unique()) + self._zone_clusterer = self._derive_zone_clusters(training_data) + prophet_features = self._derive_prophet_features(training_data, self._zone_clusterer) + self._cluster_models = self._train_cluster_models(prophet_features) + self._zone_models = self._train_zone_models(prophet_features) + LOGGER.info(f'Successfully trained {len(self._zone_models)} models') + + def predict(self, samples_batch: List[ProphetableFeatures]) -> Mapping[str, float]: + requested_zone_ids = [sample.zone_id for sample in samples_batch + if sample.zone_id in self.supported_zones] + + if not samples_batch: + return {} + requested_time = pd.Timestamp(samples_batch[0].at.replace(tzinfo=None)) + future = pd.DataFrame({'ds': [requested_time]}) + + cluster_available_rates = { + self._zone_clusterer[zone_id]: self._cluster_models[self._zone_clusterer[zone_id]].predict(future) + for zone_id in requested_zone_ids + } + unwrap = lambda x: x[0] if x else x + result = { + zone_id: unwrap(self._zone_models[zone_id].predict( + future.assign( + cluster_available_rate=cluster_available_rates[ + self._zone_clusterer[zone_id] + ].yhat.clip(0, 1) + ) + ).yhat.clip(0, 1).tolist()) + for zone_id in requested_zone_ids + } + return result + + def _derive_zone_clusters(self, training_data: pd.DataFrame) -> Mapping[str, str]: + zone_and_cluster_id_pairs = pd.concat( + [training_data.zone_id, + training_data.zone_id.astype(str).str[:2].rename('cluster_id')], + axis='columns' + ).drop_duplicates() + zone_to_cluster_map = dict(zip(zone_and_cluster_id_pairs.zone_id, + zone_and_cluster_id_pairs.cluster_id)) + return zone_to_cluster_map + + def _derive_prophet_features(self, training_data: pd.DataFrame, zone_clusterer: Mapping[str, str]) -> pd.DataFrame: + training_data = training_data.assign( + available_rate=lambda df: (1 - df.occu_cnt_rate).clip(0, 1), + available_count=lambda df: (df.available_rate * df.total_cnt).round().astype(int), + cluster_id=lambda df: df.zone_id.map(lambda 
+    def _derive_prophet_features(self, training_data: pd.DataFrame, zone_clusterer: Mapping[str, str]) -> pd.DataFrame:
+        training_data = training_data.assign(
+            available_rate=lambda df: (1 - df.occu_cnt_rate).clip(0, 1),
+            available_count=lambda df: (df.available_rate * df.total_cnt).round().astype(int),
+            cluster_id=lambda df: df.zone_id.map(lambda zone_id: zone_clusterer[zone_id])
+        )
+        training_data_by_cluster = training_data.groupby(['cluster_id', 'semihour'])
+        training_data['cluster_available_rate'] = (
+            training_data_by_cluster.available_count.transform(lambda s: s.sum())
+            / training_data_by_cluster.total_cnt.transform(lambda s: s.sum())
+        ).clip(0, 1)
+        used_columns = ['cluster_id', 'zone_id', 'ds', 'available_rate',
+                        'cluster_available_rate']
+        return (training_data.rename(columns={'semihour': 'ds'})
+                .loc[:, used_columns])
+
+    def _train_zone_models(self, refined_training_data: pd.DataFrame) -> Mapping[str, Prophet]:
+        refined_training_data = refined_training_data.rename(
+            columns={'available_rate': 'y'}
+        )
+        zone_models = {}
+        for zone_id, zone_training_data in tqdm(
+                refined_training_data.groupby('zone_id'),
+                desc='Training Parking Zone Models',
+                leave=False
+        ):
+            zone_model = Prophet(yearly_seasonality=False)
+            zone_model.add_regressor('cluster_available_rate')
+            zone_model.fit(zone_training_data)
+            zone_models[zone_id] = zone_model
+        return zone_models
+
+    def _train_cluster_models(self, refined_training_data: pd.DataFrame) -> Mapping[str, Prophet]:
+        refined_training_data = refined_training_data.rename(
+            columns={'cluster_available_rate': 'y'}
+        )
+        cluster_models = {}
+        for cluster_id, cluster_training_data in tqdm(
+                refined_training_data.groupby('cluster_id'),
+                desc='Training Clustered Parking Zone Models',
+                leave=False
+        ):
+            cluster_model = Prophet(yearly_seasonality=False)
+            cluster_model.fit(cluster_training_data)
+            cluster_models[cluster_id] = cluster_model
+        return cluster_models
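+
+# Illustrative usage sketch (hypothetical names, not exercised in this
+# module): the intended round trip is roughly
+#
+#     model = ParkingProphet()
+#     model.train(history_df)  # needs zone_id, semihour, occu_cnt_rate,
+#                              # and total_cnt columns
+#     features = ProphetableFeatures.from_request(request)
+#     availability = model.predict(features)  # e.g. {'31801': 0.42}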