s3-to-es.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import boto3
import json
import datetime
import gzip
import os
import urllib.parse
import logging
from requests_aws4auth import AWS4Auth
import requests
from io import BytesIO

"""
The global variables below can be overridden using Lambda environment variables.
"""
globalVars = {}
globalVars['Owner'] = "Mystique"
globalVars['Environment'] = "Prod"
globalVars['awsRegion'] = "eu-central-1"
globalVars['tagName'] = "serverless-s3-to-es-log-ingester"
globalVars['service'] = "es"
globalVars['esIndexPrefix'] = "s3-to-es-"
globalVars['esIndexDocType'] = "s3_to_es_docs"
globalVars['esHosts'] = {
    'test': '',
    'prod': 'https://search-es01-d2jqf4mjsrpehk24xvd6h3w7qm.eu-central-1.es.amazonaws.com'
}
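
# The note above says these globals can be overridden via Lambda environment
# variables; a minimal sketch of that override follows. The variable names
# ES_HOST and ES_INDEX_PREFIX are illustrative assumptions, not part of the
# original setup.
if os.environ.get('ES_HOST'):
    globalVars['esHosts']['prod'] = os.environ['ES_HOST']
if os.environ.get('ES_INDEX_PREFIX'):
    globalVars['esIndexPrefix'] = os.environ['ES_INDEX_PREFIX']
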
# Initialize Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def indexDocElement(es_Url, awsauth, docData):
    """POST a single document to the ES index URL using SigV4 auth."""
    try:
        headers = {'Content-Type': 'application/json', 'Accept': 'text/plain'}
        resp = requests.post(es_Url, auth=awsauth,
                             headers=headers, json=docData)
        if resp.status_code == 201:
            logger.info('INFO: Successfully inserted element into ES')
        else:
            logger.error(
                'FAILURE: Unable to index element, status code: {0}'.format(resp.status_code))
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to index line:"{0}"'.format(
            str(docData['content'])))
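

# Indexing one document per HTTP request is simple but slow for large log
# files; the Elasticsearch _bulk endpoint is the usual alternative. A minimal
# sketch, not part of the original flow and unused by default:
def bulkIndexDocElements(es_base_url, awsauth, docDataList, indexName):
    headers = {'Content-Type': 'application/x-ndjson'}
    # One action-metadata line followed by one document line per element (NDJSON)
    action = json.dumps(
        {'index': {'_index': indexName, '_type': globalVars['esIndexDocType']}})
    payload = ''.join(action + '\n' + json.dumps(doc) + '\n'
                      for doc in docDataList)
    resp = requests.post(es_base_url + '/_bulk', auth=awsauth,
                         headers=headers, data=payload)
    # A 200 response can still carry per-item failures in resp.json()['errors']
    if resp.status_code != 200 or resp.json().get('errors'):
        logger.error('FAILURE: Bulk index reported errors, status code: {0}'.format(
            resp.status_code))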


def lambda_handler(event, context):
    """Triggered by an S3 event notification; fetches the object and indexes it line by line."""
    s3 = boto3.client('s3')
    # Sign requests to the ES domain with the function's IAM credentials (SigV4)
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key,
                       credentials.secret_key,
                       globalVars['awsRegion'],
                       globalVars['service'],
                       session_token=credentials.token
                       )
    logger.info("Received event: " + json.dumps(event, indent=2))
    try:
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(
            event['Records'][0]['s3']['object']['key'])
        # Get document (obj) from S3
        obj = s3.get_object(Bucket=bucket, Key=key)
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error(
            'ERROR: Unable to GET object from S3. Verify the event record and that the object exists.')
        raise
    # Decompress gzipped objects; '.tar.gz' keys also end with '.gz'.
    # The single-to-double quote replacement keeps pseudo-JSON log lines
    # parseable as JSON documents.
    if key.endswith('.gz'):
        mycontentzip = gzip.GzipFile(
            fileobj=BytesIO(obj['Body'].read())).read()
        lines = mycontentzip.decode("utf-8").replace("'", '"')
    else:
        lines = obj['Body'].read().decode("utf-8").replace("'", '"')
    logger.info('SUCCESS: Retrieved object from S3')

    # Split (S3 object/Log File) by lines
    lines = lines.splitlines()

    # Index each line to the ES domain under a monthly index, e.g. 's3-to-es-2019-04'
    indexName = globalVars['esIndexPrefix'] + \
        datetime.date.today().strftime('%Y-%m')
    es_Url = globalVars['esHosts'].get(
        'prod') + '/' + indexName + '/' + globalVars['esIndexDocType']

    docData = {}
    docData['objectKey'] = str(key)
    docData['createdDate'] = str(obj['LastModified'])
    docData['content_type'] = str(obj['ContentType'])
    docData['content_length'] = str(obj['ContentLength'])
    for line in lines:
        docData['content'] = str(line)
        indexDocElement(es_Url, awsauth, docData)

    logger.info('File processing complete. Check logs for index status')
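

# A minimal S3 put-notification event for exercising the handler locally; the
# bucket and key names below are placeholders, not real resources.
sample_s3_event = {
    'Records': [
        {
            's3': {
                'bucket': {'name': 'my-log-bucket'},
                'object': {'key': 'logs/sample-app.log'}
            }
        }
    ]
}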

if __name__ == '__main__':
    lambda_handler(sample_s3_event, None)