Skip to content

Commit

Permalink
feat(network): extract Network resource metadata automated (#6555)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
  • Loading branch information
puchy22 and MrCloudSec authored Jan 16, 2025
1 parent f829145 commit 783ce13
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ def execute(self) -> Check_Report_Azure:
status = "PASS"
status_extended = f"Bastion Host from subscription {subscription} available are: {bastion_names}"

report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=bastion_hosts
)
report.subscription = subscription
report.resource_name = "Bastion Host"
report.resource_id = "Bastion Host"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ def execute(self) -> Check_Report_Azure:
findings = []
for subscription, network_watchers in network_client.network_watchers.items():
for network_watcher in network_watchers:
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=network_watcher
)
report.subscription = subscription
report.resource_name = network_watcher.name
report.resource_id = network_watcher.id
report.status = "FAIL"
report.location = network_watcher.location
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has no flow logs"
if network_watcher.flow_logs:
report.status = "FAIL"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ def execute(self) -> Check_Report_Azure:
findings = []
for subscription, network_watchers in network_client.network_watchers.items():
for network_watcher in network_watchers:
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=network_watcher
)
report.subscription = subscription
report.resource_name = network_watcher.name
report.resource_id = network_watcher.id
report.location = network_watcher.location
if network_watcher.flow_logs:
report.status = "PASS"
report.status_extended = f"Network Watcher {network_watcher.name} from subscription {subscription} has flow logs enabled for more than 90 days"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ def execute(self) -> Check_Report_Azure:
findings = []
for subscription, security_groups in network_client.security_groups.items():
for security_group in security_groups:
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=security_group
)
report.subscription = subscription
report.resource_name = security_group.name
report.resource_id = security_group.id
report.status = "PASS"
report.location = security_group.location
report.status_extended = f"Security Group {security_group.name} from subscription {subscription} has HTTP internet access restricted."
rule_fail_condition = any(
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ def execute(self):
api = shodan.Shodan(shodan_api_key)
for subscription, public_ips in network_client.public_ip_addresses.items():
for ip in public_ips:
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=ip
)
report.subscription = subscription
report.resource_name = ip.name
report.resource_id = ip.id
report.location = ip.location
try:
shodan_info = api.host(ip.ip_address)
report.status = "FAIL"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ def execute(self) -> Check_Report_Azure:
findings = []
for subscription, security_groups in network_client.security_groups.items():
for security_group in security_groups:
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=security_group
)
report.subscription = subscription
report.resource_name = security_group.name
report.resource_id = security_group.id
report.status = "PASS"
report.location = security_group.location
report.status_extended = f"Security Group {security_group.name} from subscription {subscription} has RDP internet access restricted."
rule_fail_condition = any(
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ def execute(self) -> Check_Report_Azure:
findings = []
for subscription, security_groups in network_client.security_groups.items():
for security_group in security_groups:
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=security_group
)
report.subscription = subscription
report.resource_name = security_group.name
report.resource_id = security_group.id
report.status = "PASS"
report.location = security_group.location
report.status_extended = f"Security Group {security_group.name} from subscription {subscription} has SSH internet access restricted."
rule_fail_condition = any(
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ def execute(self) -> Check_Report_Azure:
findings = []
for subscription, security_groups in network_client.security_groups.items():
for security_group in security_groups:
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=security_group
)
report.subscription = subscription
report.resource_name = security_group.name
report.resource_id = security_group.id
report.status = "PASS"
report.location = security_group.location
report.status_extended = f"Security Group {security_group.name} from subscription {subscription} has UDP internet access restricted."
rule_fail_condition = any(
rule.protocol in ["UDP", "Udp"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ class network_watcher_enabled(Check):
def execute(self) -> list[Check_Report_Azure]:
findings = []
for subscription, network_watchers in network_client.network_watchers.items():
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=network_watchers
)
report.subscription = subscription
report.resource_name = "Network Watcher"
report.location = "global"
Expand Down

0 comments on commit 783ce13

Please sign in to comment.