Skip to content

Commit

Permalink
migrate prefect_aws.secrets_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Dec 3, 2024
1 parent b52c2c9 commit 21dc297
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 93 deletions.
164 changes: 148 additions & 16 deletions src/integrations/prefect-aws/prefect_aws/secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from pydantic import Field

from prefect import task
from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect.blocks.abstract import SecretBlock
from prefect.logging import get_run_logger
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect_aws import AwsCredentials


Expand Down Expand Up @@ -365,22 +366,21 @@ class AwsSecret(SecretBlock):
secret_name: The name of the secret.
"""

_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # noqa
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # type: ignore
_block_type_name = "AWS Secret"
_documentation_url = "https://docs.prefect.io/integrations/prefect-aws" # noqa
_documentation_url = "https://docs.prefect.io/integrations/prefect-aws" # type: ignore

aws_credentials: AwsCredentials
secret_name: str = Field(default=..., description="The name of the secret.")

@sync_compatible
async def read_secret(
async def aread_secret(
self,
version_id: Optional[str] = None,
version_stage: Optional[str] = None,
**read_kwargs: Dict[str, Any],
**read_kwargs: Any,
) -> bytes:
"""
Reads the secret from the secret storage service.
Asynchronously reads the secret from the secret storage service.
Args:
version_id: The version of the secret to read. If not provided, the latest
Expand All @@ -397,7 +397,7 @@ async def read_secret(
Reads a secret.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.read_secret()
await secrets_manager.aread_secret()
```
"""
client = self.aws_credentials.get_secrets_manager_client()
Expand All @@ -416,12 +416,53 @@ async def read_secret(
self.logger.info(f"The secret {arn!r} data was successfully read.")
return secret

@sync_compatible
async def write_secret(
@async_dispatch(aread_secret)
def read_secret(
self,
version_id: Optional[str] = None,
version_stage: Optional[str] = None,
**read_kwargs: Any,
) -> bytes:
"""
Reads the secret from the secret storage service.
Args:
version_id: The version of the secret to read. If not provided, the latest
version will be read.
version_stage: The version stage of the secret to read. If not provided,
the latest version will be read.
read_kwargs: Additional keyword arguments to pass to the
`get_secret_value` method of the boto3 client.
Returns:
The secret data.
Examples:
Reads a secret.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.read_secret()
```
"""
client = self.aws_credentials.get_secrets_manager_client()
if version_id is not None:
read_kwargs["VersionId"] = version_id
if version_stage is not None:
read_kwargs["VersionStage"] = version_stage
response = client.get_secret_value(SecretId=self.secret_name, **read_kwargs)
if "SecretBinary" in response:
secret = response["SecretBinary"]
elif "SecretString" in response:
secret = response["SecretString"]
arn = response["ARN"]
self.logger.info(f"The secret {arn!r} data was successfully read.")
return secret

async def awrite_secret(
self, secret_data: bytes, **put_or_create_secret_kwargs: Dict[str, Any]
) -> str:
"""
Writes the secret to the secret storage service as a SecretBinary;
Asynchronously writes the secret to the secret storage service as a SecretBinary;
if it doesn't exist, it will be created.
Args:
Expand All @@ -436,7 +477,7 @@ async def write_secret(
Write some secret data.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.write_secret(b"my_secret_data")
await secrets_manager.awrite_secret(b"my_secret_data")
```
"""
client = self.aws_credentials.get_secrets_manager_client()
Expand All @@ -461,15 +502,57 @@ async def write_secret(
self.logger.info(f"The secret data was written successfully to {arn!r}.")
return arn

@sync_compatible
async def delete_secret(
@async_dispatch(awrite_secret)
def write_secret(
self, secret_data: bytes, **put_or_create_secret_kwargs: Dict[str, Any]
) -> str:
"""
Writes the secret to the secret storage service as a SecretBinary;
if it doesn't exist, it will be created.
Args:
secret_data: The secret data to write.
**put_or_create_secret_kwargs: Additional keyword arguments to pass to
put_secret_value or create_secret method of the boto3 client.
Returns:
The path that the secret was written to.
Examples:
Write some secret data.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.write_secret(b"my_secret_data")
```
"""
client = self.aws_credentials.get_secrets_manager_client()
try:
response = client.put_secret_value(
SecretId=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
except client.exceptions.ResourceNotFoundException:
self.logger.info(
f"The secret {self.secret_name!r} does not exist yet, creating it now."
)
response = client.create_secret(
Name=self.secret_name,
SecretBinary=secret_data,
**put_or_create_secret_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret data was written successfully to {arn!r}.")
return arn

async def adelete_secret(
self,
recovery_window_in_days: int = 30,
force_delete_without_recovery: bool = False,
**delete_kwargs: Dict[str, Any],
) -> str:
"""
Deletes the secret from the secret storage service.
Asynchronously deletes the secret from the secret storage service.
Args:
recovery_window_in_days: The number of days to wait before permanently
Expand All @@ -486,7 +569,7 @@ async def delete_secret(
Deletes the secret with a recovery window of 15 days.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.delete_secret(recovery_window_in_days=15)
await secrets_manager.adelete_secret(recovery_window_in_days=15)
```
"""
if force_delete_without_recovery and recovery_window_in_days:
Expand All @@ -510,3 +593,52 @@ async def delete_secret(
arn = response["ARN"]
self.logger.info(f"The secret {arn} was deleted successfully.")
return arn

@async_dispatch(adelete_secret)
def delete_secret(
self,
recovery_window_in_days: int = 30,
force_delete_without_recovery: bool = False,
**delete_kwargs: Dict[str, Any],
) -> str:
"""
Deletes the secret from the secret storage service.
Args:
recovery_window_in_days: The number of days to wait before permanently
deleting the secret. Must be between 7 and 30 days.
force_delete_without_recovery: If True, the secret will be deleted
immediately without a recovery window.
**delete_kwargs: Additional keyword arguments to pass to the
delete_secret method of the boto3 client.
Returns:
The path that the secret was deleted from.
Examples:
Deletes the secret with a recovery window of 15 days.
```python
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.delete_secret(recovery_window_in_days=15)
```
"""
if force_delete_without_recovery and recovery_window_in_days:
raise ValueError(
"Cannot specify recovery window and force delete without recovery."
)
elif not (7 <= recovery_window_in_days <= 30):
raise ValueError(
"Recovery window must be between 7 and 30 days, got "
f"{recovery_window_in_days}."
)

client = self.aws_credentials.get_secrets_manager_client()
response = client.delete_secret(
SecretId=self.secret_name,
RecoveryWindowInDays=recovery_window_in_days,
ForceDeleteWithoutRecovery=force_delete_without_recovery,
**delete_kwargs,
)
arn = response["ARN"]
self.logger.info(f"The secret {arn} was deleted successfully.")
return arn
Loading

0 comments on commit 21dc297

Please sign in to comment.