-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnvd-cve-json-polling.py
139 lines (115 loc) · 4.85 KB
/
nvd-cve-json-polling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
import os
import requests
import json
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta, timezone
# --- Tunable settings; each one can be overridden via an environment variable ---

# File that stores the timestamp of the previous fetch (used for pubStartDate).
LAST_FETCH_FILE = os.environ.get('LAST_FETCH_FILE', 'last_fetch.txt')

# Log file that receives the CVE data.
JSON_LOG_FILE = os.environ.get('JSON_LOG_FILE', 'cve_log.log')

# Log file for debugging output.
GENERAL_LOG_FILE = os.environ.get('GENERAL_LOG_FILE', 'output.log')

# Maximum size of a log file before rotation; defaults to 5 MB and can be raised.
LOG_FILE_MAX_SIZE = int(os.environ.get('LOG_FILE_MAX_SIZE', 5 * 1024 * 1024))

# Number of rotated backup files to keep.
# TODO: look into expanding this.
LOG_FILE_BACKUP_COUNT = int(os.environ.get('LOG_FILE_BACKUP_COUNT', 3))
# create a logger
def setup_json_logger():
    """Return the 'JSONLogger' logger, attaching its rotating file handler once.

    The handler writes bare messages (format '%(message)s') to JSON_LOG_FILE
    and rotates according to LOG_FILE_MAX_SIZE and LOG_FILE_BACKUP_COUNT.
    Calling this function again returns the same logger without adding
    another handler.

    Returns:
        logging.Logger: the logger, set to INFO level.
    """
    logger = logging.getLogger('JSONLogger')
    logger.setLevel(logging.INFO)
    # Build the handler only when it is actually going to be attached:
    # RotatingFileHandler opens the log file on construction, so creating one
    # and then discarding it on a repeated call would leak an open file.
    if not logger.handlers:
        file_handler = RotatingFileHandler(
            JSON_LOG_FILE,
            maxBytes=LOG_FILE_MAX_SIZE,
            backupCount=LOG_FILE_BACKUP_COUNT,
        )
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter('%(message)s'))
        logger.addHandler(file_handler)
    return logger
# general logger will log any errors
def setup_general_logger():
    """Return the 'GeneralLogger' logger with file and console output.

    On the first call a rotating file handler (GENERAL_LOG_FILE, rotated per
    LOG_FILE_MAX_SIZE / LOG_FILE_BACKUP_COUNT) and a stream handler are
    attached, both at DEBUG level and sharing one timestamped format.
    Later calls return the already-configured logger unchanged, so messages
    are never duplicated.

    Returns:
        logging.Logger: the logger, set to DEBUG level.
    """
    logger = logging.getLogger('GeneralLogger')
    logger.setLevel(logging.DEBUG)
    # Already configured: return early *before* constructing any handler.
    # RotatingFileHandler opens its file on construction, so building one that
    # is never attached would leak an open file handle.
    if logger.handlers:
        return logger

    formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    file_handler = RotatingFileHandler(
        GENERAL_LOG_FILE,
        maxBytes=LOG_FILE_MAX_SIZE,
        backupCount=LOG_FILE_BACKUP_COUNT,
    )
    stream_handler = logging.StreamHandler()
    for handler in (file_handler, stream_handler):
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
# The last fetch time will be used to set the pubStartDate query parameter
def get_last_fetch_time():
    """Read the previous fetch timestamp from LAST_FETCH_FILE.

    Returns:
        datetime | None: a UTC-tagged datetime parsed from the file's
        'YYYY-MM-DDTHH:MM:SS' content, or None when the file does not exist
        or its content cannot be parsed in that format.
    """
    # Open directly instead of checking os.path.exists() first: this avoids
    # the window in which the file could vanish between the check and the open.
    try:
        with open(LAST_FETCH_FILE, 'r') as f:
            last_fetch = f.read().strip()
    except (FileNotFoundError, NotADirectoryError):
        return None

    try:
        parsed = datetime.strptime(last_fetch, '%Y-%m-%dT%H:%M:%S')
    except ValueError:
        # An empty or corrupted state file must not abort every future run;
        # report it and behave as if no previous fetch was recorded.
        logging.getLogger('GeneralLogger').warning(
            "Ignoring unparseable timestamp %r in %s", last_fetch, LAST_FETCH_FILE
        )
        return None
    return parsed.replace(tzinfo=timezone.utc)
# This will update the last_fetch_time for use on the next iteration
def save_last_fetch_time(timestamp_str):
    """Write *timestamp_str*, truncated to whole seconds, to LAST_FETCH_FILE.

    Args:
        timestamp_str: a timestamp in 'YYYY-MM-DDTHH:MM:SS' form, with or
            without a trailing '.ffffff' fractional-seconds part.

    Raises:
        ValueError: if *timestamp_str* matches neither accepted form.
    """
    # Try the fractional form first (the only one previously accepted), then
    # the whole-second form so such a timestamp no longer raises.
    for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
        try:
            timestamp = datetime.strptime(timestamp_str, fmt)
        except ValueError:
            continue
        break
    else:
        raise ValueError(f"Unrecognized timestamp format: {timestamp_str!r}")

    # Stored without fractional seconds.
    timestamp_no_ms = timestamp.strftime('%Y-%m-%dT%H:%M:%S')
    with open(LAST_FETCH_FILE, 'w') as f:
        f.write(timestamp_no_ms)
# This is the NVD API and request parameters
# ref: https://nvd.nist.gov/developers/vulnerabilities
def fetch_cve_data(api_key, last_fetch_time, timeout=30):
    """Request CVEs published since *last_fetch_time* from the NVD CVE API 2.0.

    Args:
        api_key: value sent in the 'apiKey' request header.
        last_fetch_time: datetime used for pubStartDate; when falsy, the
            window starts one day before the current UTC time.
        timeout: seconds to wait for the server before giving up.

    Returns:
        dict: the decoded JSON body on HTTP 200, otherwise
        {"error": <message>} describing the failure. Failures are also
        logged to 'GeneralLogger'.
    """
    url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    general_logger = logging.getLogger('GeneralLogger')

    # Take the current time once so both ends of the window share one clock reading.
    now = datetime.now(timezone.utc)
    window_start = last_fetch_time if last_fetch_time else now - timedelta(days=1)
    # NOTE(review): only a single page of up to resultsPerPage records is
    # requested; results beyond that would need follow-up requests - confirm
    # against the NVD API paging documentation.
    params = {
        "resultsPerPage": 2000,
        "pubStartDate": window_start.strftime('%Y-%m-%dT%H:%M:%S'),
        "pubEndDate": now.strftime('%Y-%m-%dT%H:%M:%S'),
    }
    general_logger.debug(f"Request URL: {url}")
    general_logger.debug(f"Request Params: {params}")
    headers = {
        "Accept": "application/json",
        "apiKey": api_key
    }

    try:
        # Without a timeout a stalled connection would block the script forever.
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        general_logger.error(f"Request failed: {e}")
        return {"error": f"Request failed: {e}"}

    if response.status_code != 200:
        error_message = f"Error: {response.status_code} - {response.text}"
        general_logger.error(error_message)
        return {"error": error_message}

    try:
        data = response.json()
    except ValueError as e:
        # A 200 response with a non-JSON body; ValueError covers the decode
        # errors raised by both older and newer versions of requests.
        error_message = f"Request failed: invalid JSON in response: {e}"
        general_logger.error(error_message)
        return {"error": error_message}

    general_logger.debug("Data fetched successfully")
    return data
def main():
    """Run one polling cycle: fetch CVEs, emit them, then record the checkpoint.

    Reads the API key from the API_KEY environment variable and returns
    early with a console message when it is missing or empty. On a fetch
    error the message is printed and logged and no checkpoint is written.
    On success the response is printed and written to the JSON logger as
    indented JSON, after which the response's 'timestamp' (if present) is
    saved for the next run.
    """
    api_key = os.getenv('API_KEY')
    # An empty string is as unusable as an unset variable.
    if not api_key:
        print("API key not found. Please set the API_KEY environment variable.")
        return

    general_logger = setup_general_logger()
    json_logger = setup_json_logger()
    general_logger.debug("Starting main process")

    last_fetch_time = get_last_fetch_time()
    if last_fetch_time:
        general_logger.debug(f"Last fetch time: {last_fetch_time}")

    data = fetch_cve_data(api_key, last_fetch_time)
    if 'error' in data:
        error_message = data['error']
        print(error_message)
        general_logger.error(error_message)
        return

    log_message = json.dumps(data, indent=4)
    print(log_message)
    json_logger.info(log_message)

    # Advance the checkpoint only after the data has been emitted: if
    # serializing or printing fails, the next run re-requests this window
    # instead of silently skipping it.
    if 'timestamp' in data:
        save_last_fetch_time(data['timestamp'])


if __name__ == "__main__":
    main()