-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathk8s_example.py
108 lines (92 loc) · 3.77 KB
/
k8s_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import logging
import requests
from yaml import dump
class V1Connector:
def __init__(self, api_key: str, api_version: str = 'beta') -> None:
self.api_key = api_key
self.base_url = f'https://api.xdr.trendmicro.com/{api_version}/containerSecurity'
self.headers = {'Authorization': f'Bearer {self.api_key}'}
def list_clusters(self) -> dict | None:
try:
response = requests.get(
f'{self.base_url}/kubernetesClusters',
headers=self.headers
)
if response.status_code == 200:
return response.json()
return {'error': response.reason, 'status_code': response.status_code}
except Exception as error:
logging.error(f'Error in list_clusters: {error}')
def list_policies(self) -> dict | None:
try:
response = requests.get(
f'{self.base_url}/policies',
headers=self.headers
)
if response.status_code == 200:
return response.json()
return {'error': response.reason, 'status_code': response.status_code}
except Exception as error:
logging.error(f'Error in list_policies: {error}')
def list_rulesets(self) -> dict | None:
try:
response = requests.get(
f'{self.base_url}/rulesets',
headers=self.headers
)
if response.status_code == 200:
return response.json()
return {'error': response.reason, 'status_code': response.status_code}
except Exception as error:
logging.error(f'Error in list_rulesets: {error}')
def list_vulns(self) -> dict | None:
try:
response = requests.get(
f'{self.base_url}/vulnerabilities',
headers=self.headers
)
if response.status_code == 200:
return response.json()
return {'error': response.reason, 'status_code': response.status_code}
except Exception as error:
logging.error(f'Error in list_vulns: {error}')
def register_cluster(self, name: str, description: str = "",
policy_id: str = "", arn: str = "",
exclusions: list = [],
runtime: bool = True, vuln_scanning: bool = True,
inventory: bool = True) -> dict | str | None:
try:
response = requests.post(
f'{self.base_url}/kubernetesClusters',
headers=self.headers,
json={
'name': name,
'description': description,
'policyId': policy_id,
'arn': arn
}
)
if response.status_code == 201:
data = response.json()
yaml_dict = {
'cloudOne' : {
'apiKey': data.get('apiKey'),
'endpoint': data.get('endpointUrl'),
'exclusion': {
'namespaces': exclusions
},
'runtimeSecurity': {
'enabled': runtime
},
'vulnerabilityScanning': {
'enabled': vuln_scanning
},
'inventoryCollection': {
'enabled': inventory
}
}
}
return dump(yaml_dict)
return {'error': response.reason, 'status_code': response.status_code}
except Exception as error:
logging.error(f'Error in register_cluster: {error}')