"""
Module for Kinesis data stream consumer capabilities. This Lambda
is triggered to process Kinesis records originating from CloudFront
real-time logs and publish the data as CloudWatch metrics.
"""
import os
import base64
import logging

import boto3


def process_records(records):
    """
    Decode the base64 payload of each Kinesis record in the Lambda
    event and return a list of field dictionaries.
    """
    records_list = []
    # Retrieve values from each Kinesis record; field order must match
    # the real-time log configuration
    for record in records:
        payload = base64.b64decode(record["kinesis"]["data"])
        values = payload.decode("utf-8").strip().split('\t')
        records_list.append({
            'timestamp': values[0],
            'c_ip': values[1],
            'sc_status': values[2],
            'cs_host': values[3],
            'x_edge_location': values[4]
        })
    return records_list
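
# Illustrative only: a doctest-style sketch of what process_records expects
# and returns, assuming the five-field log line format above. The payload
# is fabricated, not taken from a real log.
#
#   record = {"kinesis": {"data": base64.b64encode(
#       b"1699999999.123\t203.0.113.7\t200\td111111abcdef8.cloudfront.net\tIAD89-C1"
#   ).decode("utf-8")}}
#   process_records([record])
#   # -> [{'timestamp': '1699999999.123', 'c_ip': '203.0.113.7',
#   #      'sc_status': '200', 'cs_host': 'd111111abcdef8.cloudfront.net',
#   #      'x_edge_location': 'IAD89-C1'}]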


def aggregate_data_by_minute(records_list):
    """
    Aggregate data from Kinesis records into a nested dictionary for
    upload at 60-second resolution.
    """
    put_metric_data_dict = {}
    # Loop through records and aggregate by minute and by edge location
    for record in records_list:
        # Round the timestamp down to the nearest whole minute and
        # retrieve the relevant dimensions
        timestamp = int(float(record['timestamp']) // 60 * 60)
        cf_distro = record['cs_host']
        x_edge_location = record['x_edge_location']
        status_code = record['sc_status']
        # Create nested buckets on first sight, then increment the count
        bucket = (put_metric_data_dict
                  .setdefault(cf_distro, {})
                  .setdefault(timestamp, {})
                  .setdefault(x_edge_location, {}))
        bucket[status_code] = bucket.get(status_code, 0) + 1
    return put_metric_data_dict
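
# Shape of the aggregated dictionary (values fabricated for illustration):
#
#   {'d111111abcdef8.cloudfront.net': {
#       1699999980: {
#           'IAD89-C1': {'200': 41, '404': 2, '503': 1}}}}
#
# i.e. distribution -> minute-aligned epoch timestamp -> edge location
# -> status code -> request count.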


def upload_to_cloudwatch(aggregated_dict):
    """
    Upload aggregated request metrics to CloudWatch.
    """
    cloudwatch_client = boto3.client('cloudwatch')
    # Walk the nested dictionary of CloudFront distributions, timestamps,
    # edge locations, and status codes, uploading metrics at each leaf
    for cf_distro, timestamps in aggregated_dict.items():
        for timestamp, edge_locations in timestamps.items():
            for x_edge_location, status_codes in edge_locations.items():
                dimensions = [
                    {
                        'Name': 'cs-host',
                        'Value': cf_distro
                    },
                    {
                        'Name': 'x-edge-location',
                        'Value': x_edge_location
                    }
                ]
                # Total number of requests per edge location
                status_sum = sum(status_codes.values())
                cloudwatch_client.put_metric_data(
                    Namespace='CloudFront by Edge Location - Count',
                    MetricData=[
                        {
                            'MetricName': 'RequestCount',
                            'Timestamp': timestamp,
                            'Dimensions': dimensions,
                            'Value': status_sum,
                            'Unit': 'Count',
                            'StorageResolution': 60
                        },
                    ]
                )
                # Sum requests per HTTP status code category (2xx, 4xx, 5xx)
                status_groups = {}
                for group in ['2', '4', '5']:
                    status_groups[f'{group}xx'] = sum(
                        count for code, count in status_codes.items()
                        if code.startswith(group)
                    )
                for sc_status_group, count in status_groups.items():
                    # Absolute count of each status code group per edge
                    cloudwatch_client.put_metric_data(
                        Namespace='CloudFront by Edge Location - Count',
                        MetricData=[
                            {
                                'MetricName': sc_status_group,
                                'Timestamp': timestamp,
                                'Dimensions': dimensions,
                                'Value': count,
                                'Unit': 'Count',
                                'StorageResolution': 60
                            },
                        ]
                    )
                    # Percentage of each status code group per edge
                    cloudwatch_client.put_metric_data(
                        Namespace='CloudFront by Edge Location - Percent',
                        MetricData=[
                            {
                                'MetricName': sc_status_group,
                                'Timestamp': timestamp,
                                'Dimensions': dimensions,
                                'Value': round(count * 100.0 / status_sum, 2),
                                'Unit': 'Percent',
                                'StorageResolution': 60
                            },
                        ]
                    )
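
# A possible optimization, not part of the original flow: put_metric_data
# accepts a list of metric datums per call, so the datums above could be
# buffered and flushed in batches to reduce API calls. A minimal sketch;
# the batch size of 20 is a conservative assumption, so check the current
# PutMetricData request limits before relying on it.
def put_metrics_batched(cloudwatch_client, namespace, metric_data, batch_size=20):
    """Send a list of metric datums to CloudWatch in batches."""
    for i in range(0, len(metric_data), batch_size):
        cloudwatch_client.put_metric_data(
            Namespace=namespace,
            MetricData=metric_data[i:i + batch_size]
        )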


def lambda_handler(event, context):
    """
    Entrypoint for Lambda invocation. Decodes base64 from Kinesis
    records in the Lambda event, aggregates the data, and uploads it
    to CloudWatch.
    """
    # Set logging level, defaulting to INFO if the variable is unset
    log_level = os.environ.get('LOGGING_LEVEL', 'INFO')
    logger = logging.getLogger()
    logger.setLevel(log_level)
    # Process Kinesis record values and return a list of record dictionaries
    try:
        records_list = process_records(event['Records'])
        logger.info(records_list)
    except Exception as error:
        logger.error(error)
        raise
    # Aggregate data by minute, CloudFront edge location, and status code
    try:
        aggregated_dict = aggregate_data_by_minute(records_list)
        logger.info(aggregated_dict)
    except Exception as error:
        logger.error(error)
        raise
    # Upload metrics to CloudWatch
    upload_to_cloudwatch(aggregated_dict)
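

# Local smoke test, a sketch only: exercises the decode and aggregation
# steps without touching CloudWatch (upload_to_cloudwatch needs AWS
# credentials and network access). The record below is fabricated.
if __name__ == '__main__':
    sample_line = (
        "1699999999.123\t203.0.113.7\t200\t"
        "d111111abcdef8.cloudfront.net\tIAD89-C1"
    )
    sample_event = {
        'Records': [
            {'kinesis': {'data': base64.b64encode(
                sample_line.encode('utf-8')).decode('utf-8')}}
        ]
    }
    decoded = process_records(sample_event['Records'])
    print(aggregate_data_by_minute(decoded))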