Commit: Linted all files

Mannerow committed Oct 28, 2024
1 parent cd5e04d commit 9dab4b7
Showing 18 changed files with 77 additions and 66 deletions.
.env.template (2 changes: 1 addition & 1 deletion)

@@ -10,4 +10,4 @@ PREFECT_WORKSPACE='YOUR-PREFECT-WORKSPACE-NAME'
 TF_VAR_aws_region=us-east-1
 TF_VAR_mlflow_models_bucket='MODEL-BUCKET-NAME'
 TF_VAR_prediction_bucket='PREDICTION-BUCKET-NAME'
-TF_VAR_ecr_repository_name='ECR-REPO-NAME'
\ No newline at end of file
+TF_VAR_ecr_repository_name='ECR-REPO-NAME'
.gitignore (2 changes: 1 addition & 1 deletion)

@@ -177,4 +177,4 @@ override.tf.json
 *_override.tf
 *_override.tf.json
 update_pipfile.py
-.vscode.env
\ No newline at end of file
+.vscode.env
.pre-commit-config.yaml (18 changes: 17 additions & 1 deletion)

@@ -37,4 +37,20 @@ repos:
         name: pylint
         entry: pylint
         language: python
-        types: [python]
\ No newline at end of file
+        types: [python]
+        additional_dependencies:
+          - click
+          - pandas
+          - kaggle
+          - scikit-learn
+          - mlflow
+          - numpy
+          - hyperopt
+          - xgboost
+          - psycopg
+          - python-dotenv
+          - evidently
+          - boto3
+          - botocore
+          - scipy
+          - prefect
.pylintrc (14 changes: 13 additions & 1 deletion)

@@ -1,2 +1,14 @@
 [MESSAGES CONTROL]
-disable=line-too-long, wrong-import-position, invalid-name, no-value-for-parameter, logging-fstring-interpolation, unused-argument, too-many-locals
+disable=
+    line-too-long,
+    wrong-import-position,
+    invalid-name,
+    no-value-for-parameter,
+    logging-fstring-interpolation,
+    unused-argument,
+    too-many-locals,
+    unused-variable,
+    redefined-outer-name,
+    broad-exception-raised,
+    duplicate-code,
+    not-context-manager
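
Note: these disables apply project-wide. For one-off violations, pylint also supports scoped pragmas, which can keep the global list short; a small illustration (the function is hypothetical, not from this repo):

def fetch(url):  # pylint: disable=unused-argument
    """Stub whose argument is intentionally unused for now."""
    return None

# Block-scoped form: disable a check, then re-enable it.
# pylint: disable=invalid-name
X = None
# pylint: enable=invalid-name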
Dockerfile.mlflow (2 changes: 1 addition & 1 deletion)

@@ -26,4 +26,4 @@ WORKDIR /app/src
 EXPOSE 5000

 # Define the entrypoint to source the .env file and run mlflow
-ENTRYPOINT ["/bin/bash", "-c", "source /app/.env && mlflow server --host 0.0.0.0 --backend-store-uri $MLFLOW_BACKEND_STORE_URI --default-artifact-root $MLFLOW_DEFAULT_ARTIFACT_ROOT"]
\ No newline at end of file
+ENTRYPOINT ["/bin/bash", "-c", "source /app/.env && mlflow server --host 0.0.0.0 --backend-store-uri $MLFLOW_BACKEND_STORE_URI --default-artifact-root $MLFLOW_DEFAULT_ARTIFACT_ROOT"]
config/grafana_dashboards.yaml (2 changes: 1 addition & 1 deletion)

@@ -21,4 +21,4 @@ providers:
     # <string, required> path to dashboard files on disk. Required when using the 'file' type
     path: /opt/grafana/dashboards
     # <bool> use folder names from filesystem to create folders in Grafana
-    foldersFromFilesStructure: true
\ No newline at end of file
+    foldersFromFilesStructure: true
config/grafana_datasources.yaml (2 changes: 1 addition & 1 deletion)

@@ -14,4 +14,4 @@ datasources:
    password: 'example'
    jsonData:
      sslmode: 'disable'
-      database: test
\ No newline at end of file
+      database: test
(file name not shown in this view)

@@ -540,4 +540,4 @@
   "uid": "ddsyl5p7yzx1ce",
   "version": 3,
   "weekStart": ""
-}
\ No newline at end of file
+}
infrastructure/main.tf (2 changes: 1 addition & 1 deletion)

@@ -34,4 +34,4 @@ module "predictions_data_bucket" {
 module "ecr_repository" {
   source          = "./modules/ecr"
   repository_name = var.ecr_repository_name
-}
\ No newline at end of file
+}
infrastructure/modules/s3/main.tf (2 changes: 1 addition & 1 deletion)

@@ -9,4 +9,4 @@ resource "aws_s3_bucket" "s3_bucket" {
   lifecycle {
     prevent_destroy = false
   }
-}
\ No newline at end of file
+}
infrastructure/modules/s3/variables.tf (2 changes: 1 addition & 1 deletion)

@@ -1,4 +1,4 @@
 variable "bucket_name" {
   description = "The name of the S3 bucket"
   type        = string
-}
\ No newline at end of file
+}
infrastructure/variables.tf (2 changes: 1 addition & 1 deletion)

@@ -26,4 +26,4 @@ variable "docker_image_local_path" {
 variable "ecr_repository_name" {
   description = "The name of the ECR repository"
   type        = string
-}
\ No newline at end of file
+}
pyproject.toml (2 changes: 2 additions & 0 deletions)

@@ -0,0 +1,2 @@
+[tool.isort]
+profile = "black"
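
Note: the "black" profile makes isort's output compatible with Black's formatting (88-character lines, trailing commas, parenthesized multi-line imports). A hedged illustration of the effect; the import list is illustrative, not from this commit:

# Before isort with profile = "black" (one long line):
# from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet, SGDRegressor

# After: parenthesized, one name per line, trailing comma.
from sklearn.linear_model import (
    ElasticNet,
    Lasso,
    LinearRegression,
    Ridge,
    SGDRegressor,
)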
src/run_flow.py (69 changes: 20 additions & 49 deletions)

@@ -1,111 +1,82 @@
 """Runs the batch pipeline flow."""

 import subprocess
-from datetime import timedelta

-from prefect import flow, serve, task
+from prefect import flow, task


 @task
 def run_data_preprocess():
     """Data Preprocessing."""
     result = subprocess.run(
-        ["python", "data_preprocess.py"], capture_output=True, text=True
+        ["python", "data_preprocess.py"], capture_output=True, text=True, check=True
     )
-    if result.returncode != 0:
-        raise Exception(f"data_preprocess.py failed: {result.stderr}")
-    print(result.stdout)
-    return (
-        result.stdout
-    )  # Return value to be used as input in dependent tasks if needed


 @task
 def run_train():
-    result = subprocess.run(["python", "train.py"], capture_output=True, text=True)
-    if result.returncode != 0:
-        raise Exception(f"train.py failed: {result.stderr}")
-    print(result.stdout)
-    return (
-        result.stdout
-    )  # Return value to be used as input in dependent tasks if needed
+    """Training script."""
+    result = subprocess.run(
+        ["python", "train.py"], capture_output=True, text=True, check=True
+    )


 @task
 def run_hpo():
-    result = subprocess.run(["python", "hpo.py"], capture_output=True, text=True)
-    if result.returncode != 0:
-        raise Exception(f"hpo.py failed: {result.stderr}")
-    print(result.stdout)
-    return (
-        result.stdout
-    )  # Return value to be used as input in dependent tasks if needed
+    """Hyperparameter tuning."""
+    result = subprocess.run(
+        ["python", "hpo.py"], capture_output=True, text=True, check=True
+    )


 @task
 def run_register_model():
     """Hyperparameter tuning."""
     result = subprocess.run(
-        ["python", "register_model.py"], capture_output=True, text=True
+        ["python", "register_model.py"], capture_output=True, text=True, check=True
     )
-    if result.returncode != 0:
-        raise Exception(f"register_model.py failed: {result.stderr}")
-    print(result.stdout)
-    return (
-        result.stdout
-    )  # Return value to be used as input in dependent tasks if needed


 @task
 def run_score_batch():
     """Makes predictions and saves them."""
     result = subprocess.run(
-        ["python", "score_batch.py"], capture_output=True, text=True
+        ["python", "score_batch.py"], capture_output=True, text=True, check=True
     )
-    if result.returncode != 0:
-        raise Exception(f"score_batch.py failed: {result.stderr}")
-    print(result.stdout)
-    return (
-        result.stdout
-    )  # Return value to be used as input in dependent tasks if needed


 @task
 def run_monitor_metrics():
     """Monitors with Evidently."""
     result = subprocess.run(
-        ["python", "monitor_metrics.py"], capture_output=True, text=True
+        ["python", "monitor_metrics.py"], capture_output=True, text=True, check=True
     )
-    if result.returncode != 0:
-        raise Exception(f"monitor_metrics.py failed: {result.stderr}")
-    print(result.stdout)
-    return (
-        result.stdout
-    )  # Return value to be used as input in dependent tasks if needed


 @flow(log_prints=True)
 def ml_workflow():
     """Runs the flow and serves every 10 mins."""
     print("🔄 Preprocessing the data...")

     data_preprocess_result = run_data_preprocess()

     print("🏋️ Training the models...")

     train_result = run_train(wait_for=[data_preprocess_result])

     print("🎛️ Tuning hyperparameters...")

     hpo_result = run_hpo(wait_for=[train_result])  # Dependency managed by wait_for

     print("🏆 Registering the best model...")

     register_model_result = run_register_model(
         wait_for=[hpo_result]
     )  # Dependency managed by wait_for

     print("🔮 Making predictions...")

     score_batch_result = run_score_batch(
         wait_for=[register_model_result]
     )  # Dependency managed by wait_for

     print("📊 Monitoring...")

     monitor_metrics_result = run_monitor_metrics(
         wait_for=[score_batch_result]
     )  # Dependency managed by wait_for
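
Note: the recurring change in this file swaps manual returncode checks for subprocess.run's check=True, which raises subprocess.CalledProcessError on a nonzero exit (Prefect then marks the task failed). A minimal sketch of the before/after behavior, using an inline command instead of the repo's scripts:

import subprocess

# Old pattern: run, then inspect returncode by hand.
result = subprocess.run(
    ["python", "-c", "import sys; sys.exit(1)"], capture_output=True, text=True
)
if result.returncode != 0:
    print(f"script failed with {result.returncode}")

# New pattern: check=True raises subprocess.CalledProcessError on nonzero exit;
# the exception carries returncode, stdout, and stderr.
try:
    subprocess.run(
        ["python", "-c", "import sys; sys.exit(1)"],
        capture_output=True,
        text=True,
        check=True,
    )
except subprocess.CalledProcessError as err:
    print(f"caught: exit {err.returncode}, stderr: {err.stderr!r}")

One trade-off visible in the diff: the new task bodies no longer print result.stdout, so with capture_output=True a child script's output surfaces only on failure, via the exception.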
src/score_batch.py (11 changes: 8 additions & 3 deletions)

@@ -1,6 +1,6 @@
 """Scores batch and saves to S3"""

 import os
-import pickle
+import sys
 import uuid

 import boto3
@@ -13,7 +13,6 @@
 # Load environment variables
 from dotenv import load_dotenv
 from mlflow.tracking import MlflowClient
-from scipy.sparse import csr_matrix

 import utils
@@ -26,13 +25,15 @@


 def generate_uuids(n):
+    """Generates trip IDs"""
     trip_ids = []
     for i in range(n):
         trip_ids.append(str(uuid.uuid4()))
     return trip_ids


 def load_best_model(model_bucket, model_name, experiment_name):
+    """Loads best model from MLflow"""
     client = MlflowClient()

     # Get experiment ID from the experiment name
@@ -62,6 +63,7 @@ def load_best_model(model_bucket, model_name, experiment_name):


 def save_results(df, y_pred, y_test, run_id, output_file):
+    """Saves the df to a parquet file in output file location."""
     trip_ids = generate_uuids(df.shape[0])
     results_df = pd.DataFrame(
         {
@@ -85,6 +87,7 @@ def save_results(df, y_pred, y_test, run_id, output_file):


 def create_s3_bucket(bucket_name, region=None):
+    """Creates S3 bucket if it doesn't exist."""
     # Initialize a session using Amazon S3
     s3_client = boto3.client("s3", region_name=region)

@@ -114,6 +117,7 @@ def create_s3_bucket(bucket_name, region=None):
 def apply_model(
     test_data_path: str, model_bucket: str, model_name: str, dest_bucket: str
 ):
+    """Applies the predictions."""
     print(
         f'Reading the prepared data from {os.path.join(test_data_path, "test.pkl")}...'
     )
@@ -163,6 +167,7 @@ def apply_model(
     help="Location where the resulting files will be saved",
 )
 def run(test_data_path: str, model_bucket: str, model_name: str, dest_bucket: str):
+    """Runs flow."""
     apply_model(test_data_path, model_bucket, model_name, dest_bucket)
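
Note: the body of load_best_model is collapsed in this view. As a hedged sketch only, the usual MLflow pattern for this kind of helper looks like the following; the metric name, artifact path, and return value here are assumptions, not the repo's actual code:

import mlflow
from mlflow.tracking import MlflowClient

def load_best_model_sketch(model_name: str, experiment_name: str):
    """Hypothetical reconstruction: load the best run's model by a metric."""
    client = MlflowClient()
    experiment = client.get_experiment_by_name(experiment_name)
    best_run = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=["metrics.rmse ASC"],  # assumed metric name
        max_results=1,
    )[0]
    # Assumed artifact layout: the model logged under <model_name> in the run.
    return mlflow.pyfunc.load_model(f"runs:/{best_run.info.run_id}/{model_name}")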
src/train.py (2 changes: 1 addition & 1 deletion)

@@ -14,7 +14,6 @@
 RANDOM_STATE = 42

 import os
-import pickle

 import click
 import mlflow
@@ -35,6 +34,7 @@
     help="Location where the processed DoorDash data is saved",
 )
 def run_train(data_path: str):
+    """Run flow."""
     models = {
         "LinearRegression": LinearRegression(),
         "XGBRegressor": XGBRegressor(
src/utils.py (5 changes: 5 additions & 0 deletions)

@@ -1,20 +1,25 @@
+"""Contains utility functions."""
+
 import pickle

 import pandas as pd
 from scipy.sparse import csr_matrix


 def load_pickle(filename: str):
+    """Load pickle from file."""
     with open(filename, "rb") as f_in:
         return pickle.load(f_in)


 def dump_pickle(obj, filename: str):
+    """Dump pickle to file."""
     with open(filename, "wb") as f_out:
         return pickle.dump(obj, f_out)


 def decode_dataframe(dv, df):
+    """Decode a DF from Sparse --> original"""
     # Ensure df is a DataFrame, not a csr_matrix
     if isinstance(df, csr_matrix):
         # Convert sparse matrix to dense matrix
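
Note: decode_dataframe is truncated here. A hedged sketch of the typical inverse mapping, assuming dv is a fitted sklearn DictVectorizer (the helper name and column handling are assumptions, not this repo's code):

import pandas as pd
from scipy.sparse import csr_matrix
from sklearn.feature_extraction import DictVectorizer

def decode_dataframe_sketch(dv: DictVectorizer, df):
    """Hypothetical continuation: turn an encoded (possibly sparse) matrix
    back into a DataFrame with the vectorizer's feature names as columns."""
    if isinstance(df, csr_matrix):
        df = df.toarray()  # densify so pandas can wrap it
    return pd.DataFrame(df, columns=dv.get_feature_names_out())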
start.sh (2 changes: 1 addition & 1 deletion)

@@ -21,4 +21,4 @@ aws ecr get-login-password --region $TF_VAR_aws_region | docker login --username
 docker tag $TF_VAR_ecr_repository_name:latest $AWS_ACCOUNT_ID.dkr.ecr.$TF_VAR_aws_region.amazonaws.com/$TF_VAR_ecr_repository_name:latest

 # Push Docker image to ECR
-docker push $AWS_ACCOUNT_ID.dkr.ecr.$TF_VAR_aws_region.amazonaws.com/$TF_VAR_ecr_repository_name:latest
\ No newline at end of file
+docker push $AWS_ACCOUNT_ID.dkr.ecr.$TF_VAR_aws_region.amazonaws.com/$TF_VAR_ecr_repository_name:latest
