-
-
Notifications
You must be signed in to change notification settings - Fork 335
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(docs): Update README with new EC2 scripts
- Loading branch information
1 parent
c654937
commit e410fc9
Showing
6 changed files
with
628 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
""" | ||
Description: This script identifies and optionally deletes EBS snapshots in an AWS account where the | ||
associated volume no longer exists and the snapshot is not part of any AMI. | ||
The script can perform a dry run to show which snapshots would be deleted without actually deleting them. | ||
It also supports a retention period to keep snapshots for a specified number of days. | ||
Key features: | ||
- Automatically uses the region specified in the AWS CLI profile | ||
- Supports dry run mode for safe execution | ||
- Provides detailed logging of all operations, including list of orphaned snapshot IDs | ||
- Uses boto3 to interact with AWS EC2 service | ||
- Implements error handling for robustness | ||
- Allows setting a retention period for snapshots | ||
- Ensures snapshots associated with AMIs are not deleted | ||
Usage: | ||
python ec2_delete_orphaned_snapshots.py [--dry-run] [--retention-days DAYS] [--profile PROFILE_NAME] | ||
Author: [Your Name] | ||
License: MIT | ||
""" | ||
|
||
import argparse | ||
import logging | ||
from datetime import datetime, timedelta | ||
|
||
import boto3 | ||
from botocore.exceptions import ClientError | ||
|
||
|
||
def setup_logging(): | ||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | ||
return logging.getLogger(__name__) | ||
|
||
|
||
def get_ec2_client(): | ||
try: | ||
return boto3.client("ec2") | ||
except ClientError as e: | ||
logger.error(f"Failed to create EC2 client: {e}") | ||
raise | ||
|
||
|
||
def get_owned_snapshots(ec2_client): | ||
try: | ||
owned_snapshots = [] | ||
paginator = ec2_client.get_paginator("describe_snapshots") | ||
for page in paginator.paginate(OwnerIds=["self"]): | ||
owned_snapshots.extend(page["Snapshots"]) | ||
logger.info(f"Owned snapshots: {len(owned_snapshots)}") | ||
return owned_snapshots | ||
except ClientError as e: | ||
logger.error(f"Failed to retrieve owned snapshots: {e}") | ||
return [] | ||
|
||
|
||
def is_volume_exists(ec2_client, volume_id): | ||
try: | ||
ec2_client.describe_volumes(VolumeIds=[volume_id]) | ||
return True | ||
except ClientError as e: | ||
if e.response["Error"]["Code"] == "InvalidVolume.NotFound": | ||
return False | ||
logger.error(f"Error checking volume {volume_id}: {e}") | ||
return True # Assume volume exists in case of other errors | ||
|
||
|
||
def get_snapshots_used_by_amis(ec2_client): | ||
try: | ||
used_snapshots = set() | ||
paginator = ec2_client.get_paginator("describe_images") | ||
for page in paginator.paginate(Owners=["self"]): | ||
for image in page["Images"]: | ||
for block_device in image.get("BlockDeviceMappings", []): | ||
if "Ebs" in block_device and "SnapshotId" in block_device["Ebs"]: | ||
used_snapshots.add(block_device["Ebs"]["SnapshotId"]) | ||
logger.info(f"Snapshots used by AMIs: {len(used_snapshots)}") | ||
logger.info(f"Snapshot IDs used by AMIs: {list(used_snapshots)}") | ||
return used_snapshots | ||
except ClientError as e: | ||
logger.error(f"Failed to retrieve snapshots used by AMIs: {e}") | ||
return set() | ||
|
||
|
||
def delete_snapshot(ec2_client, snapshot_id, dry_run=False): | ||
try: | ||
if not dry_run: | ||
ec2_client.delete_snapshot(SnapshotId=snapshot_id) | ||
logger.info(f"Deleted snapshot: {snapshot_id}") | ||
else: | ||
logger.info(f"Would delete snapshot: {snapshot_id}") | ||
return True | ||
except ClientError as e: | ||
logger.error(f"Failed to delete snapshot {snapshot_id}: {e}") | ||
return False | ||
|
||
|
||
def delete_orphaned_snapshots(ec2_client, orphaned_snapshots, dry_run=False): | ||
deleted_count = 0 | ||
for snapshot in orphaned_snapshots: | ||
if delete_snapshot(ec2_client, snapshot["SnapshotId"], dry_run): | ||
deleted_count += 1 | ||
return deleted_count | ||
|
||
|
||
def main(dry_run=False, retention_days=None): | ||
ec2_client = get_ec2_client() | ||
|
||
owned_snapshots = get_owned_snapshots(ec2_client) | ||
snapshots_used_by_amis = get_snapshots_used_by_amis(ec2_client) | ||
|
||
# Find orphaned snapshots | ||
orphaned_snapshots = [ | ||
snapshot | ||
for snapshot in owned_snapshots | ||
if "VolumeId" in snapshot | ||
and not is_volume_exists(ec2_client, snapshot["VolumeId"]) | ||
and snapshot["SnapshotId"] not in snapshots_used_by_amis | ||
] | ||
logger.info(f"Orphaned snapshots: {len(orphaned_snapshots)}") | ||
logger.info(f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}") | ||
|
||
if retention_days is not None: | ||
# Filter snapshots based on retention period | ||
cutoff_date = datetime.now(orphaned_snapshots[0]["StartTime"].tzinfo) - timedelta(days=retention_days) | ||
orphaned_snapshots = [snapshot for snapshot in orphaned_snapshots if snapshot["StartTime"] < cutoff_date] | ||
logger.info(f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}") | ||
logger.info( | ||
f"Orphaned snapshot IDs to be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}" | ||
) | ||
|
||
if not orphaned_snapshots: | ||
logger.info("No orphaned snapshots found to delete.") | ||
return | ||
|
||
if dry_run: | ||
logger.info(f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s).") | ||
logger.info( | ||
f"Snapshot IDs that would be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}" | ||
) | ||
else: | ||
deleted_count = delete_orphaned_snapshots(ec2_client, orphaned_snapshots, dry_run) | ||
logger.info(f"Deleted {deleted_count} orphaned snapshot(s).") | ||
|
||
# Summary | ||
logger.info("Summary:") | ||
logger.info(f" Total owned snapshots: {len(owned_snapshots)}") | ||
logger.info(f" Snapshots used by AMIs: {len(snapshots_used_by_amis)}") | ||
logger.info(f" Orphaned snapshots: {len(orphaned_snapshots)}") | ||
|
||
|
||
if __name__ == "__main__": | ||
logger = setup_logging() | ||
|
||
parser = argparse.ArgumentParser(description="Delete orphaned EC2 snapshots") | ||
parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting snapshots") | ||
parser.add_argument("--retention-days", type=int, help="Number of days to retain snapshots before deletion") | ||
parser.add_argument("--profile", help="AWS CLI profile name") | ||
args = parser.parse_args() | ||
|
||
if args.profile: | ||
boto3.setup_default_session(profile_name=args.profile) | ||
|
||
main(dry_run=args.dry_run, retention_days=args.retention_days) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
""" | ||
Description: This script identifies and optionally removes SSH (port 22) inbound rules from all security groups in an AWS account. | ||
It fetches all security groups, checks for SSH inbound rules, and removes them. The script supports a dry-run mode to show which | ||
rules would be removed without actually modifying the security groups. | ||
Key features: | ||
- Automatically uses the region specified in the AWS CLI profile | ||
- Supports dry run mode for safe execution | ||
- Provides detailed logging of all operations, including group rule IDs | ||
- Uses boto3 to interact with AWS EC2 service | ||
- Implements error handling for robustness | ||
Usage: | ||
python ec2_remove_ssh_from_security_groups.py [--dry-run] [--profile PROFILE_NAME] | ||
Author: [Your Name] | ||
License: MIT | ||
""" | ||
|
||
import argparse | ||
import logging | ||
|
||
import boto3 | ||
from botocore.exceptions import ClientError | ||
|
||
|
||
def setup_logging(): | ||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | ||
return logging.getLogger(__name__) | ||
|
||
|
||
def get_ec2_client(): | ||
try: | ||
return boto3.client("ec2") | ||
except ClientError as e: | ||
logger.error(f"Failed to create EC2 client: {e}") | ||
raise | ||
|
||
|
||
def get_all_security_groups(ec2_client): | ||
try: | ||
security_groups = [] | ||
paginator = ec2_client.get_paginator("describe_security_groups") | ||
for page in paginator.paginate(): | ||
security_groups.extend(page["SecurityGroups"]) | ||
logger.info(f"Total Security Groups: {len(security_groups)}") | ||
return security_groups | ||
except ClientError as e: | ||
logger.error(f"Failed to retrieve security groups: {e}") | ||
return [] | ||
|
||
|
||
def has_ssh_rule(security_group): | ||
for rule in security_group.get("IpPermissions", []): | ||
if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp": | ||
return True | ||
return False | ||
|
||
|
||
def remove_ssh_rule(ec2_client, security_group, dry_run=False): | ||
group_id = security_group["GroupId"] | ||
group_name = security_group["GroupName"] | ||
ssh_rules = [ | ||
rule | ||
for rule in security_group.get("IpPermissions", []) | ||
if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp" | ||
] | ||
|
||
if not ssh_rules: | ||
logger.info(f"No SSH rules found in security group: {group_id} ({group_name})") | ||
return False | ||
|
||
logger.info(f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})") | ||
|
||
# Fetch the security group rules with their IDs | ||
try: | ||
response = ec2_client.describe_security_group_rules(Filters=[{"Name": "group-id", "Values": [group_id]}]) | ||
sg_rules = {rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"]} | ||
except ClientError as e: | ||
logger.error(f"Failed to fetch security group rules for {group_id}: {e}") | ||
return False | ||
|
||
for rule in ssh_rules: | ||
# Find matching rule(s) in sg_rules | ||
matching_rules = [ | ||
sg_rule | ||
for sg_rule in sg_rules.values() | ||
if sg_rule["IpProtocol"] == rule["IpProtocol"] | ||
and sg_rule["FromPort"] == rule["FromPort"] | ||
and sg_rule["ToPort"] == rule["ToPort"] | ||
and sg_rule["IsEgress"] == False # Inbound rules | ||
] | ||
|
||
for matching_rule in matching_rules: | ||
rule_id = matching_rule["SecurityGroupRuleId"] | ||
cidr_range = matching_rule.get("CidrIpv4", "N/A") | ||
logger.info(f" Rule ID: {rule_id}") | ||
logger.info(f" Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}") | ||
logger.info(f" Protocol: {matching_rule['IpProtocol']}") | ||
logger.info(f" CIDR Range: {cidr_range}") | ||
|
||
if not dry_run: | ||
try: | ||
ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ssh_rules) | ||
logger.info(f"Successfully removed SSH rules from security group: {group_id} ({group_name})") | ||
return True | ||
except ClientError as e: | ||
logger.error(f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}") | ||
return False | ||
return True | ||
|
||
|
||
def main(dry_run=False): | ||
ec2_client = get_ec2_client() | ||
security_groups = get_all_security_groups(ec2_client) | ||
|
||
affected_groups = 0 | ||
for sg in security_groups: | ||
if has_ssh_rule(sg): | ||
if remove_ssh_rule(ec2_client, sg, dry_run): | ||
affected_groups += 1 | ||
|
||
# Summary | ||
logger.info("Summary:") | ||
logger.info(f" Total Security Groups: {len(security_groups)}") | ||
logger.info(f" Security Groups with SSH rules {'that would be' if dry_run else ''} modified: {affected_groups}") | ||
|
||
|
||
if __name__ == "__main__": | ||
logger = setup_logging() | ||
|
||
parser = argparse.ArgumentParser(description="Remove SSH (port 22) inbound rules from EC2 Security Groups") | ||
parser.add_argument( | ||
"--dry-run", action="store_true", help="Perform a dry run without actually modifying security groups" | ||
) | ||
args = parser.parse_args() | ||
|
||
main(dry_run=args.dry_run) |
Oops, something went wrong.