s3_stats.py

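"""Collect daily CloudWatch storage metrics for S3 buckets (object count
and bucket size per storage class) and print them as CSV.

Assumes AWS credentials are available to boto3 (environment, shared
credentials file, or instance profile) with permission to list buckets,
read bucket locations, and read CloudWatch metrics.
"""
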
import argparse
import csv
import datetime
import sys

import boto3

PERIOD = 60 * 60 * 24  # one day, in seconds; S3 storage metrics are reported daily


def get_region(location):
    """Map an S3 LocationConstraint to a region name.

    None means us-east-1; the legacy value 'EU' means eu-west-1.
    """
    if not location:
        return 'us-east-1'
    elif location == 'EU':
        return 'eu-west-1'
    else:
        return location


def get_bucket_region(bucket):
    """Return the region a bucket lives in (needed to query CloudWatch)."""
    s3_client = boto3.client('s3')
    location = s3_client.get_bucket_location(Bucket=bucket)['LocationConstraint']
    return get_region(location)


def get_bucket_metrics(buckets, days=14):
    """Fetch daily object-count and size datapoints for each bucket.

    Returns {bucket: {date: {metric: {storage_type: average}}}}.
    """
    METRICS = {
        'NumberOfObjects': ['AllStorageTypes'],
        'BucketSizeBytes': ['StandardStorage', 'ReducedRedundancyStorage', 'StandardIAStorage'],
    }
    results = {}
    now = datetime.datetime.utcnow()
    for bucket in buckets:
        region = get_bucket_region(bucket)
        cw = boto3.client('cloudwatch', region_name=region)
        datapoints = {}
        for metric, storage_types in METRICS.items():
            for storage_type in storage_types:
                response = cw.get_metric_statistics(
                    Namespace='AWS/S3',
                    MetricName=metric,
                    Statistics=['Average'],
                    Period=PERIOD,
                    EndTime=now,
                    StartTime=now - datetime.timedelta(days=days),
                    Dimensions=[
                        {'Name': 'BucketName', 'Value': bucket},
                        {'Name': 'StorageType', 'Value': storage_type},
                    ],
                )
                for stats in response['Datapoints']:
                    date = stats['Timestamp'].strftime("%Y-%m-%d")
                    if date not in datapoints:
                        datapoints[date] = {}
                    if metric not in datapoints[date]:
                        # Default every storage type to 0 so the CSV rows line up.
                        datapoints[date][metric] = dict.fromkeys(storage_types, 0)
                    datapoints[date][metric][storage_type] = stats['Average']
        results[bucket] = datapoints
    return results


def Gb(num_bytes):
    """Convert bytes to gigabytes (binary, GiB), rounded to one decimal."""
    return round(num_bytes / (1024 * 1024 * 1024), 1)


def print_metrics(results, out_file=sys.stdout):
    """Write the collected metrics as CSV, one row per bucket and date."""
    rows = []
    headers = ['Bucket', 'Region', 'Date']
    for bucket in sorted(results.keys()):
        region = get_bucket_region(bucket)
        for date in sorted(results[bucket].keys()):
            row = {'Date': date, 'Region': region, 'Bucket': bucket}
            for metric in results[bucket][date].keys():
                for storage_type in results[bucket][date][metric].keys():
                    key = metric + "_" + storage_type
                    key = key.replace('Bytes', 'GBytes')
                    if key not in headers:
                        headers.append(key)
                    val = results[bucket][date][metric].get(storage_type, 0)
                    if not metric.startswith('Number'):
                        # Size metrics are reported in bytes; convert for readability.
                        val = Gb(val)
                    row[key] = val
            rows.append(row)
    # restval=0 fills columns a row never saw (e.g. a storage class only other buckets use).
    writer = csv.DictWriter(out_file, delimiter=',', fieldnames=headers, restval=0)
    writer.writeheader()
    for row in rows:
        writer.writerow(row)


def list_buckets():
    """Return the names of all buckets in the account."""
    s3_client = boto3.client('s3')
    return [b['Name'] for b in s3_client.list_buckets()['Buckets']]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Prints metrics about S3 buckets")
    parser.add_argument('-d', '--days', action="store", default=14, type=int,
                        help="Number of days of history to fetch (2-14)")
    parser.add_argument('-b', '--buckets', default=[], action="store",
                        help="Comma-separated list of selected buckets (will list all buckets if empty)")
    args = parser.parse_args()
    if args.days not in range(2, 15):
        print("days must be between 2 and 14")
        sys.exit(1)
    if args.buckets:
        metrics = get_bucket_metrics(args.buckets.split(','), args.days)
    else:
        metrics = get_bucket_metrics(list_buckets(), args.days)
    print_metrics(metrics)
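
Example usage (the bucket names here are hypothetical placeholders):

    python s3_stats.py --days 7 --buckets my-logs,my-assets > s3_stats.csv

The output starts with the columns Bucket, Region and Date, followed by one
column per metric/storage-type pair, e.g. NumberOfObjects_AllStorageTypes
and BucketSizeGBytes_StandardStorage.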