# app.py
# Flask web app that accepts a CSV upload of network-traffic records and
# returns a Malicious/Normal prediction for each row.
import os

import joblib
import numpy as np
import pandas as pd
from flask import Flask, render_template, request, jsonify
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from werkzeug.utils import secure_filename
# Create the Flask application object that the route decorators below attach to
app = Flask(__name__)
# Directory uploaded files are saved into, and the file extensions accepted for upload
UPLOAD_FOLDER = "uploads"
ALLOWED_EXTENSIONS = {"csv"}
# Expose the upload directory through the app config so request handlers can read it
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
# Wrapper around the pre-trained model, scaler, and label encoders
class NetworkTrafficClassifier:
    """Inference wrapper around a persisted model, scaler and label encoders.

    ``model``, ``scaler`` and ``label_encoders`` are populated by
    :meth:`load_model`. ``numeric_columns`` is resolved on the first call to
    :meth:`preprocess_data` and reused for every later call.
    """

    def __init__(self):
        self.model = None
        self.scaler = None
        # Maps a categorical column name to its fitted encoder.
        self.label_encoders = {}
        # Resolved lazily by preprocess_data on its first call.
        self.numeric_columns = None
        self.categorical_columns = ["proto", "service", "state"]

    def load_model(self):
        """Load the model, scaler and label encoders from the ``models/`` directory."""
        # NOTE(security): joblib.load unpickles arbitrary Python objects, so
        # these files must only ever come from a trusted source.
        self.model = joblib.load("models/network_traffic_classifier.pkl")
        self.scaler = joblib.load("models/scaler.pkl")
        self.label_encoders = joblib.load("models/label_encoders.pkl")
        print("Model, Scaler, and Label Encoders loaded.")

    def preprocess_data(self, df, is_training=False):
        """Turn a raw traffic DataFrame into the scaled feature matrix.

        Args:
            df: Raw records. The caller's frame is left untouched.
            is_training: When true, fit a fresh ``LabelEncoder`` for each
                categorical column instead of reusing the stored ones.

        Returns:
            A DataFrame with ``label`` (if present) removed, categorical
            columns integer-encoded and numeric columns scaled.

        Raises:
            KeyError: If a numeric column expected by the scaler is missing.
        """
        df = df.copy()
        df = df.drop(columns=["id", "attack_cat", "is_sm_ips_ports"], errors="ignore")

        if "label" in df.columns:
            df["label"] = pd.to_numeric(df["label"])

        # Handle categorical columns
        for col in self.categorical_columns:
            if col not in df.columns:
                continue
            if is_training:
                encoder = LabelEncoder()
                self.label_encoders[col] = encoder
                df[col] = encoder.fit_transform(df[col].astype(str))
                continue
            encoder = self.label_encoders.get(col)
            if encoder is None:
                # No encoder was persisted for this column; leave it as-is.
                continue
            known_categories = set(encoder.classes_)
            # Categories never seen at fit time are mapped to the encoder's
            # first class so that transform() does not raise on them.
            fallback = encoder.classes_[0]
            as_text = df[col].astype(str)
            df[col] = encoder.transform(
                as_text.map(lambda v: v if v in known_categories else fallback)
            )

        # Resolve the numeric columns once. At inference time prefer the names
        # the scaler was fitted with, so a single upload carrying extra or
        # oddly-typed columns cannot poison the cached list for later calls.
        if self.numeric_columns is None:
            fitted_names = (
                None if is_training else getattr(self.scaler, "feature_names_in_", None)
            )
            if fitted_names is not None:
                self.numeric_columns = list(fitted_names)
            else:
                self.numeric_columns = [
                    col
                    for col in df.columns
                    if col not in self.categorical_columns
                    and col != "label"
                    and pd.api.types.is_numeric_dtype(df[col])
                ]

        missing = [col for col in self.numeric_columns if col not in df.columns]
        if missing:
            raise KeyError(f"Input is missing expected numeric columns: {missing}")

        for col in self.numeric_columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

        # Infinities and unparseable values both end up as 0.
        df = df.replace([np.inf, -np.inf], np.nan).fillna(0)

        # The label is optional: unlabelled data must not raise here.
        X = df.drop(columns=["label"], errors="ignore")

        # Scale the numeric columns
        if self.numeric_columns:
            X[self.numeric_columns] = self.scaler.transform(X[self.numeric_columns])
        return X

    def predict(self, X):
        """Return the loaded model's predictions for the feature matrix ``X``."""
        return self.model.predict(X)
# Build the single module-level classifier used by the request handlers.
# load_model() runs at import time, so the files it reads under models/ must
# exist before the app starts.
classifier = NetworkTrafficClassifier()
classifier.load_model()
# Check allowed file extension
def allowed_file(filename, allowed_extensions=None):
    """Return True when *filename* has an extension in the allowed set.

    Args:
        filename: Client-supplied file name; ``None`` or an empty string is
            rejected.
        allowed_extensions: Lower-case extensions (without the dot) to accept.
            Defaults to the module-level ``ALLOWED_EXTENSIONS``.

    Returns:
        bool: Whether the text after the last dot, compared
        case-insensitively, is an allowed extension.
    """
    if not filename:
        return False
    if allowed_extensions is None:
        allowed_extensions = ALLOWED_EXTENSIONS
    # rpartition yields an empty separator when the name contains no dot.
    _, dot, extension = filename.rpartition(".")
    return bool(dot) and extension.lower() in allowed_extensions
# Home route
@app.route("/")
def index():
    """Render the ``index.html`` template for the site root."""
    return render_template("index.html")
# Route for file upload and prediction
@app.route("/upload", methods=["POST"])
def upload_file():
    """Classify the rows of an uploaded CSV and return the predictions as JSON.

    Expects a multipart form field named ``file`` holding a ``.csv`` file.
    The file is saved under ``app.config["UPLOAD_FOLDER"]``, passed through
    ``classifier.preprocess_data`` and ``classifier.predict``, and every row
    is reported as "Malicious" (value ``1``) or "Normal".

    Returns:
        ``{"result": [...]}`` on success, otherwise ``{"error": "..."}``.
        A rejected file type or an unprocessable CSV is answered with
        HTTP 400 instead of an unhandled server error.
    """
    if "file" not in request.files:
        return jsonify({"error": "No file part"})
    file = request.files["file"]
    if file.filename == "":
        return jsonify({"error": "No selected file"})
    # Without this branch the view fell through and returned None, which
    # Flask reports as a 500.
    if not file or not allowed_file(file.filename):
        return jsonify({"error": "File type not allowed"}), 400

    # Secure the filename and save the file
    filename = secure_filename(file.filename)
    upload_dir = app.config["UPLOAD_FOLDER"]
    os.makedirs(upload_dir, exist_ok=True)  # first upload on a fresh checkout
    filepath = os.path.join(upload_dir, filename)
    file.save(filepath)

    # Load CSV file and preprocess data
    try:
        df = pd.read_csv(filepath)
    except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError) as exc:
        return jsonify({"error": f"Could not read CSV: {exc}"}), 400

    try:
        X_processed = classifier.preprocess_data(df, is_training=False)
        # Predict using the trained model
        y_pred = classifier.predict(X_processed)
    except (KeyError, ValueError) as exc:
        # Typically a missing or malformed column in the uploaded data.
        app.logger.exception("Failed to classify %s", filename)
        return jsonify({"error": f"Could not process CSV: {exc}"}), 400

    # Ground-truth labels are optional; unlabelled uploads still get predictions.
    if "label" in df.columns:
        true_labels = ["Malicious" if value == 1 else "Normal" for value in df["label"]]
    else:
        true_labels = ["Unknown"] * len(y_pred)

    # Generate results for display
    result = [
        {
            "True label": true_label,
            "Predicted label": "Malicious" if pred_label == 1 else "Normal",
        }
        for true_label, pred_label in zip(true_labels, y_pred)
    ]
    # Return the results
    return jsonify({"result": result})
if __name__ == "__main__":
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary Python, so it is opt-in through FLASK_DEBUG rather than
    # unconditionally enabled.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(debug=debug_enabled)