Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate prefect_aws.secrets_manager off sync_compatible #16169

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 148 additions & 16 deletions src/integrations/prefect-aws/prefect_aws/secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from pydantic import Field

from prefect import task
from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect.blocks.abstract import SecretBlock
from prefect.logging import get_run_logger
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect_aws import AwsCredentials


Expand Down Expand Up @@ -365,22 +366,21 @@ class AwsSecret(SecretBlock):
secret_name: The name of the secret.
"""

_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # noqa
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # type: ignore
_block_type_name = "AWS Secret"
_documentation_url = "https://docs.prefect.io/integrations/prefect-aws" # noqa
_documentation_url = "https://docs.prefect.io/integrations/prefect-aws" # type: ignore

aws_credentials: AwsCredentials
secret_name: str = Field(default=..., description="The name of the secret.")

@sync_compatible
async def read_secret(
async def aread_secret(
self,
version_id: Optional[str] = None,
version_stage: Optional[str] = None,
**read_kwargs: Dict[str, Any],
**read_kwargs: Any,
) -> bytes:
"""
Reads the secret from the secret storage service.
Asynchronously reads the secret from the secret storage service.

Args:
version_id: The version of the secret to read. If not provided, the latest
Expand All @@ -397,7 +397,7 @@ async def read_secret(
Reads a secret.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.read_secret()
await secrets_manager.aread_secret()
```
"""
client = self.aws_credentials.get_secrets_manager_client()
Expand All @@ -416,12 +416,53 @@ async def read_secret(
self.logger.info(f"The secret {arn!r} data was successfully read.")
return secret

@sync_compatible
async def write_secret(
@async_dispatch(aread_secret)
def read_secret(
self,
version_id: Optional[str] = None,
version_stage: Optional[str] = None,
**read_kwargs: Any,
) -> bytes:
"""
Reads the secret from the secret storage service.

Args:
version_id: The version of the secret to read. If not provided, the latest
version will be read.
version_stage: The version stage of the secret to read. If not provided,
the latest version will be read.
read_kwargs: Additional keyword arguments to pass to the
`get_secret_value` method of the boto3 client.

Returns:
The secret data.

Examples:
Reads a secret.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.read_secret()
```
"""
client = self.aws_credentials.get_secrets_manager_client()
if version_id is not None:
read_kwargs["VersionId"] = version_id
if version_stage is not None:
read_kwargs["VersionStage"] = version_stage
response = client.get_secret_value(SecretId=self.secret_name, **read_kwargs)
if "SecretBinary" in response:
secret = response["SecretBinary"]
elif "SecretString" in response:
secret = response["SecretString"]
arn = response["ARN"]
self.logger.info(f"The secret {arn!r} data was successfully read.")
return secret

async def awrite_secret(
self, secret_data: bytes, **put_or_create_secret_kwargs: Dict[str, Any]
) -> str:
"""
Writes the secret to the secret storage service as a SecretBinary;
Asynchronously writes the secret to the secret storage service as a SecretBinary;
if it doesn't exist, it will be created.

Args:
Expand All @@ -436,7 +477,7 @@ async def write_secret(
Write some secret data.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.write_secret(b"my_secret_data")
await secrets_manager.awrite_secret(b"my_secret_data")
```
"""
client = self.aws_credentials.get_secrets_manager_client()
Expand All @@ -461,15 +502,57 @@ async def write_secret(
self.logger.info(f"The secret data was written successfully to {arn!r}.")
return arn

@sync_compatible
async def delete_secret(
@async_dispatch(awrite_secret)
def write_secret(
self, secret_data: bytes, **put_or_create_secret_kwargs: Dict[str, Any]
) -> str:
"""
Writes the secret to the secret storage service as a SecretBinary;
if it doesn't exist, it will be created.

Args:
secret_data: The secret data to write.
**put_or_create_secret_kwargs: Additional keyword arguments to pass to
put_secret_value or create_secret method of the boto3 client.

Returns:
The path that the secret was written to.

Examples:
Write some secret data.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.write_secret(b"my_secret_data")
```
"""
client = self.aws_credentials.get_secrets_manager_client()
try:
response = client.put_secret_value(
SecretId=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
except client.exceptions.ResourceNotFoundException:
self.logger.info(
f"The secret {self.secret_name!r} does not exist yet, creating it now."
)
response = client.create_secret(
Name=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret data was written successfully to {arn!r}.")
return arn

async def adelete_secret(
self,
recovery_window_in_days: int = 30,
force_delete_without_recovery: bool = False,
**delete_kwargs: Dict[str, Any],
) -> str:
"""
Deletes the secret from the secret storage service.
Asynchronously deletes the secret from the secret storage service.

Args:
recovery_window_in_days: The number of days to wait before permanently
Expand All @@ -486,7 +569,7 @@ async def delete_secret(
Deletes the secret with a recovery window of 15 days.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.delete_secret(recovery_window_in_days=15)
await secrets_manager.adelete_secret(recovery_window_in_days=15)
```
"""
if force_delete_without_recovery and recovery_window_in_days:
Expand All @@ -510,3 +593,52 @@ async def delete_secret(
arn = response["ARN"]
self.logger.info(f"The secret {arn} was deleted successfully.")
return arn

@async_dispatch(adelete_secret)
def delete_secret(
self,
recovery_window_in_days: int = 30,
force_delete_without_recovery: bool = False,
**delete_kwargs: Dict[str, Any],
) -> str:
"""
Deletes the secret from the secret storage service.

Args:
recovery_window_in_days: The number of days to wait before permanently
deleting the secret. Must be between 7 and 30 days.
force_delete_without_recovery: If True, the secret will be deleted
immediately without a recovery window.
**delete_kwargs: Additional keyword arguments to pass to the
delete_secret method of the boto3 client.

Returns:
The path that the secret was deleted from.

Examples:
Deletes the secret with a recovery window of 15 days.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.delete_secret(recovery_window_in_days=15)
```
"""
if force_delete_without_recovery and recovery_window_in_days:
raise ValueError(
"Cannot specify recovery window and force delete without recovery."
)
elif not (7 <= recovery_window_in_days <= 30):
raise ValueError(
"Recovery window must be between 7 and 30 days, got "
f"{recovery_window_in_days}."
)

client = self.aws_credentials.get_secrets_manager_client()
response = client.delete_secret(
SecretId=self.secret_name,
RecoveryWindowInDays=recovery_window_in_days,
ForceDeleteWithoutRecovery=force_delete_without_recovery,
**delete_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret {arn} was deleted successfully.")
return arn
Loading
Loading