-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
403 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import pynetbox | ||
import requests | ||
|
||
def connect_to_netbox(url, token): | ||
""" | ||
Connect to the Netbox API using the provided URL and token. | ||
Args: | ||
- url (str): The base URL of the Netbox instance. | ||
- token (str): The authentication token for accessing the Netbox API. | ||
Returns: | ||
- netbox (pynetbox.core.api.Api): The Netbox API object configured to use the provided URL and token. | ||
""" | ||
# Create a custom requests session with SSL verification disabled | ||
session = requests.Session() | ||
session.verify = False # Disabling SSL verification for the session | ||
|
||
# Create a Netbox API object without specifying any session | ||
netbox = pynetbox.api(url, token) | ||
|
||
# Set the custom session for the Netbox API object's requests session | ||
netbox.http_session = session | ||
|
||
return netbox |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import csv | ||
import pynetbox | ||
from netbox_connection import connect_to_netbox | ||
from concurrent.futures import ThreadPoolExecutor | ||
import configparser | ||
from tqdm import tqdm | ||
|
||
def process_row(row, pbar): | ||
""" | ||
Process a single row from the CSV file and update/create IP addresses in Netbox. | ||
Args: | ||
- row (dict): A dictionary representing a single row from the CSV file. | ||
- pbar (tqdm.tqdm): Progress bar to update the progress of processing rows. | ||
""" | ||
# Convert 'tags' from a comma-separated string to a list of dictionaries | ||
tags_list = [{'name': tag.strip()} for tag in row['tags'].split(',')] | ||
|
||
# Assuming you're writing to the 'ipam' endpoint, replace with the correct endpoint if not | ||
existing_address = netbox.ipam.ip_addresses.get(address=row['address']) | ||
|
||
if existing_address: | ||
# Update the existing address | ||
existing_address.status = row['status'] | ||
existing_address.description = row['description'] | ||
existing_address.dns_name = row['dns_name'] | ||
existing_address.tags = tags_list | ||
if row['tenant'] != 'N/A': # Check if tenant is not 'N/A' | ||
existing_address.tenant = {'name': row['tenant']} | ||
existing_address.save() | ||
else: | ||
try: | ||
# Create a new address if it doesn't exist | ||
tenant_data = {'name': row['tenant']} if row['tenant'] != 'N/A' else None | ||
netbox.ipam.ip_addresses.create( | ||
address=row['address'], | ||
status=row['status'], | ||
description=row['description'], | ||
dns_name=row['dns_name'], | ||
tags=tags_list, | ||
tenant=tenant_data | ||
) | ||
except pynetbox.core.query.RequestError as e: | ||
# Handle duplicate address error | ||
if 'Duplicate IP address' in str(e): | ||
None | ||
else: | ||
# Propagate other errors | ||
raise | ||
|
||
# Update progress bar for each processed row | ||
pbar.update(1) | ||
|
||
def write_data_to_netbox(url, token, csv_file): | ||
""" | ||
Write data from a CSV file to Netbox. | ||
Args: | ||
- url (str): The base URL of the Netbox instance. | ||
- token (str): The authentication token for accessing the Netbox API. | ||
- csv_file (str): Path to the CSV file containing data to be written to Netbox. | ||
""" | ||
global netbox | ||
netbox = connect_to_netbox(url, token) | ||
|
||
with open(csv_file, 'r') as file: | ||
reader = csv.DictReader(file) | ||
rows = list(reader) | ||
|
||
total_rows = len(rows) | ||
with tqdm(total=total_rows, desc="Processing Rows") as pbar: | ||
with ThreadPoolExecutor(max_workers=5) as executor: # Adjust max_workers as needed | ||
futures = [executor.submit(process_row, row, pbar) for row in rows] | ||
# Wait for all futures to complete | ||
for future in futures: | ||
future.result() | ||
|
||
# Read URL and token from var.ini | ||
config = configparser.ConfigParser() | ||
config.read('var.ini') | ||
url = config['credentials']['url'] | ||
token = config['credentials']['token'] | ||
|
||
write_data_to_netbox(url, token, 'ipam_addresses.csv') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import csv | ||
import os | ||
import configparser | ||
import netbox_connection | ||
|
||
def get_ipam_prefixes(netbox): | ||
""" | ||
Retrieve all IPAM prefixes from Netbox. | ||
Args: | ||
- netbox (pynetbox.core.api.Api): The Netbox API object. | ||
Returns: | ||
- ipam_prefixes (pynetbox.core.query.Request): All IPAM prefixes retrieved from Netbox. | ||
""" | ||
ipam_prefixes = netbox.ipam.prefixes.all() | ||
return ipam_prefixes | ||
|
||
def write_to_csv(data, filename): | ||
""" | ||
Write IPAM prefixes data to a CSV file. | ||
Args: | ||
- data (pynetbox.core.query.Request): IPAM prefixes data retrieved from Netbox. | ||
- filename (str): Name of the CSV file to write data to. | ||
""" | ||
script_dir = os.path.dirname(os.path.realpath(__file__)) # Get the directory of the running script | ||
file_path = os.path.join(script_dir, filename) # Construct the full path to the output file | ||
with open(file_path, 'w', newline='') as file: | ||
writer = csv.writer(file) | ||
writer.writerow(['Prefix', 'Status', 'Tags', 'Tenant']) # Writing headers | ||
for prefix in data: | ||
tag_names = [tag.name for tag in prefix.tags] | ||
tenant_name = prefix.tenant.name if prefix.tenant else 'N/A' | ||
status_value = prefix.status.value if prefix.status else 'N/A' # Extract the value of the status field | ||
writer.writerow([prefix.prefix, status_value, ', '.join(tag_names), tenant_name]) | ||
|
||
# Read URL and token from var.ini | ||
config = configparser.ConfigParser() | ||
config.read('var.ini') | ||
url = config['credentials']['url'] | ||
token = config['credentials']['token'] | ||
|
||
netbox = netbox_connection.connect_to_netbox(url, token) | ||
|
||
ipam_prefixes = get_ipam_prefixes(netbox) | ||
write_to_csv(ipam_prefixes, 'ipam_prefixes.csv') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import csv | ||
from datetime import datetime | ||
import os | ||
|
||
def get_file_path(directory, date): | ||
""" | ||
Generate a file path based on the directory and date. | ||
Args: | ||
- directory (str): The directory where the file will be located. | ||
- date (datetime.datetime): The date to be included in the file name. | ||
Returns: | ||
- file_path (str): The full file path based on the directory and date. | ||
""" | ||
return os.path.join(directory, f'nmap_results_{date.strftime("%Y-%m-%d")}.csv') | ||
|
||
def get_latest_files(directory, num_files=2): | ||
""" | ||
Get the list of CSV files in a directory and sort them by modification time. | ||
Args: | ||
- directory (str): The directory to search for CSV files. | ||
- num_files (int): The number of latest files to retrieve. | ||
Returns: | ||
- files (list): The list of latest CSV file names. | ||
""" | ||
files = [f for f in os.listdir(directory) if f.endswith('.csv')] | ||
files.sort(key=lambda x: os.path.getmtime(os.path.join(directory, x)), reverse=True) | ||
return files[:num_files] | ||
|
||
# Directory for result files | ||
directory = 'results/' | ||
|
||
# Get the two latest file paths | ||
latest_files = get_latest_files(directory) | ||
file_paths = [get_file_path(directory, datetime.strptime(file_name[13:23], "%Y-%m-%d")) for file_name in latest_files] | ||
|
||
# Output file path | ||
output_file_path = 'ipam_addresses.csv' | ||
|
||
def read_csv(file_path): | ||
""" | ||
Read a CSV file and return a dictionary with addresses as keys. | ||
Args: | ||
- file_path (str): The path to the CSV file. | ||
Returns: | ||
- data (dict): A dictionary with addresses as keys and corresponding row data as values. | ||
""" | ||
data = {} | ||
with open(file_path, 'r') as file: | ||
reader = csv.DictReader(file) | ||
for row in reader: | ||
address = row['address'] | ||
data[address] = row | ||
return data | ||
|
||
def write_csv(data, file_path): | ||
""" | ||
Write data to a new CSV file. | ||
Args: | ||
- data (dict): A dictionary containing row data with addresses as keys. | ||
- file_path (str): The path to the output CSV file. | ||
""" | ||
with open(file_path, 'w', newline='') as file: | ||
fieldnames = ['address', 'dns_name', 'status', 'description', 'tags', 'tenant'] | ||
writer = csv.DictWriter(file, fieldnames=fieldnames) | ||
|
||
# Write header | ||
writer.writeheader() | ||
|
||
# Write data | ||
for row in data.values(): | ||
writer.writerow(row) | ||
|
||
# Read data from the latest file | ||
data = read_csv(file_paths[0]) | ||
|
||
# Check for deprecated addresses in the older file and update their status | ||
if len(file_paths) == 2: | ||
older_data = read_csv(file_paths[1]) | ||
for address, older_row in older_data.items(): | ||
if address not in data: | ||
# Address is missing in latest file, mark as deprecated | ||
older_row['status'] = 'deprecated' | ||
data[address] = older_row | ||
|
||
# Write the updated data to the new CSV file | ||
write_csv(data, output_file_path) | ||
|
||
print("Comparison and processing completed. Check the output file:", output_file_path) |
Oops, something went wrong.