Skip to content

Commit

Permalink
MinioAdmin: add IDP/LDAP attach/detach/list APIs (#1470)
Browse files Browse the repository at this point in the history
Signed-off-by: Bala.FA <bala@minio.io>
  • Loading branch information
balamurugana authored Dec 31, 2024
1 parent 5bc4dd0 commit 2af0b46
Showing 1 changed file with 85 additions and 0 deletions.
85 changes: 85 additions & 0 deletions minio/minioadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class _COMMAND(Enum):
SERVICE_ACCOUNT_ADD = "add-service-account"
SERVICE_ACCOUNT_UPDATE = "update-service-account"
SERVICE_ACCOUNT_DELETE = "delete-service-account"
IDP_LDAP_POLICY_ATTACH = "idp/ldap/policy/attach"
IDP_LDAP_POLICY_DETACH = "idp/ldap/policy/detach"
IDP_LDAP_LIST_ACCESS_KEYS = "idp/ldap/list-access-keys"
IDP_LDAP_LIST_ACCESS_KEYS_BULK = "idp/ldap/list-access-keys-bulk"


def _safe_str(value: Any) -> str:
Expand Down Expand Up @@ -842,3 +846,84 @@ def delete_service_account(self, access_key: str) -> str:
query_params={"accessKey": access_key},
)
return response.data.decode()

def _attach_detach_policy_ldap(
self,
command: _COMMAND,
policies: list[str],
user: str | None = None,
group: str | None = None,
) -> str:
"""Attach or detach policies for LDAP."""
if (user is not None) ^ (group is not None):
key = "user" if user else "group"
body = json.dumps(
{"policies": policies,
key: cast(str, user or group)},
).encode()
response = self._url_open(
"POST",
command,
body=encrypt(body, self._provider.retrieve().secret_key),
)
return response.data.decode()
raise ValueError("either user or group must be set")

def attach_policy_ldap(
self,
policies: list[str],
user: str | None = None,
group: str | None = None,
) -> str:
"""Attach policies for LDAP."""
return self._attach_detach_policy_ldap(
_COMMAND.IDP_LDAP_POLICY_ATTACH, policies, user, group,
)

def detach_policy_ldap(
self,
policies: list[str],
user: str | None = None,
group: str | None = None,
) -> str:
"""Detach policies for LDAP."""
return self._attach_detach_policy_ldap(
_COMMAND.IDP_LDAP_POLICY_DETACH, policies, user, group,
)

def list_access_keys_ldap(
self,
user_dn: str,
list_type: str,
) -> str:
"""List service accounts belonging to the specified user."""
response = self._url_open(
"GET", _COMMAND.IDP_LDAP_LIST_ACCESS_KEYS,
query_params={"userDN": user_dn, "listType": list_type},
preload_content=False,
)
plain_data = decrypt(
response, self._provider.retrieve().secret_key,
)
return plain_data.decode()

def list_access_keys_ldap_bulk(
self,
users: list[str],
list_type: str,
all_users: bool,
) -> str:
"""List access keys belonging to the given users or all users."""
if len(users) != 0 and all_users:
raise ValueError("both users and all_users are not permitted")

key, value = ("all", "true") if all_users else ("userDNs", users)
response = self._url_open(
"GET", _COMMAND.IDP_LDAP_LIST_ACCESS_KEYS_BULK,
query_params={"listType": list_type, key: value},
preload_content=False,
)
plain_data = decrypt(
response, self._provider.retrieve().secret_key,
)
return plain_data.decode()

0 comments on commit 2af0b46

Please sign in to comment.