-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopenvpn_status_ws_helper.py
145 lines (106 loc) · 3.39 KB
/
openvpn_status_ws_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""File that contains helper functions."""
import os
import json
import re
def validate_config():
"""Checks if configuration file is valid and return boolean based on it."""
if not get_config_path():
print("> Configuration file 'config.json' doesn't exist, unable to continue.")
return False
return True
def get_node_from_uri(uri):
"""Parses given URI and returns it."""
node = re.sub(r'\D', '', uri)
return int(node)
def get_config_path():
"""Returns path of the config file, None upon failure."""
container = os.path.dirname(os.path.realpath(__file__))
config_path = os.path.join(container, 'config.json')
if not os.path.isfile(config_path):
return None
return config_path
def get_config_dict():
"""Returns contents of the config file as dict, None upon failure."""
config_path = get_config_path()
with open(config_path, 'r') as file_handle:
data = json.load(file_handle)
return data
return None
def get_default_port():
"""Returns value that will be used as default server port."""
config = get_config_dict()
try:
return int(config['port'])
except KeyError:
pass
return 12200
def get_address():
"""Returns address from config file, None upon failure."""
config = get_config_dict()
try:
if bool(config['address']):
return config['address']
except KeyError:
return None
return None
def get_addresses():
"""Returns list of addresses from config file, None upon failure."""
config = get_config_dict()
try:
return config['addresses']
except KeyError:
return None
def get_default_address():
"""Returns value that will be used as default server address."""
address = get_address()
if not address:
addresses = get_addresses()
if addresses:
return addresses
return ['0.0.0.0', '::']
return address
def get_node_ids():
"""Returns list of ids, it can be empty list."""
data = get_config_dict()
nodes = list()
for node in data['nodes']:
nodes.append(int(node['id']))
return nodes
def get_status_log_path_for_node(node_id):
"""Returns path of the status log for node, None upon failure."""
data = get_config_dict()
nodes = data['nodes']
for node in nodes:
if int(node['id']) == int(node_id):
return node['path'].strip()
return None
def get_origins_for_node(node_id):
"""Returns list of origins for node, None upon failure."""
data = get_config_dict()
nodes = data['nodes']
for node in nodes:
if int(node['id']) == int(node_id):
try:
return node['origins']
except KeyError:
return None
return None
def build_open_node_log_string(request, node):
"""Method that builds up string that gets appended to log later."""
values = list()
try:
values.append(request.remote_ip)
except KeyError:
values.append('UNKNOWN_IP')
values.append(node)
try:
values.append(request.headers['Origin'])
except KeyError:
values.append('UNKNOWN_ORIGIN')
try:
values.append(request.headers['User-Agent'])
except KeyError:
values.append('UNKNOWN_USER_AGENT')
return '{NODE} %s - OPEN %s FROM %s - %s' % tuple(values)