-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
539 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
app/output/*.txt | ||
app/_models/ | ||
venv/ | ||
|
||
__pycache__/ | ||
|
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import ForwardRef, List | ||
|
||
import pandas as pd | ||
|
||
from app.data_formats import APIPrediction | ||
|
||
ModelFeatures = ForwardRef('ModelFeatures') | ||
|
||
|
||
class Model(ABC): | ||
"""An abstract base class for ML models.""" | ||
|
||
@abstractmethod | ||
def __getstate__(self): ... | ||
|
||
@abstractmethod | ||
def __setstate__(self, state): ... | ||
|
||
@property | ||
@abstractmethod | ||
def supported_zones(self): ... | ||
|
||
@abstractmethod | ||
def train(self, training_data: pd.DataFrame) -> None: ... | ||
|
||
@abstractmethod | ||
def predict(self, data: ModelFeatures) -> List[APIPrediction]: ... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
import logging | ||
import sys | ||
from typing import ForwardRef, List, Mapping, MutableMapping | ||
|
||
import numpy as np | ||
import pandas as pd | ||
from pydantic import BaseModel, conlist, validate_arguments, validator | ||
from sklearn.neural_network import MLPRegressor | ||
|
||
from app._models.abstract_model import Model, ModelFeatures | ||
from app.constants import DAY_OF_WEEK, HOURS_END, HOURS_START, UNENFORCED_DAYS | ||
from app.data_formats import APIPredictionRequest | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
LOGGER.setLevel(logging.INFO) | ||
if sys.stdout.isatty(): | ||
LOGGER.addHandler(logging.StreamHandler(sys.stdout)) | ||
|
||
|
||
TOTAL_SEMIHOURS = 2 * (HOURS_END - HOURS_START).hours + abs(HOURS_END.minute - HOURS_START.minute) // 30 | ||
TOTAL_ENFORCEMENT_DAYS = 7 - len(UNENFORCED_DAYS) | ||
|
||
|
||
ModelFeaturesv0EarlyAccessPreRelease = ForwardRef('ModelFeaturesv0EarlyAccessPreRelease') | ||
|
||
|
||
class ModelFeaturesv0EarlyAccessPreRelease(BaseModel): | ||
zone_id: str | ||
semihour_onehot: conlist(int, | ||
min_items=TOTAL_SEMIHOURS - 1, | ||
max_items=TOTAL_SEMIHOURS - 1) | ||
dayofweek_onehot: conlist(int, | ||
min_items=TOTAL_ENFORCEMENT_DAYS - 1, | ||
max_items=TOTAL_ENFORCEMENT_DAYS - 1) | ||
|
||
@validator('semihour_onehot', 'dayofweek_onehot') | ||
def one_hot_encoded(cls, feature): | ||
number_of_ones = (np.array(feature) == 1).sum() | ||
number_of_zeros = (np.array(feature) == 0).sum() | ||
assert number_of_ones + number_of_zeros == len(feature), \ | ||
'Input should consist solely of 0s and 1s.' | ||
assert number_of_ones <= 1, \ | ||
'Input must contain at most one 1.' | ||
return feature | ||
|
||
@staticmethod | ||
@validate_arguments | ||
def from_request(request: APIPredictionRequest) -> List[ModelFeatures]: | ||
""" | ||
Convert a prediction request to the input format expected by a parking | ||
availability model. | ||
Parameters | ||
---------- | ||
request : APIPredictionRequest | ||
The prediction request to transform into model features | ||
Returns | ||
------- | ||
list of ModelFeaturesv0EarlyAccessPreRelease | ||
A set of features that can be passed into the `predict` method of a | ||
`ParkingAvailabilityModelv0EarlyAccessPreRelease`. | ||
""" | ||
timestamp = request.timestamp | ||
|
||
hour_onehot = np.array([timestamp.hour == hour for hour in range(8, 22)]) | ||
minute_bin_onehot = np.array([30 * minute_bin <= timestamp.minute < 30 * (minute_bin + 1) for minute_bin in range(2)]) | ||
semihour_onehot = (hour_onehot[:, None] & np.array(minute_bin_onehot)).flatten()[1:] | ||
|
||
enforcement_days = [day.value for day in DAY_OF_WEEK if day not in UNENFORCED_DAYS] | ||
dayofweek_onehot = np.array([ | ||
enforcement_days.index(timestamp.weekday()) == day_index | ||
for day_index in range(TOTAL_ENFORCEMENT_DAYS) | ||
])[1:] | ||
|
||
return [ModelFeaturesv0EarlyAccessPreRelease(zone_id=zone_id, | ||
semihour_onehot=semihour_onehot.astype(int).tolist(), | ||
dayofweek_onehot=dayofweek_onehot.astype(int).tolist()) | ||
for zone_id in request.zone_ids] | ||
|
||
|
||
ModelFeaturesv0EarlyAccessPreRelease.update_forward_refs() | ||
|
||
|
||
class ParkingAvailabilityModelv0EarlyAccessPreRelease(Model): | ||
def __init__(self): | ||
super().__init__() | ||
self._zone_models: MutableMapping[str, MLPRegressor] = {} | ||
self._supported_zones = [] | ||
|
||
def __getstate__(self): | ||
return { | ||
'zone_models': self._zone_models, | ||
'supported_zones': self.supported_zones | ||
} | ||
|
||
def __setstate__(self, state): | ||
self._zone_models = state['zone_models'] | ||
self._supported_zones = state['supported_zones'] | ||
|
||
@property | ||
def supported_zones(self): | ||
return self._supported_zones | ||
|
||
def train(self, training_data: pd.DataFrame) -> None: | ||
zone_id, X, y = ( | ||
training_data | ||
.assign( | ||
available_rate=lambda df: 1 - df.occu_cnt_rate, | ||
dayofweek=lambda df: df.semihour.dt.dayofweek.astype('category'), | ||
semihour=lambda df: pd.Series( | ||
zip(df.semihour.dt.hour, df.semihour.dt.minute), | ||
dtype='category', index=df.index | ||
) | ||
) | ||
.pipe( | ||
lambda df: ( | ||
df.zone_id, | ||
pd.get_dummies(df.loc[:, ['semihour', 'dayofweek']], | ||
drop_first=True), | ||
df.available_rate | ||
) | ||
) | ||
) | ||
|
||
self._supported_zones = zone_id.unique() | ||
for zone in self.supported_zones: | ||
LOGGER.info(f'Processing zone {zone}') | ||
X_cluster = X[zone_id == zone] | ||
y_cluster = y[zone_id == zone] | ||
|
||
LOGGER.info(f'Total (row, col) counts: {X_cluster.shape}') | ||
mlp = MLPRegressor(hidden_layer_sizes=(50, 50), activation='relu') | ||
mlp.fit(X_cluster, y_cluster) | ||
if np.issubdtype(type(zone), np.float64): | ||
zone = str(int(zone)) | ||
self._zone_models[zone] = mlp | ||
|
||
LOGGER.info(f'Successfully trained {len(self._zone_models)} models') | ||
|
||
@validate_arguments | ||
def predict(self, samples_batch: List[ModelFeaturesv0EarlyAccessPreRelease]) -> Mapping[str, float]: | ||
requested_zone_ids = [sample.zone_id for sample in samples_batch | ||
if sample.zone_id in self.supported_zones] | ||
|
||
regressor_feature_array = np.asarray([ | ||
sample.semihour_onehot + sample.dayofweek_onehot | ||
for sample in samples_batch | ||
]) | ||
return { | ||
zone_id: self._zone_models[zone_id] | ||
.predict(regressor_feature_array) | ||
.clip(0, 1)[0] | ||
for zone_id in requested_zone_ids | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
import logging | ||
import sys | ||
from datetime import datetime | ||
from typing import ForwardRef, List, Mapping | ||
|
||
import numpy as np | ||
import pandas as pd | ||
from pydantic import BaseModel, constr, validate_arguments | ||
|
||
from app._models.abstract_model import Model | ||
from app.data_formats import APIPredictionRequest | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
LOGGER.setLevel(logging.INFO) | ||
if sys.stdout.isatty(): | ||
LOGGER.addHandler(logging.StreamHandler(sys.stdout)) | ||
|
||
AverageFeatures = ForwardRef('AverageFeatures') | ||
|
||
|
||
class AverageFeatures(BaseModel): | ||
zone_id: constr(min_length=1) | ||
at: datetime | ||
|
||
@staticmethod | ||
@validate_arguments | ||
def from_request(request: APIPredictionRequest) -> List[AverageFeatures]: | ||
f""" | ||
Convert a prediction request to the input format expected by a parking | ||
availability model. | ||
Parameters | ||
---------- | ||
request : APIPredictionRequest | ||
The prediction request to transform into model features | ||
Returns | ||
------- | ||
list of AverageFeatures | ||
A set of features that can be passed into the `predict` method of a | ||
`AverageFeatures`. | ||
""" | ||
return [AverageFeatures(zone_id=zone_id, at=request.timestamp) | ||
for zone_id in request.zone_ids] | ||
|
||
|
||
AverageFeatures.update_forward_refs() | ||
|
||
|
||
class AvailabilityAverager(Model): | ||
def __init__(self, weeks_to_average=1): | ||
super().__init__() | ||
self._supported_zones: List[str] = [] | ||
self._weeks_to_average = weeks_to_average | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, AvailabilityAverager): | ||
return False | ||
rolling_average_ids = set(self._rolling_averages.keys()) | ||
other_rolling_average_ids = set(other._rolling_averages.keys()) | ||
return ( | ||
(rolling_average_ids == other_rolling_average_ids) and | ||
all( | ||
( | ||
self._rolling_averages[key] == other._rolling_averages[key] | ||
).all().all() | ||
for key in rolling_average_ids | ||
) and | ||
(self.supported_zones == other.supported_zones) and | ||
(self.weeks_to_average == other.weeks_to_average) | ||
) | ||
|
||
def __getstate__(self): | ||
return { | ||
'rolling_averages': self._rolling_averages, | ||
'supported_zones': self.supported_zones, | ||
'weeks_to_average': self.weeks_to_average | ||
} | ||
|
||
def __setstate__(self, state): | ||
self._rolling_averages = state['rolling_averages'] | ||
self._supported_zones = state['supported_zones'] | ||
self._weeks_to_average = state['weeks_to_average'] | ||
|
||
@property | ||
def supported_zones(self) -> List[str]: | ||
return self._supported_zones | ||
|
||
@property | ||
def weeks_to_average(self) -> int: | ||
return self._weeks_to_average | ||
|
||
def train(self, training_data: pd.DataFrame) -> None: | ||
training_data = training_data.assign( | ||
available_rate=lambda df: 1 - df.occu_cnt_rate, | ||
dayofweek=lambda df: df.semihour.dt.dayofweek.astype('category'), | ||
semihour_tuples=lambda df: pd.Series( | ||
zip(df.semihour.dt.hour, df.semihour.dt.minute), | ||
dtype='category', index=df.index | ||
) | ||
) | ||
LOGGER.info(f"At start, {len(training_data)}") | ||
training_data = training_data.sort_values( | ||
['zone_id', 'dayofweek', 'semihour_tuples'] | ||
) | ||
LOGGER.info(f"Sorted, {len(training_data)}") | ||
training_data[f'available_rate_{self.weeks_to_average:0>2}w'] = ( | ||
training_data.groupby( | ||
['zone_id', 'dayofweek', 'semihour_tuples'] | ||
).available_rate.transform( | ||
lambda group: group.shift().rolling(self.weeks_to_average, 1).mean() | ||
).dropna().clip(0, 1) | ||
) | ||
LOGGER.info(f"Grouped?, {len(training_data)}") | ||
training_data = training_data[ | ||
[ | ||
'zone_id', 'semihour', 'semihour_tuples', 'dayofweek', | ||
f'available_rate_{self.weeks_to_average:0>2}w' | ||
] | ||
].dropna() | ||
LOGGER.info(f"Dropped, {len(training_data)}") | ||
self._rolling_averages = { | ||
zone_day_time: data_in_zone_on_day_at_time | ||
for zone_day_time, data_in_zone_on_day_at_time | ||
in training_data.groupby(['zone_id', 'dayofweek', | ||
'semihour_tuples']) | ||
} | ||
|
||
def _get_zone_from_key(key): | ||
zone_id, _d, _s = key | ||
return zone_id | ||
|
||
zone_list = list(map(_get_zone_from_key, self._rolling_averages.keys())) | ||
unique_zone_list = np.unique(np.array(zone_list)).tolist() | ||
self._supported_zones = unique_zone_list | ||
|
||
|
||
# @validate_arguments | ||
def predict(self, samples_batch: List[AverageFeatures]) -> Mapping[str, float]: | ||
valid_requests = [sample for sample in samples_batch | ||
if sample.zone_id in self.supported_zones] | ||
# LOGGER.info(f'Samples, {len(samples_batch)}') | ||
# LOGGER.info(f'Valid, {len(valid_requests)}') | ||
predictions = {} | ||
for sample in valid_requests: | ||
sample_timestamp = pd.Timestamp(sample.at) | ||
sample_semihour_tuple = ( | ||
sample_timestamp.hour, | ||
30 * (sample_timestamp.minute // 30) | ||
) | ||
rolling_averages = self._rolling_averages.get(( | ||
sample.zone_id, | ||
sample_timestamp.dayofweek, | ||
sample_semihour_tuple | ||
), pd.DataFrame()) | ||
|
||
if rolling_averages.empty: | ||
# LOGGER.warn(f'Sample {sample.zone_id} empty') | ||
# LOGGER.warn(f'Test {test}') | ||
continue | ||
|
||
rolling_averages = rolling_averages.assign( | ||
date_diff=lambda df: -( | ||
df.semihour.dt.date - sample_timestamp.date() | ||
) | ||
).loc[ | ||
lambda df: df.date_diff == df.date_diff.min() | ||
] | ||
|
||
try: | ||
predictions[sample.zone_id] = rolling_averages[f'available_rate_{self.weeks_to_average:0>2}w'].iloc[0] | ||
except IndexError: | ||
continue | ||
|
||
return predictions |
Oops, something went wrong.