Skip to content

Commit

Permalink
feat(entra): extract Entra resource metadata automated (#6542)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
  • Loading branch information
puchy22 and MrCloudSec authored Jan 16, 2025
1 parent 43fd9ee commit ee7d32d
Show file tree
Hide file tree
Showing 24 changed files with 220 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,7 @@ def execute(self) -> Check_Report_Azure:
tenant_name,
conditional_access_policies,
) in entra_client.conditional_access_policy.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report.subscription = f"Tenant: {tenant_name}"
report.resource_name = "Conditional Access Policy"
report.resource_id = "Conditional Access Policy"
report.status_extended = (
"Conditional Access Policy does not require MFA for management API."
)

for policy_id, policy in conditional_access_policies.items():
for policy in conditional_access_policies.values():
if (
policy.state == "enabled"
and "All" in policy.users["include"]
Expand All @@ -31,13 +22,24 @@ def execute(self) -> Check_Report_Azure:
for access_control in policy.access_controls["grant"]
)
):
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=policy
)
report.subscription = f"Tenant: {tenant_name}"
report.status = "PASS"
report.status_extended = (
"Conditional Access Policy requires MFA for management API."
)
report.resource_id = policy_id
report.resource_name = policy.name
break
else:
report = Check_Report_Azure(self.metadata())
report.subscription = f"Tenant: {tenant_name}"
report.resource_name = "Conditional Access Policy"
report.resource_id = "Conditional Access Policy"
report.status = "FAIL"
report.status_extended = (
"Conditional Access Policy does not require MFA for management API."
)

findings.append(report)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, directory_roles in entra_client.directory_roles.items():
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=entra_client.users
)
report.status = "FAIL"
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = "Global Administrator"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ def execute(self) -> Check_Report_Azure:
if not is_privileged_user(
user, entra_client.directory_roles[tenant_domain]
):
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(self.metadata(), resource_metadata=user)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = user_domain_name
report.resource_id = user.id
report.status = "FAIL"
report.status_extended = (
f"Non-privileged user {user.name} does not have MFA."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, auth_policy in entra_client.authorization_policy.items():

report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=auth_policy
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = getattr(auth_policy, "name", "Authorization Policy")
report.resource_id = getattr(auth_policy, "id", "authorizationPolicy")
report.status = "FAIL"
report.status_extended = "Non-privileged users are able to create security groups via the Access Panel and the Azure administration portal."

if getattr(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, auth_policy in entra_client.authorization_policy.items():

report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=auth_policy
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = getattr(auth_policy, "name", "Authorization Policy")
report.resource_id = getattr(auth_policy, "id", "authorizationPolicy")
report.status = "FAIL"
report.status_extended = "App creation is not disabled for non-admin users."

if getattr(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, auth_policy in entra_client.authorization_policy.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=auth_policy
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = getattr(auth_policy, "name", "Authorization Policy")
report.resource_id = getattr(auth_policy, "id", "authorizationPolicy")
report.status = "FAIL"
report.status_extended = (
"Tenants creation is not disabled for non-admin users."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, auth_policy in entra_client.authorization_policy.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=auth_policy
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = getattr(auth_policy, "name", "Authorization Policy")
report.resource_id = getattr(auth_policy, "id", "authorizationPolicy")
report.status = "FAIL"
report.status_extended = "Guest invitations are not restricted to users with specific administrative roles only."

if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, auth_policy in entra_client.authorization_policy.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=auth_policy
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = getattr(auth_policy, "name", "Authorization Policy")
report.resource_id = getattr(auth_policy, "id", "authorizationPolicy")
report.status = "FAIL"
report.status_extended = "Guest user access is not restricted to properties and memberships of their own directory objects"

if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, auth_policy in entra_client.authorization_policy.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=auth_policy
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = getattr(auth_policy, "name", "Authorization Policy")
report.resource_id = getattr(auth_policy, "id", "authorizationPolicy")
report.status = "FAIL"
report.status_extended = "Entra allows users to consent apps accessing company data on their behalf"

if getattr(auth_policy, "default_user_role_permissions", None) and not any(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, auth_policy in entra_client.authorization_policy.items():
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=auth_policy
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = getattr(auth_policy, "name", "Authorization Policy")
report.resource_id = getattr(auth_policy, "id", "authorizationPolicy")
report.status = "PASS"
report.status_extended = "Entra does not allow users to consent non-verified apps accessing company data on their behalf."

if getattr(auth_policy, "default_user_role_permissions", None) and any(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ def execute(self) -> Check_Report_Azure:
if is_privileged_user(
user, entra_client.directory_roles[tenant_domain]
):
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=user
)
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = user_domain_name
report.resource_id = user.id
report.status = "FAIL"
report.status_extended = (
f"Privileged user {user.name} does not have MFA."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ def execute(self) -> Check_Report_Azure:
tenant,
security_default,
) in entra_client.security_default.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=security_default
)
report.subscription = f"Tenant: {tenant}"
report.resource_name = getattr(security_default, "name", "Security Default")
report.resource_id = getattr(security_default, "id", "Security Default")
report.status = "FAIL"
report.status_extended = "Entra security defaults is diabled."

if getattr(security_default, "is_enabled", False):
Expand Down
4 changes: 4 additions & 0 deletions prowler/providers/azure/services/entra/entra_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def _get_named_locations(self):
named_locations[tenant].update(
{
named_location.id: NamedLocation(
id=named_location.id,
name=named_location.display_name,
ip_ranges_addresses=[
getattr(ip_range, "cidr_address", None)
Expand Down Expand Up @@ -274,6 +275,7 @@ async def _get_conditional_access_policy(self):
conditional_access_policy[tenant].update(
{
policy.id: ConditionalAccessPolicy(
id=policy.id,
name=policy.display_name,
state=getattr(policy, "state", "None"),
users={
Expand Down Expand Up @@ -337,6 +339,7 @@ class SecurityDefault(BaseModel):


class NamedLocation(BaseModel):
id: str
name: str
ip_ranges_addresses: List[str]
is_trusted: bool
Expand All @@ -348,6 +351,7 @@ class DirectoryRole(BaseModel):


class ConditionalAccessPolicy(BaseModel):
id: str
name: str
state: str
users: dict[str, List[str]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant, named_locations in entra_client.named_locations.items():
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=named_locations
)
report.status = "FAIL"
report.subscription = f"Tenant: {tenant}"
report.resource_name = "Named Locations"
report.resource_id = "Named Locations"
report.status_extended = (
"There is no trusted location with IP ranges defined."
)
for named_location_id, named_location in named_locations.items():
report.resource_name = named_location.name
report.resource_id = named_location_id

for named_location in named_locations.values():
if named_location.ip_ranges_addresses and named_location.is_trusted:
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=named_location
)
report.subscription = f"Tenant: {tenant}"
report.status = "PASS"
report.status_extended = f"Exits trusted location with trusted IP ranges, this IPs ranges are: {[ip_range for ip_range in named_location.ip_ranges_addresses if ip_range]}"
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ def execute(self) -> Check_Report_Azure:
]
and assignment.agent_id == user.id
):
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=user
)
report.subscription = subscription_name
report.status = "FAIL"
report.status_extended = f"User {user.name} without MFA can access VMs in subscription {subscription_name}"
report.subscription = subscription_name
report.resource_name = user_domain_name
report.resource_id = user.id

if len(user.authentication_methods) > 1:
report.status = "PASS"
report.status_extended = f"User {user.name} can access VMs in subscription {subscription_name} but it has MFA."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ def execute(self) -> Check_Report_Azure:
findings = []

for tenant_domain, group_settings in entra_client.group_settings.items():
report = Check_Report_Azure(self.metadata())
report = Check_Report_Azure(
metadata=self.metadata(), resource_metadata=group_settings
)
report.status = "FAIL"
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = "Microsoft365 Groups"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def test_entra_tenant_policy_no_mfa(self):
)

policy = ConditionalAccessPolicy(
id=policy_id,
name="Test Policy",
state="enabled",
users={"include": ["All"]},
Expand Down Expand Up @@ -112,6 +113,7 @@ def test_entra_tenant_policy_mfa(self):
)

policy = ConditionalAccessPolicy(
id=policy_id,
name="Test Policy",
state="enabled",
users={"include": ["All"]},
Expand Down Expand Up @@ -152,6 +154,7 @@ def test_entra_tenant_policy_mfa_disabled(self):
)

policy = ConditionalAccessPolicy(
id=policy_id,
name="Test Policy",
state="disabled",
users={"include": ["All"]},
Expand Down Expand Up @@ -192,6 +195,7 @@ def test_entra_tenant_policy_mfa_no_target(self):
)

policy = ConditionalAccessPolicy(
id=policy_id,
name="Test Policy",
state="enabled",
users={"include": ["All"]},
Expand Down Expand Up @@ -232,6 +236,7 @@ def test_entra_tenant_policy_mfa_no_users(self):
)

policy = ConditionalAccessPolicy(
id=policy_id,
name="Test Policy",
state="enabled",
users={"include": []},
Expand Down
Loading

0 comments on commit ee7d32d

Please sign in to comment.