diff --git a/README.md b/README.md
index bba91c1..5298347 100644
--- a/README.md
+++ b/README.md
@@ -54,6 +54,9 @@ This collection includes Python and Bash scripts for managing various AWS servic
 | CloudWatch | [cw_set_retention_policy.py](cloudwatch/cw_set_retention_policy.py) | Sets retention policy for log groups |
 | CodePipeline | [cp_slack_notifications.py](codepipeline/cp_slack_notifications.py) | Enables notifications on Slack |
 | EC2 | [ec2_delete_unattached_volumes.py](ec2/ec2_delete_unattached_volumes.py) | Deletes unattached EBS volumes |
+| EC2 | [ec2_delete_orphaned_snapshots.py](ec2/ec2_delete_orphaned_snapshots.py) | Deletes snapshots that are not associated with any volumes |
+| EC2 | [ec2_delete_ssh_access_security_groups.py](ec2/ec2_delete_ssh_access_security_groups.py) | Deletes SSH (port 22) inbound rules from all security groups |
+| EC2 | [ec2_delete_unused_amis.py](ec2/ec2_delete_unused_amis.py) | Deletes unused AMIs (Amazon Machine Images) in an AWS account |
 | EC2 | [ec2_delete_unused_eips.py](ec2/ec2_delete_unused_eips.py) | Deletes unused Elastic IPs |
 | EC2 | [ec2_delete_unused_keypairs_all_regions.py](ec2/ec2_delete_unused_keypairs_all_regions.py) | Deletes unused EC2 keypairs in all regions |
 | EC2 | [ec2_delete_unused_keypairs_single_region.py](ec2/ec2_delete_unused_keypairs_single_region.py) | Deletes unused EC2 keypairs in a single region |
@@ -78,7 +81,7 @@ This collection includes Python and Bash scripts for managing various AWS servic
 | S3 | [s3_create_tar.py](s3/s3_create_tar.py) | Creates tar files |
 | S3 | [s3_delete_empty_buckets.py](s3/s3_delete_empty_buckets.py) | Deletes empty S3 buckets |
 | S3 | [s3_list_old_files.py](s3/s3_list_old_files.py) | Lists old files in S3 |
-| S3 | [s3_delete_bucket_and_contents.py](s3/s3_delete_bucket_and_contents.py) | Deletes S3 bucket and its contents |
+| S3 | [s3_search_bucket_and_delete.py](s3/s3_search_bucket_and_delete.py) | Deletes S3 bucket and its contents |
 | S3 | [s3_search_file.py](s3/s3_search_file.py) | Searches for files in S3 bucket |
 | S3 | [s3_search_key.py](s3/s3_search_key.py) | Searches for a key in S3 bucket |
 | S3 | [s3_search_multiple_keys.py](s3/s3_search_multiple_keys.py) | Searches for multiple keys in S3 bucket |
diff --git a/ec2/ec2_delete_orphaned_snapshots.py b/ec2/ec2_delete_orphaned_snapshots.py
new file mode 100644
index 0000000..f7facf8
--- /dev/null
+++ b/ec2/ec2_delete_orphaned_snapshots.py
@@ -0,0 +1,164 @@
+"""
+Description: This script identifies and optionally deletes EBS snapshots in an AWS account where the
+associated volume no longer exists and the snapshot is not part of any AMI.
+The script can perform a dry run to show which snapshots would be deleted without actually deleting them.
+It also supports a retention period to keep snapshots for a specified number of days.
+
+Key features:
+- Automatically uses the region specified in the AWS CLI profile
+- Supports dry run mode for safe execution
+- Provides detailed logging of all operations, including the list of orphaned snapshot IDs
+- Uses boto3 to interact with the AWS EC2 service
+- Implements error handling for robustness
+- Allows setting a retention period for snapshots
+- Ensures snapshots associated with AMIs are not deleted
+
+Usage:
+python ec2_delete_orphaned_snapshots.py [--dry-run] [--retention-days DAYS] [--profile PROFILE_NAME]
+
+Author: [Your Name]
+License: MIT
+"""
+
+import argparse
+import logging
+from datetime import datetime, timedelta
+
+import boto3
+from botocore.exceptions import ClientError
+
+
+def setup_logging():
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    return logging.getLogger(__name__)
+
+
+def get_ec2_client():
+    try:
+        return boto3.client("ec2")
+    except ClientError as e:
+        logger.error(f"Failed to create EC2 client: {e}")
+        raise
+
+
+def get_owned_snapshots(ec2_client):
+    try:
+        owned_snapshots = []
+        paginator = ec2_client.get_paginator("describe_snapshots")
+        for page in paginator.paginate(OwnerIds=["self"]):
+            owned_snapshots.extend(page["Snapshots"])
+        logger.info(f"Owned snapshots: {len(owned_snapshots)}")
+        return owned_snapshots
+    except ClientError as e:
+        logger.error(f"Failed to retrieve owned snapshots: {e}")
+        return []
+
+
+def is_volume_exists(ec2_client, volume_id):
+    try:
+        ec2_client.describe_volumes(VolumeIds=[volume_id])
+        return True
+    except ClientError as e:
+        if e.response["Error"]["Code"] == "InvalidVolume.NotFound":
+            return False
+        logger.error(f"Error checking volume {volume_id}: {e}")
+        return True  # Assume the volume exists in case of other errors
+
+
+def get_snapshots_used_by_amis(ec2_client):
+    try:
+        used_snapshots = set()
+        paginator = ec2_client.get_paginator("describe_images")
+        for page in paginator.paginate(Owners=["self"]):
+            for image in page["Images"]:
+                for block_device in image.get("BlockDeviceMappings", []):
+                    if "Ebs" in block_device and "SnapshotId" in block_device["Ebs"]:
+                        used_snapshots.add(block_device["Ebs"]["SnapshotId"])
+        logger.info(f"Snapshots used by AMIs: {len(used_snapshots)}")
+        logger.info(f"Snapshot IDs used by AMIs: {list(used_snapshots)}")
+        return used_snapshots
+    except ClientError as e:
+        logger.error(f"Failed to retrieve snapshots used by AMIs: {e}")
+        return set()
+
+
+def delete_snapshot(ec2_client, snapshot_id, dry_run=False):
+    try:
+        if not dry_run:
+            ec2_client.delete_snapshot(SnapshotId=snapshot_id)
+            logger.info(f"Deleted snapshot: {snapshot_id}")
+        else:
+            logger.info(f"Would delete snapshot: {snapshot_id}")
+        return True
+    except ClientError as e:
+        logger.error(f"Failed to delete snapshot {snapshot_id}: {e}")
+        return False
+
+
+def delete_orphaned_snapshots(ec2_client, orphaned_snapshots, dry_run=False):
+    deleted_count = 0
+    for snapshot in orphaned_snapshots:
+        if delete_snapshot(ec2_client, snapshot["SnapshotId"], dry_run):
+            deleted_count += 1
+    return deleted_count
+
+
+def main(dry_run=False, retention_days=None):
+    ec2_client = get_ec2_client()
+
+    owned_snapshots = get_owned_snapshots(ec2_client)
+    snapshots_used_by_amis = get_snapshots_used_by_amis(ec2_client)
+
+    # Find orphaned snapshots
+    orphaned_snapshots = [
+        snapshot
+        for snapshot in owned_snapshots
+        if "VolumeId" in snapshot
+        and not is_volume_exists(ec2_client, snapshot["VolumeId"])
+        and snapshot["SnapshotId"] not in snapshots_used_by_amis
+    ]
+    logger.info(f"Orphaned snapshots: {len(orphaned_snapshots)}")
+    logger.info(f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}")
+
+    if retention_days is not None and orphaned_snapshots:
+        # Filter snapshots based on retention period
+        cutoff_date = datetime.now(orphaned_snapshots[0]["StartTime"].tzinfo) - timedelta(days=retention_days)
+        orphaned_snapshots = [snapshot for snapshot in orphaned_snapshots if snapshot["StartTime"] < cutoff_date]
+        logger.info(f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}")
+        logger.info(
+            f"Orphaned snapshot IDs to be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
+        )
+
+    if not orphaned_snapshots:
+        logger.info("No orphaned snapshots found to delete.")
+        return
+
+    if dry_run:
+        logger.info(f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s).")
+        logger.info(
+            f"Snapshot IDs that would be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
+        )
+    else:
+        deleted_count = delete_orphaned_snapshots(ec2_client, orphaned_snapshots, dry_run)
+        logger.info(f"Deleted {deleted_count} orphaned snapshot(s).")
+
+    # Summary
+    logger.info("Summary:")
+    logger.info(f"  Total owned snapshots: {len(owned_snapshots)}")
+    logger.info(f"  Snapshots used by AMIs: {len(snapshots_used_by_amis)}")
+    logger.info(f"  Orphaned snapshots: {len(orphaned_snapshots)}")
+
+
+if __name__ == "__main__":
+    logger = setup_logging()
+
+    parser = argparse.ArgumentParser(description="Delete orphaned EC2 snapshots")
+    parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting snapshots")
+    parser.add_argument("--retention-days", type=int, help="Number of days to retain snapshots before deletion")
+    parser.add_argument("--profile", help="AWS CLI profile name")
+    args = parser.parse_args()
+
+    if args.profile:
+        boto3.setup_default_session(profile_name=args.profile)
+
+    main(dry_run=args.dry_run, retention_days=args.retention_days)
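
A side note on the volume check above: is_volume_exists issues one describe_volumes call per snapshot, so accounts with thousands of snapshots generate thousands of API calls. If that ever becomes a bottleneck, the existing volume IDs could be collected once up front and the orphan test reduced to a set lookup. A minimal sketch of that variant, assuming the same boto3 EC2 client (the helper name get_existing_volume_ids is illustrative and not part of this patch):

import boto3


def get_existing_volume_ids(ec2_client):
    """Collect the IDs of all EBS volumes that still exist in the region."""
    volume_ids = set()
    paginator = ec2_client.get_paginator("describe_volumes")
    for page in paginator.paginate():
        volume_ids.update(volume["VolumeId"] for volume in page["Volumes"])
    return volume_ids


# Build the set once, then a snapshot is "orphaned" when its VolumeId is absent:
#     snapshot["VolumeId"] not in existing_volume_ids
ec2_client = boto3.client("ec2")
existing_volume_ids = get_existing_volume_ids(ec2_client)
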
diff --git a/ec2/ec2_delete_ssh_access_security_groups.py b/ec2/ec2_delete_ssh_access_security_groups.py
new file mode 100644
index 0000000..c931b73
--- /dev/null
+++ b/ec2/ec2_delete_ssh_access_security_groups.py
@@ -0,0 +1,138 @@
+"""
+Description: This script identifies and optionally removes SSH (port 22) inbound rules from all security groups in an AWS account.
+It fetches all security groups, checks for SSH inbound rules, and removes them. The script supports a dry-run mode to show which
+rules would be removed without actually modifying the security groups.
+
+Key features:
+- Automatically uses the region specified in the AWS CLI profile
+- Supports dry run mode for safe execution
+- Provides detailed logging of all operations, including security group rule IDs
+- Uses boto3 to interact with the AWS EC2 service
+- Implements error handling for robustness
+
+Usage:
+python ec2_delete_ssh_access_security_groups.py [--dry-run]
+
+Author: [Your Name]
+License: MIT
+"""
+
+import argparse
+import logging
+
+import boto3
+from botocore.exceptions import ClientError
+
+
+def setup_logging():
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    return logging.getLogger(__name__)
+
+
+def get_ec2_client():
+    try:
+        return boto3.client("ec2")
+    except ClientError as e:
+        logger.error(f"Failed to create EC2 client: {e}")
+        raise
+
+
+def get_all_security_groups(ec2_client):
+    try:
+        security_groups = []
+        paginator = ec2_client.get_paginator("describe_security_groups")
+        for page in paginator.paginate():
+            security_groups.extend(page["SecurityGroups"])
+        logger.info(f"Total Security Groups: {len(security_groups)}")
+        return security_groups
+    except ClientError as e:
+        logger.error(f"Failed to retrieve security groups: {e}")
+        return []
+
+
+def has_ssh_rule(security_group):
+    for rule in security_group.get("IpPermissions", []):
+        if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp":
+            return True
+    return False
+
+
+def remove_ssh_rule(ec2_client, security_group, dry_run=False):
+    group_id = security_group["GroupId"]
+    group_name = security_group["GroupName"]
+    ssh_rules = [
+        rule
+        for rule in security_group.get("IpPermissions", [])
+        if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp"
+    ]
+
+    if not ssh_rules:
+        logger.info(f"No SSH rules found in security group: {group_id} ({group_name})")
+        return False
+
+    logger.info(f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})")
+
+    # Fetch the security group rules with their IDs
+    try:
+        response = ec2_client.describe_security_group_rules(Filters=[{"Name": "group-id", "Values": [group_id]}])
+        sg_rules = {rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"]}
+    except ClientError as e:
+        logger.error(f"Failed to fetch security group rules for {group_id}: {e}")
+        return False
+
+    for rule in ssh_rules:
+        # Find matching rule(s) in sg_rules
+        matching_rules = [
+            sg_rule
+            for sg_rule in sg_rules.values()
+            if sg_rule["IpProtocol"] == rule["IpProtocol"]
+            and sg_rule.get("FromPort") == rule["FromPort"]
+            and sg_rule.get("ToPort") == rule["ToPort"]
+            and not sg_rule["IsEgress"]  # Inbound rules
+        ]
+
+        for matching_rule in matching_rules:
+            rule_id = matching_rule["SecurityGroupRuleId"]
+            cidr_range = matching_rule.get("CidrIpv4", "N/A")
+            logger.info(f"  Rule ID: {rule_id}")
+            logger.info(f"  Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}")
+            logger.info(f"  Protocol: {matching_rule['IpProtocol']}")
+            logger.info(f"  CIDR Range: {cidr_range}")
+
+    if not dry_run:
+        try:
+            ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ssh_rules)
+            logger.info(f"Successfully removed SSH rules from security group: {group_id} ({group_name})")
+            return True
+        except ClientError as e:
+            logger.error(f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}")
+            return False
+    return True
+
+
+def main(dry_run=False):
+    ec2_client = get_ec2_client()
+    security_groups = get_all_security_groups(ec2_client)
+
+    affected_groups = 0
+    for sg in security_groups:
+        if has_ssh_rule(sg):
+            if remove_ssh_rule(ec2_client, sg, dry_run):
+                affected_groups += 1
+
+    # Summary
+    logger.info("Summary:")
+    logger.info(f"  Total Security Groups: {len(security_groups)}")
+    logger.info(f"  Security Groups with SSH rules {'that would be ' if dry_run else ''}modified: {affected_groups}")
+
+
+if __name__ == "__main__":
+    logger = setup_logging()
+
+    parser = argparse.ArgumentParser(description="Remove SSH (port 22) inbound rules from EC2 Security Groups")
+    parser.add_argument(
+        "--dry-run", action="store_true", help="Perform a dry run without actually modifying security groups"
+    )
    args = parser.parse_args()
+
+    main(dry_run=args.dry_run)
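
One behavioural note on has_ssh_rule above: it only flags permissions whose port range is exactly 22-22, so an inbound rule opening 0-1024, or an all-traffic rule, still exposes SSH but is left untouched. If broader matching is ever wanted, a predicate along these lines could replace it (a sketch only, not part of this patch):

def rule_covers_ssh(rule):
    """Return True if an inbound permission covers TCP port 22, including wider ranges."""
    protocol = rule.get("IpProtocol")
    if protocol == "-1":  # "all traffic" rules have no port range
        return True
    if protocol != "tcp":
        return False
    from_port = rule.get("FromPort")
    to_port = rule.get("ToPort")
    return from_port is not None and to_port is not None and from_port <= 22 <= to_port
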
diff --git a/ec2/ec2_delete_unused_amis.py b/ec2/ec2_delete_unused_amis.py
new file mode 100644
index 0000000..9432848
--- /dev/null
+++ b/ec2/ec2_delete_unused_amis.py
@@ -0,0 +1,155 @@
+"""
+Description: This script identifies and optionally deletes unused AMIs (Amazon Machine Images) in an AWS account.
+It fetches all AMIs owned by the account, determines which ones are currently used by EC2 instances,
+and identifies the unused AMIs. The script can perform a dry run to show which AMIs would be deleted
+without actually deleting them. It also supports a retention period to keep AMIs for a specified number of days.
+
+Key features:
+- Automatically uses the region specified in the AWS CLI profile
+- Supports dry run mode for safe execution
+- Provides detailed logging of all operations
+- Uses boto3 to interact with the AWS EC2 service
+- Implements error handling for robustness
+- Allows setting a retention period for AMIs
+- Deletes associated snapshots when deleting AMIs
+
+Usage:
+python ec2_delete_unused_amis.py [--dry-run] [--retention-days DAYS]
+
+Author: [Your Name]
+License: MIT
+"""
+
+import argparse
+import logging
+from datetime import datetime, timedelta
+
+import boto3
+from botocore.exceptions import ClientError
+
+
+def setup_logging():
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    return logging.getLogger(__name__)
+
+
+def get_ec2_client():
+    try:
+        return boto3.client("ec2")
+    except ClientError as e:
+        logger.error(f"Failed to create EC2 client: {e}")
+        raise
+
+
+def get_owned_amis(ec2_client):
+    try:
+        owned_amis = []
+        paginator = ec2_client.get_paginator("describe_images")
+        for page in paginator.paginate(Owners=["self"]):
+            owned_amis.extend(page["Images"])
+        logger.info(f"Owned AMIs: {len(owned_amis)} : {[ami['ImageId'] for ami in owned_amis]}")
+        return owned_amis
+    except ClientError as e:
+        logger.error(f"Failed to retrieve owned AMIs: {e}")
+        return []
+
+
+def get_used_amis(ec2_client):
+    try:
+        used_amis = set()
+        paginator = ec2_client.get_paginator("describe_instances")
+        for page in paginator.paginate():
+            for reservation in page["Reservations"]:
+                for instance in reservation["Instances"]:
+                    if "ImageId" in instance:
+                        used_amis.add(instance["ImageId"])
+        logger.info(f"Used AMIs: {len(used_amis)} : {used_amis}")
+        return used_amis
+    except ClientError as e:
+        logger.error(f"Failed to retrieve used AMIs: {e}")
+        return set()
+
+
+def delete_ami_and_snapshot(ec2_client, ami_id, dry_run=False):
+    try:
+        # Get snapshot IDs associated with the AMI
+        image = ec2_client.describe_images(ImageIds=[ami_id])["Images"][0]
+        snapshot_ids = [
+            block_device["Ebs"]["SnapshotId"]
+            for block_device in image.get("BlockDeviceMappings", [])
+            if "Ebs" in block_device
+        ]
+
+        if not dry_run:
+            # Deregister the AMI
+            ec2_client.deregister_image(ImageId=ami_id)
+            logger.info(f"Deregistered AMI: {ami_id}")
+
+            # Delete associated snapshots
+            for snapshot_id in snapshot_ids:
+                ec2_client.delete_snapshot(SnapshotId=snapshot_id)
+                logger.info(f"Deleted snapshot: {snapshot_id}")
+        else:
+            logger.info(f"Would deregister AMI: {ami_id}")
+            for snapshot_id in snapshot_ids:
+                logger.info(f"Would delete snapshot: {snapshot_id}")
+        return True
+    except ClientError as e:
+        logger.error(f"Failed to delete AMI {ami_id} or its snapshots: {e}")
+        return False
+
+
+def delete_unused_amis(ec2_client, unused_amis, dry_run=False):
+    deleted_count = 0
+    for ami in unused_amis:
+        if delete_ami_and_snapshot(ec2_client, ami["ImageId"], dry_run):
+            deleted_count += 1
+    return deleted_count
+
+
+def main(dry_run=False, retention_days=None):
+    ec2_client = get_ec2_client()
+
+    owned_amis = get_owned_amis(ec2_client)
+    used_amis = get_used_amis(ec2_client)
+
+    # Find unused AMIs
+    unused_amis = [ami for ami in owned_amis if ami["ImageId"] not in used_amis]
+    logger.info(f"Unused AMIs: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}")
+
+    if retention_days is not None:
+        # Filter AMIs based on retention period
+        cutoff_date = datetime.now() - timedelta(days=retention_days)
+        unused_amis = [
+            ami for ami in unused_amis if datetime.strptime(ami["CreationDate"], "%Y-%m-%dT%H:%M:%S.%fZ") < cutoff_date
+        ]
+        logger.info(
+            f"Unused AMIs older than {retention_days} days: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}"
+        )
+
+    if not unused_amis:
+        logger.info("No unused AMIs found to delete.")
+        return
+
+    if dry_run:
+        logger.info(f"Dry run: Would delete {len(unused_amis)} unused AMI(s) and their associated snapshots.")
+    else:
+        deleted_count = delete_unused_amis(ec2_client, unused_amis, dry_run)
+        logger.info(f"Deleted {deleted_count} unused AMI(s) and their associated snapshots.")
+
+    # Summary
+    logger.info("Summary:")
+    logger.info(f"  Total owned AMIs: {len(owned_amis)}")
+    logger.info(f"  Used AMIs: {len(used_amis)}")
+    logger.info(f"  Unused AMIs: {len(unused_amis)}")
+
+
+if __name__ == "__main__":
+    logger = setup_logging()
+
+    parser = argparse.ArgumentParser(description="Delete unused EC2 AMIs")
+    parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting AMIs")
+    parser.add_argument("--retention-days", type=int, help="Number of days to retain AMIs before deletion")
+    args = parser.parse_args()
+
+    main(dry_run=args.dry_run, retention_days=args.retention_days)
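
All three EC2 scripts follow the same workflow: run with --dry-run, review the logged IDs, then rerun without the flag. Example invocations from the repository root (the retention value is illustrative; region and credentials come from the active AWS CLI profile):

python ec2/ec2_delete_orphaned_snapshots.py --dry-run --retention-days 30
python ec2/ec2_delete_ssh_access_security_groups.py --dry-run
python ec2/ec2_delete_unused_amis.py --dry-run --retention-days 30
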
diff --git a/s3/s3_delete_bucket_and_contents.py b/s3/s3_delete_bucket_and_contents.py
deleted file mode 100644
index 2c9965b..0000000
--- a/s3/s3_delete_bucket_and_contents.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# https://github.com/dannysteenman/aws-toolbox
-#
-# License: MIT
-#
-# This script searches for your chosen bucketname and then deletes all (versioned)objects in that S3 bucket before deleting the bucket itself.
-#
-# Usage: python s3_search_bucket_and_delete.py
-
-import sys
-import boto3
-
-# Get the target bucket name from the command line argument
-if len(sys.argv) < 2:
-    print("Please provide the target bucket name as a command line argument")
-    sys.exit(1)
-
-target_bucket_name = sys.argv[1]
-
-# Create an S3 client
-s3_client = boto3.client("s3")
-# create an S3 resource
-s3 = boto3.resource("s3")
-
-# Get a list of all S3 buckets
-response = s3_client.list_buckets()
-
-# Iterate over the buckets
-for bucket in response["Buckets"]:
-    bucket_name = bucket["Name"]
-    # Check if the bucket name contains the target string
-    if target_bucket_name in bucket_name:
-        print(f"Found bucket: {bucket_name}")
-
-        versioning = s3_client.get_bucket_versioning(Bucket=bucket_name)
-        bucket = s3.Bucket(bucket_name)
-
-        if versioning.get("Status") == "Enabled":
-            bucket.object_versions.delete()
-        else:
-            bucket.objects.delete()
-
-        # Finally, delete the bucket
-        s3_client.delete_bucket(Bucket=bucket_name)
-        print(f"Deleted bucket: {bucket_name}")
diff --git a/s3/s3_search_bucket_and_delete.py b/s3/s3_search_bucket_and_delete.py
new file mode 100644
index 0000000..8796ebe
--- /dev/null
+++ b/s3/s3_search_bucket_and_delete.py
@@ -0,0 +1,167 @@
+"""
+Description: This script searches for a specified bucket name and optionally deletes all (versioned) objects
+in that S3 bucket before deleting the bucket itself. It supports a dry-run mode and provides information
+about the storage used in the bucket.
+
+Key features:
+- Supports dry run mode for safe execution
+- Provides detailed logging of all operations
+- Shows total storage used in the bucket
+- Handles both versioned and non-versioned buckets
+- Implements error handling for robustness
+
+Usage:
+python s3_search_bucket_and_delete.py <bucket_name> [--dry-run]
+
+Author: Danny Steenman
+License: MIT
+"""
+
+import argparse
+import logging
+import sys
+
+import boto3
+from botocore.config import Config
+from botocore.exceptions import ClientError
+
+
+def setup_logging():
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    return logging.getLogger(__name__)
+
+
+def get_s3_client():
+    try:
+        return boto3.client("s3")
+    except ClientError as e:
+        logger.error(f"Failed to create S3 client: {e}")
+        sys.exit(1)
+
+
+def get_bucket_size(s3_client, bucket_name):
+    try:
+        paginator = s3_client.get_paginator("list_objects_v2")
+        pages = paginator.paginate(Bucket=bucket_name)
+        return sum(obj["Size"] for page in pages for obj in page.get("Contents", []))
+    except ClientError as e:
+        logger.error(f"Failed to get bucket size for {bucket_name}: {e}")
+        return 0
+
+
+def delete_bucket_contents(s3_client, bucket_name, dry_run=False):
+    try:
+        versioning = s3_client.get_bucket_versioning(Bucket=bucket_name)
+        is_versioned = versioning.get("Status") == "Enabled"
+
+        # Configure the client for more concurrency
+        config = Config(
+            max_pool_connections=50,  # Increase concurrent connections
+            retries={"max_attempts": 10, "mode": "adaptive"},  # Add retry logic
+        )
+        s3_resource = boto3.resource("s3", config=config)
+        bucket = s3_resource.Bucket(bucket_name)
+
+        def delete_objects(object_versions):
+            if not dry_run:
+                try:
+                    bucket.delete_objects(Delete={"Objects": object_versions})
+                except ClientError as e:
+                    logger.error(f"Error deleting objects: {e}")
+
+        object_versions = []
+        count = 0
+
+        if is_versioned:
+            iterator = bucket.object_versions.iterator()
+        else:
+            iterator = bucket.objects.iterator()
+
+        for obj in iterator:
+            if is_versioned:
+                object_versions.append({"Key": obj.object_key, "VersionId": obj.id})
+            else:
+                object_versions.append({"Key": obj.key})
+            count += 1
+
+            # Process in batches of 1000 (S3 delete_objects limit)
+            if len(object_versions) >= 1000:
+                if dry_run:
+                    logger.info(f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}")
+                else:
+                    delete_objects(object_versions)
+                object_versions = []
+
+            # Log progress every 10000 objects
+            if count % 10000 == 0:
+                logger.info(f"Processed {count} {'versions' if is_versioned else 'objects'}")
+
+        # Delete any remaining objects
+        if object_versions:
+            if dry_run:
+                logger.info(f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}")
+            else:
+                delete_objects(object_versions)
+
+        logger.info(
+            f"{'Would delete' if dry_run else 'Deleted'} a total of {count} {'versions' if is_versioned else 'objects'} from {bucket_name}"
+        )
+
+    except ClientError as e:
+        logger.error(f"Failed to delete contents of bucket {bucket_name}: {e}")
+
+
+def delete_bucket(s3_client, bucket_name, dry_run=False):
+    try:
+        if dry_run:
+            logger.info(f"Would delete bucket: {bucket_name}")
+        else:
+            s3_client.delete_bucket(Bucket=bucket_name)
+            logger.info(f"Deleted bucket: {bucket_name}")
+    except ClientError as e:
+        logger.error(f"Failed to delete bucket {bucket_name}: {e}")
+
+
+def main(target_bucket_name, dry_run=False):
+    s3_client = get_s3_client()
+
+    try:
+        response = s3_client.list_buckets()
+    except ClientError as e:
+        logger.error(f"Failed to list buckets: {e}")
+        sys.exit(1)
+
+    found_buckets = []
+    for bucket in response["Buckets"]:
+        bucket_name = bucket["Name"]
+        if target_bucket_name in bucket_name:
+            found_buckets.append(bucket_name)
+
+    if not found_buckets:
+        logger.info(f"No buckets found containing the name: {target_bucket_name}")
+        return
+
+    for bucket_name in found_buckets:
+        logger.info(f"Found bucket: {bucket_name}")
+        size_bytes = get_bucket_size(s3_client, bucket_name)
+        size_gb = size_bytes / (1024**3)  # Convert bytes to gigabytes
+        logger.info(f"Bucket size: {size_gb:.2f} GB")
+
+        if dry_run:
+            logger.info(f"Dry run: Would delete all contents and the bucket itself: {bucket_name}")
+        else:
+            delete_bucket_contents(s3_client, bucket_name, dry_run)
+            delete_bucket(s3_client, bucket_name, dry_run)
+
+    logger.info("Operation completed.")
+
+
+if __name__ == "__main__":
+    logger = setup_logging()
+
+    parser = argparse.ArgumentParser(description="Delete S3 bucket and its contents")
+    parser.add_argument("bucket_name", help="Name of the bucket to search for and delete")
+    parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting anything")
+    args = parser.parse_args()
+
+    main(args.bucket_name, args.dry_run)
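
The replacement S3 script is likewise safest as a dry run first. Note that it matches bucket names by substring and will delete every bucket whose name contains the given value, so the matches reported in the dry-run output should be checked before the real run. Example invocations (the bucket name is illustrative):

python s3/s3_search_bucket_and_delete.py my-temp-bucket --dry-run
python s3/s3_search_bucket_and_delete.py my-temp-bucket
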