-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtfsm_fire.py
197 lines (165 loc) · 7.39 KB
/
tfsm_fire.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import sqlite3
import textfsm
from typing import Dict, List, Tuple, Optional
import io
import time
import click
from multiprocessing import Process, Queue
import multiprocessing
import sys
def parse_template(template_content: str, device_output: str, result_queue: Queue):
"""Helper function to run in separate process."""
try:
template = textfsm.TextFSM(io.StringIO(template_content))
parsed = template.ParseText(device_output)
result_queue.put((template.header, parsed))
except Exception as e:
result_queue.put(None)
class TextFSMAutoEngine:
def __init__(self, db_path: str, verbose: bool = False):
self.db_path = db_path
self.verbose = verbose
self.connection = None
self._connect_db()
def _connect_db(self) -> None:
try:
self.connection = sqlite3.connect(self.db_path)
self.connection.row_factory = sqlite3.Row
if self.verbose:
click.echo(f"Connected to database: {self.db_path}")
except sqlite3.Error as e:
raise ConnectionError(f"Failed to connect to database: {e}")
def _calculate_template_score(
self,
parsed_data: List[Dict],
template: sqlite3.Row,
raw_output: str
) -> float:
score = 0.0
# Bail out if no data parsed
if not parsed_data:
return score
# Factor 1: Number of records parsed (0-30 points)
num_records = len(parsed_data)
if num_records > 0:
# For show version, we expect 1 record
if 'version' in template['cli_command'].lower():
score += 30 if num_records == 1 else 15
else:
# For neighbor discovery, scale based on typical counts
score += min(30, num_records * 10)
# Factor 2: Field population (0-40 points)
first_record = parsed_data[0]
if first_record:
if 'neighbors' in template['cli_command'].lower():
# Neighbor discovery critical fields
critical_fields = {
'LOCAL_INTERFACE', 'NEIGHBOR_PORT_ID', 'NEIGHBOR_NAME',
'MGMT_ADDRESS', 'NEIGHBOR_DESCRIPTION'
}
else:
# Show version critical fields
critical_fields = {
'VERSION', 'MODEL', 'HARDWARE', 'OS', 'HOSTNAME',
'UPTIME', 'SERIAL'
}
# Count populated critical fields
populated_critical = sum(
1 for field in critical_fields
if field in first_record and first_record[field] and str(first_record[field]).strip()
)
# Calculate percentage of critical fields populated
if critical_fields:
critical_score = (populated_critical / len(critical_fields)) * 40
score += critical_score
# Factor 3: Data quality (0-30 points)
if first_record:
quality_score = 0
if 'neighbors' in template['cli_command'].lower():
# Neighbor discovery quality checks
if 'LOCAL_INTERFACE' in first_record:
if any(x in str(first_record['LOCAL_INTERFACE']) for x in ['Gi', 'Eth', 'Te']):
quality_score += 10
if 'MGMT_ADDRESS' in first_record:
if str(first_record['MGMT_ADDRESS']).count('.') == 3:
quality_score += 10
if 'NEIGHBOR_NAME' in first_record:
if not any(x in str(first_record['NEIGHBOR_NAME']).lower() for x in ['show', 'invalid', 'total']):
quality_score += 10
else:
# Show version quality checks
if 'VERSION' in first_record:
if len(str(first_record['VERSION'])) > 3:
quality_score += 10
if 'MODEL' in first_record:
if len(str(first_record['MODEL'])) > 3:
quality_score += 10
if 'OS' in first_record:
if len(str(first_record['OS'])) > 3:
quality_score += 10
score += quality_score
if self.verbose:
click.echo(f"\nScore breakdown for template {template['cli_command']}:")
click.echo(f" Records score: {min(30, num_records * 10)}")
click.echo(f" Field population score: {critical_score}")
click.echo(f" Quality score: {quality_score}")
return score
def find_best_template(self, device_output: str, filter_string: Optional[str] = None) -> Tuple[
Optional[str], Optional[List[Dict]], float]:
"""Try filtered templates against the output and return the best match."""
best_template = None
best_parsed_output = None
best_score = 0
# Get filtered templates from database
templates = self.get_filtered_templates(filter_string)
total_templates = len(templates)
if self.verbose:
click.echo(f"Found {total_templates} matching templates for filter: {filter_string}")
# Try each template
for idx, template in enumerate(templates, 1):
if self.verbose:
percentage = (idx / total_templates) * 100
click.echo(f"\nTemplate {idx}/{total_templates} ({percentage:.1f}%): {template['cli_command']}",
nl=False)
try:
# Direct parsing without timeout
textfsm_template = textfsm.TextFSM(io.StringIO(template['textfsm_content']))
parsed = textfsm_template.ParseText(device_output)
parsed_dicts = [dict(zip(textfsm_template.header, row)) for row in parsed]
score = self._calculate_template_score(parsed_dicts, template, device_output)
if self.verbose:
click.echo(f" -> Score={score:.2f}, Records={len(parsed_dicts)}")
if score > best_score:
best_score = score
best_template = template['cli_command']
best_parsed_output = parsed_dicts
if self.verbose:
click.echo(click.style(" New best match!", fg='green'))
except Exception as e:
if self.verbose:
click.echo(" -> Failed to parse")
continue
# Remove debug prints
return best_template, best_parsed_output, best_score
def get_filtered_templates(self, filter_string: Optional[str] = None):
"""Get filtered templates from database."""
cursor = self.connection.cursor()
if filter_string:
filter_terms = filter_string.replace('-', '_').split('_')
query = "SELECT * FROM templates WHERE 1=1"
params = []
for term in filter_terms:
if term and len(term) > 2:
query += " AND cli_command LIKE ?"
params.append(f"%{term}%")
cursor.execute(query, params)
else:
cursor.execute("SELECT * FROM templates")
return cursor.fetchall()
def __del__(self):
if self.connection:
self.connection.close()
# Add this at the start of your script
if __name__ == '__main__':
# Required for Windows multiprocessing
multiprocessing.freeze_support()