-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathreclassify.py
163 lines (129 loc) · 6.06 KB
/
reclassify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import re
import numpy as np
import argparse
import matplotlib.pyplot as plt
from jenkspy import JenksNaturalBreaks
import signal
import sys
# Handler invoked on SIGINT (Ctrl+C): announce the interruption and stop.
def signal_handler(sig, frame):
    """Print an interruption notice and terminate with exit status 0.

    ``sig`` and ``frame`` are the two positional arguments the ``signal``
    module passes to every handler; neither is used here.

    Raises:
        SystemExit: Always, with code 0.
    """
    print("\nProcess interrupted. Exiting gracefully.")
    raise SystemExit(0)
# Register the handler for SIGINT (Ctrl+C) at module level, before any
# argument parsing or processing starts, so that an interrupt runs
# signal_handler (message + exit) instead of Python's default handler,
# which would raise KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)
# Function to read data from file and filter relevant values
def read_data_from_file(filename):
    """Collect the numeric value from every VALID/INVALID line of a file.

    A line counts as a data line when, ignoring leading whitespace, it starts
    with ``VALID`` or ``INVALID``.  Such a line is split on whitespace and its
    fourth field is taken as the value: the literal ``XXX`` is mapped to
    ``3.0`` and anything else is converted with ``float``.  All other lines
    are ignored.

    Args:
        filename: Path of the text file to read.

    Returns:
        A NumPy array with one value per data line, in file order (empty when
        the file contains no data lines).

    Raises:
        IndexError: A data line has fewer than four whitespace-separated
            fields.
        ValueError: The fourth field is neither ``XXX`` nor a valid float.
        In both cases the message names the file, the line number and the
        offending line.
    """
    values = []
    with open(filename, 'r') as file:
        for lineno, line in enumerate(file, start=1):
            if not line.lstrip().startswith(("VALID", "INVALID")):
                continue
            parts = line.split()
            try:
                value = parts[3]
                values.append(3.0 if value == "XXX" else float(value))
            except (IndexError, ValueError) as exc:
                # Re-raise the same exception type, but say where the bad
                # record is; the bare IndexError/ValueError gave no location.
                raise type(exc)(
                    f"{filename}:{lineno}: cannot read a value from the "
                    f"fourth field of {line.rstrip()!r}"
                ) from exc
    return np.array(values)
# ---------------------------------------------------------------------------
# Script body: parse the command line, derive a VALID/INVALID threshold from
# the values in the input file, write a relabelled copy of the file to
# "<input>.reclassified.txt", and optionally plot a histogram of the values.
# ---------------------------------------------------------------------------

# Set up argument parser
parser = argparse.ArgumentParser(description='Process a text file, classify, and update VALID/INVALID labels.')
parser.add_argument('-f', '--file', type=str, required=True, help='Path to the file containing the data')
parser.add_argument('--graph', action='store_true', help='Plot the histogram with the upper break overlay')
parser.add_argument('--max-value', type=float, help='Maximum value to include in the graph')
args = parser.parse_args()

# Read data from the specified file and sort it, so that the consecutive
# differences computed below are the gaps between neighbouring values.
data = np.sort(read_data_from_file(args.file))
if data.size == 0:
    # Without any values there is nothing to classify; stop with a clear
    # message instead of failing inside the Jenks fit.
    parser.error(f"no VALID/INVALID lines found in '{args.file}'")

# Step 3: Perform Jenks natural breaks calculation to determine the major clusters
jenks = JenksNaturalBreaks(n_classes=3)  # three classes, to separate the bulk of the data
jenks.fit(data)
breaks = jenks.breaks_

# Identify the highest Jenks break point (excluding the maximum value)
max_jenks_break = max(breaks[:-1])

# Find differences between consecutive data points (gaps)
diffs = np.diff(data)

# Indices of the two largest gaps, in ascending gap size; they are visited in
# reverse below so the largest gap is considered first.
largest_gaps_indices = np.argsort(diffs)[-2:]

# Upper break (classification threshold): midpoint of the largest gap whose
# midpoint lies below the highest Jenks break point.
upper_break = None
for idx in reversed(largest_gaps_indices):
    midpoint = (data[idx] + data[idx + 1]) / 2
    if midpoint < max_jenks_break:
        upper_break = midpoint
        break

# Edges of a large gap lying below the highest Jenks break point; these are
# only reported and plotted, never used for classification.
# NOTE(review): there is no break in this loop, so a qualifying second-largest
# gap overwrites the values taken from the largest one, and the edges may then
# belong to a different gap than upper_break. The two checks are treated as
# independent of each other. Behaviour kept as found; confirm the intent.
upper_break_upper_boundary = None
upper_break_lower_boundary = None
for idx in reversed(largest_gaps_indices):
    left_value, right_value = data[idx], data[idx + 1]
    if right_value < max_jenks_break:
        upper_break_upper_boundary = right_value
    if left_value < max_jenks_break:
        upper_break_lower_boundary = left_value

# Fallback if no suitable midpoint was found (use max_jenks_break as the threshold)
if upper_break is None:
    upper_break = max_jenks_break

# Print the upper break value for reference
print("Upper break value (threshold between VALID and INVALID):", upper_break)
print(f"Upper: {upper_break_upper_boundary}")

# Step 4: Process lines in the original file
output_lines = []
with open(args.file, 'r') as file:
    for line in file:
        stripped = line.lstrip()
        if stripped.startswith(("VALID", "INVALID")):
            # Keep the indentation width; fields are re-joined with single spaces.
            leading_whitespace = len(line) - len(stripped)
            parts = line.split()
            value = 3.0 if parts[3] == "XXX" else float(parts[3])
            # Relabel the first field based on the value compared to the upper break
            parts[0] = "INVALID" if value > upper_break else "VALID"
            output_lines.append(" " * leading_whitespace + " ".join(parts))
        else:
            # Non-data lines are copied through with trailing whitespace removed
            output_lines.append(line.rstrip())

# Step 5: Output the updated lines
output_filename = args.file + ".reclassified.txt"
with open(output_filename, "w") as file:
    file.writelines(f"{line}\n" for line in output_lines)
print(f"Processing complete. Output saved to '{output_filename}'.")

# Step 6: Plotting the histogram if --graph flag is provided
if args.graph:
    try:
        # If --max-value is provided, restrict the histogram to values up to it
        if args.max_value is None:
            filtered_data = data
        else:
            filtered_data = data[data <= args.max_value]
        plt.hist(filtered_data, bins=20, color='blue', edgecolor='black')

        # Vertical markers as (x position, label x-offset, label height as a
        # fraction of the current y-axis top): the threshold first, then the
        # upper and lower gap edges.
        markers = [
            (upper_break, 0.01, 0.9),
            (upper_break_upper_boundary, 0.05, 0.7),
            (upper_break_lower_boundary, -0.15, 0.7),
        ]
        for position, x_offset, height in markers:
            if position is None:
                # No gap edge qualified; there is nothing to draw or format.
                continue
            plt.axvline(position, color='red', linestyle='dashed', linewidth=2)
            # Value label with a translucent background, offset from the line
            plt.text(position + x_offset, plt.ylim()[1] * height, f'{position:.2f}', color='red',
                     fontsize=14, fontweight='bold', ha='left',
                     bbox=dict(facecolor='#ccc', alpha=0.5, edgecolor='none'))

        plt.title('Upper Break Value Overlay')
        plt.xlabel('Values')
        plt.ylabel('Frequency')
        plt.show()
    except KeyboardInterrupt:
        # NOTE(review): while this module's SIGINT handler is installed,
        # Ctrl+C surfaces as SystemExit, so this branch is likely never taken.
        print("\nPlotting interrupted. Exiting gracefully.")
print("Done.")