-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
124 lines (82 loc) · 3.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import yaml
from collections import namedtuple
import sys
import time
import os
import requests
import functools
import dns.resolver
import dns.exception
import operator
import logging
# One DNS lookup result: the resolved IPv4 address, the epoch timestamp at
# which the answer expires, and the epoch timestamp at which it was obtained.
DnsResponse = namedtuple('DnsResponse', 'ipv4 expiry response_time')

# Per-host settings: the display name used in notifications and the webhook
# URI that notifications are posted to.
HostConfiguration = namedtuple('HostConfiguration', 'name webhook_uri')
def _load_configuration(path):
    """Read the host configuration from the YAML file at ``path``.

    The file must contain a mapping whose keys are the host names to monitor
    and whose values each provide ``name`` and ``webhook_uri`` entries.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        A dict mapping each top-level key to a ``HostConfiguration``.

    Raises:
        ValueError: If the file is empty or its top level is not a mapping.
        KeyError: If an entry lacks ``name`` or ``webhook_uri``.
        OSError: If the file cannot be opened.
    """
    with open(path, 'r') as f:
        data = yaml.safe_load(f)

    # safe_load returns None for an empty document; fail with a clear message
    # instead of an AttributeError on ``None.items()``.
    if not isinstance(data, dict):
        raise ValueError(
            f'Configuration file "{path}" must contain a mapping of hosts, '
            f'got {type(data).__name__}'
        )

    return {
        host: HostConfiguration(entry['name'], entry['webhook_uri'])
        for host, entry in data.items()
    }
def _load_response_cache(path):
    """Load previously saved DNS responses from the YAML file at ``path``.

    Returns a dict mapping each cached key to a ``DnsResponse``. A missing
    file is logged as an error and treated as an empty cache, as is a file
    whose YAML document is empty.
    """
    try:
        handle = open(path, 'r')
    except FileNotFoundError:
        logging.error(f'Cache file "{path}" not found')
        return {}

    with handle:
        raw = yaml.safe_load(handle)

    if raw is None:
        return {}

    restored = {}
    for host, fields in raw.items():
        restored[host] = DnsResponse(
            fields['ipv4'],
            fields['expiry'],
            fields['response_time'],
        )
    return restored
def _save_response_cache(path, responses):
    """Write ``responses`` to the YAML file at ``path``, replacing its content.

    Each ``DnsResponse`` is converted to a plain mapping of its fields before
    being dumped.
    """
    serializable = {}
    for host, response in responses.items():
        serializable[host] = response._asdict()

    with open(path, 'w') as out:
        yaml.dump(serializable, out)
def _is_response_stale(responses, host):
    """Return True when ``host`` needs a fresh DNS lookup.

    A host is stale when ``responses`` holds no entry for it, or when the
    stored entry's ``expiry`` lies before the current time.
    """
    cached = responses.get(host)
    if cached is None:
        return True
    return time.time() > cached.expiry
def _notify_webhook(host, host_config, response, timeout=10):
    """Post an "IP address changed" message to the host's webhook.

    Args:
        host: Host name whose address changed.
        host_config: ``HostConfiguration`` supplying the display ``name`` and
            the ``webhook_uri`` to post to.
        response: ``DnsResponse`` carrying the new ``ipv4`` address.
        timeout: Seconds to wait for the webhook before giving up. Without a
            timeout ``requests`` may block indefinitely, which would stall the
            whole monitoring loop.

    Raises:
        requests.exceptions.RequestException: If the request fails, times out,
            or the webhook answers with a 4xx/5xx status.
    """
    content = f'IP address for **{host_config.name}** ({host}) is now **{response.ipv4}**'
    resp = requests.post(
        host_config.webhook_uri,
        headers={'Content-Type': 'application/json'},
        json={'content': content},
        timeout=timeout,
    )
    logging.debug(f'Received HTTP response from WebHook API: {resp}')
    # An error status means the notification was not delivered; surface it as
    # an HTTPError (a RequestException) so the caller can retry later.
    resp.raise_for_status()
def _check_dns(host):
    """Resolve the A record for ``host`` and return it as a ``DnsResponse``.

    Args:
        host: Host name to resolve.

    Returns:
        A ``DnsResponse`` holding one IPv4 address from the answer, the
        answer's expiration time, and the time the lookup completed.

    Raises:
        dns.exception.DNSException: If the lookup fails.
    """
    logging.debug(f'Retrieving A record for {host}')
    answers = dns.resolver.resolve(host, 'A')
    # Record order is not stable when a name has several A records (e.g.
    # round-robin DNS). Pick the address deterministically so a mere
    # reordering is not reported as an IP change.
    address = min(record.address for record in answers)
    response = DnsResponse(address, answers.expiration, time.time())
    logging.debug(f'Received response: {response}')
    return response
def main():
    """Monitor the configured hosts and announce IP address changes forever.

    The configuration path comes from the ``CONFIG_FILE`` environment variable
    (default ``config.yml``) and the response cache path from ``CACHE_FILE``
    (default ``response_cache.yml``). On every pass each host whose cached
    response is missing or expired is resolved again; when its address differs
    from the cached one the host's webhook is notified. The cache is only
    updated (and persisted) after a successful notification, so a failed
    notification is retried on the next pass. This function never returns.
    """
    # Lower bound on the pause between passes, in seconds.
    min_sleep_seconds = 30

    logging.basicConfig(level=logging.DEBUG)

    config = _load_configuration(os.environ.get('CONFIG_FILE', 'config.yml'))
    # Log host names only: webhook URIs embed credentials and must not end up
    # in log output.
    logging.info(f'Loaded configuration for hosts: {list(config)}')

    response_cache_path = os.environ.get('CACHE_FILE', 'response_cache.yml')
    responses = _load_response_cache(response_cache_path)
    logging.info(f'Cached responses: {responses}')

    while True:
        stale_hosts = [host for host in config if _is_response_stale(responses, host)]

        for stale_host in stale_hosts:
            old_response = responses.get(stale_host)

            try:
                response = _check_dns(stale_host)
            except dns.exception.DNSException:
                # logging.exception attaches the active traceback itself; the
                # exception must not be passed as a stray format argument.
                logging.exception(f'Exception occurred executing DNS query for {stale_host}')
                continue

            if old_response is None or old_response.ipv4 != response.ipv4:
                old_ipv4 = None if old_response is None else old_response.ipv4
                logging.info(f'{stale_host} IP address changed to {response.ipv4} from {old_ipv4}')

                try:
                    _notify_webhook(stale_host, config[stale_host], response)
                except requests.exceptions.RequestException:
                    logging.exception('Exception occurred executing WebHook API HTTP request')
                    # Keep the old cache entry so the change is announced
                    # again on the next pass.
                    continue

            responses[stale_host] = response
            _save_response_cache(response_cache_path, responses)

        # Schedule the next pass from the configured hosts only. A host with
        # no response yet (lookup or notification failed) is due immediately,
        # and cache entries for hosts no longer configured are ignored.
        now = time.time()
        due_times = [
            responses[host].expiry if host in responses else now
            for host in config
        ]
        # default= avoids ValueError from min() when nothing is configured.
        next_check = min(due_times, default=now)
        sleep_time = max(min_sleep_seconds, next_check - now)
        logging.debug(f'Sleeping {sleep_time} seconds')
        time.sleep(sleep_time)
# Start the monitor only when this file is run as a script, not when imported.
if __name__ == '__main__':
    main()