Skip to content

Commit

Permalink
feat(aws): include resource metadata to remaining checks (#6551)
Browse files Browse the repository at this point in the history
Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
  • Loading branch information
danibarranqueroo and MrCloudSec authored Jan 16, 2025
1 parent 784a909 commit 8d8994b
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def execute(self):
report.resource_id = backup_client.backup_plans[0].name
findings.append(report)
elif backup_client.backup_vaults:
report = Check_Report_AWS(self.metadata(), resource_metadata=backup_client)
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata={})
report.region = backup_client.region
report.status = "FAIL"
report.status_extended = "No Backup Plan exist."
report.resource_arn = backup_client.backup_plan_arn_template
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ def execute(self):
report.resource_id = iam_client.audited_account
for role in iam_client.roles:
if role.name == "CloudWatch-CrossAccountSharingRole":
report = Check_Report_AWS(self.metadata(), role)
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=role
)
report.region = iam_client.region
report.status = "FAIL"
report.status_extended = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ class directconnect_virtual_interface_redundancy(Check):
def execute(self):
findings = []
for vgw in directconnect_client.vgws.values():
report = Check_Report_AWS(self.metadata())
report.resource_arn = vgw.arn
report.region = vgw.region
report.resource_id = vgw.id
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=vgw)
if len(vgw.vifs) < 2:
report.status = "FAIL"
report.status_extended = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ class efs_mount_target_not_publicly_accessible(Check):
def execute(self):
findings = []
for fs in efs_client.filesystems.values():
report = Check_Report_AWS(self.metadata())
report.region = fs.region
report.resource_id = fs.id
report.resource_arn = fs.arn
report.resource_tags = fs.tags
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=fs)
report.status = "PASS"
report.status_extended = (
f"EFS {fs.id} does not have any public mount targets."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ def execute(self):
"ELBSecurityPolicy-TLS13-1-2-Ext1-2021-06",
"ELBSecurityPolicy-TLS13-1-2-Ext2-2021-06",
]
for lb_arn, lb in elbv2_client.loadbalancersv2.items():
report = Check_Report_AWS(self.metadata())
report.region = lb.region
report.resource_id = lb.name
report.resource_arn = lb_arn
report.resource_tags = lb.tags
for lb in elbv2_client.loadbalancersv2.values():
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=lb)
report.status = "PASS"
report.status_extended = (
f"ELBv2 {lb.name} does not have insecure SSL protocols or ciphers."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ class eventbridge_bus_cross_account_access(Check):
def execute(self):
findings = []
for bus in eventbridge_client.buses.values():
report = Check_Report_AWS(self.metadata())
report.resource_id = bus.name
report.resource_arn = bus.arn
report.resource_tags = bus.tags
report.region = bus.region
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=bus)
report.status = "PASS"
report.status_extended = (
f"EventBridge event bus {bus.name} does not allow cross-account access."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def execute(self) -> Check_Report_AWS:
) - parser.parse(user["access_key_1_last_used_date"])
if access_key_1_last_used_date.days > maximum_expiration_days:
old_access_keys = True
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=user
)
report.region = iam_client.region
report.resource_id = user["user"] + "/AccessKey1"
report.resource_arn = user["arn"]
Expand All @@ -63,7 +65,9 @@ def execute(self) -> Check_Report_AWS:
) - parser.parse(user["access_key_2_last_used_date"])
if access_key_2_last_used_date.days > maximum_expiration_days:
old_access_keys = True
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=user
)
report.region = iam_client.region
report.resource_id = user["user"] + "/AccessKey2"
report.resource_arn = user["arn"]
Expand All @@ -73,7 +77,9 @@ def execute(self) -> Check_Report_AWS:
findings.append(report)

if not old_access_keys:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=user
)
report.region = iam_client.region
report.resource_id = user["user"]
report.resource_arn = user["arn"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ def execute(self) -> Check_Report_AWS:
for record in record_set.records:
# Check if record is an IP Address
if validate_ip_address(record):
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=record_set
)
report.resource_id = (
f"{record_set.hosted_zone_id}/{record_set.name}/{record}"
)
Expand All @@ -38,7 +40,6 @@ def execute(self) -> Check_Report_AWS:
report.resource_tags = route53_client.hosted_zones[
record_set.hosted_zone_id
].tags
report.region = record_set.region
report.status = "PASS"
report.status_extended = f"Route53 record {record} (name: {record_set.name}) in Hosted Zone {route53_client.hosted_zones[record_set.hosted_zone_id].name} is not a dangling IP."
# If Public IP check if it is in the AWS Account
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ class sagemaker_models_vpc_settings_configured(Check):
def execute(self):
findings = []
for model in sagemaker_client.sagemaker_models:
report = Check_Report_AWS(self.metadata())
report.region = model.region
report.resource_id = model.name
report.resource_arn = model.arn
report.resource_tags = model.tags
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=model)
report.status = "PASS"
report.status_extended = (
f"Sagemaker notebook instance {model.name} has VPC settings enabled."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ def execute(self):
if shield_client.enabled:
for elbv2_arn, elbv2 in elbv2_client.loadbalancersv2.items():
if elbv2.type == "application" and elbv2.scheme == "internet-facing":
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=elbv2
)
report.region = shield_client.region
report.resource_id = elbv2.name
report.resource_arn = elbv2_arn
report.resource_tags = elbv2.tags
report.status = "FAIL"
report.status_extended = f"ELBv2 ALB {elbv2.name} is not protected by AWS Shield Advanced."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ def execute(self):
for subscription in topic.subscriptions:
if subscription.arn == "PendingConfirmation":
continue
report = Check_Report_AWS(self.metadata())
report.resource_id = subscription.id
report.resource_arn = subscription.arn
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=subscription
)
report.resource_details = topic.arn
report.status = "PASS"
report.status_extended = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ def execute(self):
"secrets_ignore_patterns", []
)
for document in ssm_client.documents.values():
report = Check_Report_AWS(self.metadata())
report.region = document.region
report.resource_arn = document.arn
report.resource_id = document.name
report.resource_tags = document.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=document
)
report.status = "PASS"
report.status_extended = (
f"No secrets found in SSM Document {document.name}."
Expand Down

0 comments on commit 8d8994b

Please sign in to comment.