-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Testing LFS and adding some testing for anomaly detection
- Loading branch information
Matthias Niedermaier
committed
Jan 3, 2025
1 parent
d40d15d
commit 8fe61dc
Showing
9 changed files
with
104 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
*.pcap filter=lfs diff=lfs merge=lfs -text |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
Git LFS file not shown
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import pyshark | ||
import pandas as pd | ||
import numpy as np | ||
import tensorflow as tf | ||
from sklearn.preprocessing import MinMaxScaler | ||
|
||
def parse_modbus_layer_fields(pcap_file):
    """
    Parse a PCAP file and extract all fields dynamically from the Modbus layer.

    Args:
        pcap_file: Path to the PCAP file to read.

    Returns:
        pandas.DataFrame with one row per Modbus packet and one column per
        Modbus field name observed; fields advertised but unreadable are None.
    """
    print(f"Parsing Modbus TCP traffic from PCAP file: {pcap_file}")
    capture = pyshark.FileCapture(pcap_file, display_filter="modbus")
    traffic_data = []
    try:
        for packet in capture:
            if 'MODBUS' in packet:
                modbus_layer = packet['MODBUS']
                packet_data = {}
                for field in modbus_layer.field_names:
                    try:
                        packet_data[field] = getattr(modbus_layer, field)
                    except AttributeError:
                        packet_data[field] = None  # Field listed but not readable
                traffic_data.append(packet_data)
    finally:
        # Always release the underlying tshark subprocess, even if a packet
        # raises mid-iteration; the original leaked the capture on error.
        capture.close()
    return pd.DataFrame(traffic_data)
|
||
def preprocess_data(data, scaler=None):
    """
    Preprocess Modbus TCP data for TensorFlow.

    Every column is coerced to numeric (non-convertible values and missing
    values become 0), then the matrix is min-max normalized.

    Args:
        data: pandas.DataFrame of raw Modbus field values (often strings).
        scaler: A fitted MinMaxScaler to reuse (e.g. the one from training).
            If None, a new MinMaxScaler is created and fitted on `data`.

    Returns:
        Tuple (normalized_data, scaler): numpy array of scaled values and the
        scaler that produced it (newly fitted when `scaler` was None).
    """
    # pd.to_numeric with errors='coerce' never raises -- it turns
    # non-convertible values into NaN -- so the original try/except
    # ValueError around it was dead code. Coerce every column, then
    # fill all NaNs (both original missing values and coercion failures).
    data = data.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

    # Normalize: fit a fresh scaler only when none was supplied.
    if scaler is None:
        scaler = MinMaxScaler()
        normalized_data = scaler.fit_transform(data)
    else:
        normalized_data = scaler.transform(data)

    return normalized_data, scaler
|
||
def detect_anomalies(model, data, threshold=0.5):
    """
    Score `data` with a trained model and flag entries above `threshold`.

    Args:
        model: Trained model exposing a `predict(data)` method.
        data: Preprocessed feature matrix to score.
        threshold: Scores strictly greater than this are flagged.

    Returns:
        Tuple (anomalies, predictions): boolean mask of flagged entries and
        the raw model scores.
    """
    scores = model.predict(data)
    flagged = scores > threshold
    return flagged, scores
|
||
if __name__ == "__main__":
    # Replace with your new PCAP file path
    new_pcap_file = "modbus_traffic_flood.pcap"

    # Step 1: Extract Modbus TCP traffic into a DataFrame (one row per packet)
    new_modbus_data = parse_modbus_layer_fields(new_pcap_file)

    if not new_modbus_data.empty:
        print("Extracted Modbus Data:")
        print(new_modbus_data.head())

        # Step 2: Preprocess the data
        # Load the same scaler used during training
        # NOTE(review): despite the comment above, no training-time scaler is
        # loaded here -- preprocess_data() fits a NEW MinMaxScaler on this
        # capture, so feature scaling may not match what the model saw during
        # training. Consider persisting the training scaler and passing it in.
        preprocessed_data, _ = preprocess_data(new_modbus_data)

        # Step 3: Load the trained TensorFlow model from the working directory
        model = tf.keras.models.load_model("modbus_tf_model.keras")
        print("Model loaded successfully.")

        # Step 4: Detect anomalies (default threshold 0.5)
        anomalies, predictions = detect_anomalies(model, preprocessed_data)

        # Display results: append per-packet score and boolean anomaly flag
        new_modbus_data['Prediction'] = predictions
        new_modbus_data['Anomaly'] = anomalies
        print("Detection Results:")
        print(new_modbus_data[['Prediction', 'Anomaly']])

        # Save results to a CSV next to the script
        new_modbus_data.to_csv("anomaly_detection_results.csv", index=False)
        print("Results saved to anomaly_detection_results.csv")
    else:
        print("No Modbus TCP traffic found in the PCAP file.")