diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 22963d98..e913729a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13.0"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] os: [ubuntu-latest, windows-latest, macos-latest] steps: diff --git a/minio/minioadmin.py b/minio/minioadmin.py index ab4507b7..97f42341 100644 --- a/minio/minioadmin.py +++ b/minio/minioadmin.py @@ -62,7 +62,6 @@ class _COMMAND(Enum): SET_USER_OR_GROUP_POLICY = "set-user-or-group-policy" LIST_CANNED_POLICIES = "list-canned-policies" REMOVE_CANNED_POLICY = "remove-canned-policy" - UNSET_USER_OR_GROUP_POLICY = "idp/builtin/policy/detach" CANNED_POLICY_INFO = "info-canned-policy" SET_BUCKET_QUOTA = "set-bucket-quota" GET_BUCKET_QUOTA = "get-bucket-quota" @@ -98,6 +97,9 @@ class _COMMAND(Enum): IDP_LDAP_POLICY_DETACH = "idp/ldap/policy/detach" IDP_LDAP_LIST_ACCESS_KEYS = "idp/ldap/list-access-keys" IDP_LDAP_LIST_ACCESS_KEYS_BULK = "idp/ldap/list-access-keys-bulk" + IDP_BUILTIN_POLICY_ATTACH = "idp/builtin/policy/attach" + IDP_BUILTIN_POLICY_DETACH = "idp/builtin/policy/detach" + IDP_BUILTIN_POLICY_ENTITIES = "idp/builtin/policy-entities" def _safe_str(value: Any) -> str: @@ -476,7 +478,7 @@ def policy_list(self) -> str: def policy_set( self, - policy_name: str | list[str], + policy_name: str, user: str | None = None, group: str | None = None, ) -> str: @@ -499,29 +501,9 @@ def policy_unset( group: str | None = None, ) -> str: """Unset an IAM policy for a user or group.""" - if (user is not None) ^ (group is not None): - policies = ( - policy_name if isinstance(policy_name, list) else [policy_name] - ) - data: dict[str, str | list[str]] = {"policies": policies} - if user: - data["user"] = user - if group: - data["group"] = group - response = self._url_open( - "POST", - _COMMAND.UNSET_USER_OR_GROUP_POLICY, - body=encrypt( - json.dumps(data).encode(), - self._provider.retrieve().secret_key, - ), - preload_content=False, - ) - plain_data = decrypt( - response, self._provider.retrieve().secret_key, - ) - return plain_data.decode() - raise ValueError("either user or group must be set") + return self.detach_policy( + policy_name if isinstance(policy_name, list) else [policy_name], + user, group) def config_get(self, key: str | None = None) -> str: """Get configuration parameters.""" @@ -847,14 +829,14 @@ def delete_service_account(self, access_key: str) -> str: ) return response.data.decode() - def _attach_detach_policy_ldap( + def _attach_detach_policy( self, command: _COMMAND, policies: list[str], user: str | None = None, group: str | None = None, ) -> str: - """Attach or detach policies for LDAP.""" + """Attach or detach policies for builtin or LDAP.""" if (user is not None) ^ (group is not None): key = "user" if user else "group" body = json.dumps( @@ -876,7 +858,7 @@ def attach_policy_ldap( group: str | None = None, ) -> str: """Attach policies for LDAP.""" - return self._attach_detach_policy_ldap( + return self._attach_detach_policy( _COMMAND.IDP_LDAP_POLICY_ATTACH, policies, user, group, ) @@ -887,7 +869,7 @@ def detach_policy_ldap( group: str | None = None, ) -> str: """Detach policies for LDAP.""" - return self._attach_detach_policy_ldap( + return self._attach_detach_policy( _COMMAND.IDP_LDAP_POLICY_DETACH, policies, user, group, ) @@ -927,3 +909,42 @@ def list_access_keys_ldap_bulk( response, self._provider.retrieve().secret_key, ) return plain_data.decode() + + def attach_policy( + self, + policies: list[str], + user: str | None = None, + group: str | None = None, + ) -> str: + """Attach builtin policies.""" + return self._attach_detach_policy( + _COMMAND.IDP_BUILTIN_POLICY_ATTACH, policies, user, group, + ) + + def detach_policy( + self, + policies: list[str], + user: str | None = None, + group: str | None = None, + ) -> str: + """Detach builtin policies.""" + return self._attach_detach_policy( + _COMMAND.IDP_BUILTIN_POLICY_DETACH, policies, user, group, + ) + + def get_policy_entities( + self, + users: list[str], + groups: list[str], + policies: list[str], + ) -> str: + """Get builtin policy entities.""" + response = self._url_open( + "GET", _COMMAND.IDP_BUILTIN_POLICY_ENTITIES, + query_params={"user": users, "group": groups, "policy": policies}, + preload_content=False, + ) + plain_data = decrypt( + response, self._provider.retrieve().secret_key, + ) + return plain_data.decode()