Merge pull request #169 from awslabs/ingestor-funknor
Ingestor funknor
sethusrinivasan authored Sep 29, 2023
2 parents b7fa9c6 + 905d79e commit d838e93
Showing 65 changed files with 354 additions and 5 deletions.
13 changes: 8 additions & 5 deletions tools/README.md
```diff
@@ -1,11 +1,14 @@
 ## Amazon Timestream data ingestion and query tools
 To understand the performance and scale capabilities of Amazon Timestream, you can run the following workload:
-* [Running large scale workloads with Amazon Timestream](perf-scale-workload)
+* [Running large scale workloads with Amazon Timestream](python/perf-scale-workload)
 
 The following tools can be used to continuously send data into Amazon Timestream:
-* [Publishing data with Amazon Kinesis](kinesis_ingestor)
-* [Writing data using a multi-thread Python DevOps data generator](continuous-ingestor)
+* [Publishing data with Amazon Kinesis](python/kinesis_ingestor)
+* [Writing data using a multi-thread Python DevOps data generator](python/continuous-ingestor)
 
+The following tools show example to write common file formats:
+* [Processing Apache Parquet files](python/parquet-writer)
+
 The following tool shows how to use multiple threads to write to Amazon Timestream with Java, while collecting important operational metrics. It includes samples which shows:
-* [Local CSV file ingestion to Amazon Timestream](multithreaded-writer#Local-CSV-file-ingestion-to-Timestream)
-* [Lambda function ingesting S3 CSV file to Amazon Timestream](multithreaded-writer#Lambda-function-ingesting-S3-CSV-file-to-Timestream)
+* [Local CSV file ingestion to Amazon Timestream](java/multithreaded-writer#Local-CSV-file-ingestion-to-Timestream)
+* [Lambda function ingesting S3 CSV file to Amazon Timestream](java/multithreaded-writer#Lambda-function-ingesting-S3-CSV-file-to-Timestream)
```
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
100 changes: 100 additions & 0 deletions tools/python/parquet-writer/README.md
@@ -0,0 +1,100 @@
# Loading Apache Parquet files into Amazon Timestream

This example illustrates how to load Apache Parquet files and write the data to Amazon Timestream. Its main characteristics are:

* Loading multiple files from a folder
* Configurable multi-threading for higher ingestion speed; the parameters are preconfigured for 4 threads

## Getting started

This example contains a sample Parquet file (`value.parquet`) with the following data structure:

| Field  | Example content       | Mapping to Timestream attribute |
|--------|------------------------|---------------------------------|
| signal | data channel           | Dimension                       |
| source | device identifier      | Dimension                       |
| time   | timestamp              | time                            |
| value  | example measure value  | multi-measure column            |
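
If you want to generate a file with this layout yourself, the following sketch (not part of the sample; it assumes pandas with pyarrow installed) writes a compatible Parquet file:

```python
import pandas as pd

# Hypothetical example data matching the schema above
df = pd.DataFrame({
    'time': pd.to_datetime(['2023-09-01 00:00:00', '2023-09-01 00:00:01',
                            '2023-09-01 00:00:02', '2023-09-01 00:00:03']),  # -> time
    'signal': ['temperature'] * 4,                                           # -> Dimension
    'source': ['device-001'] * 4,                                            # -> Dimension
    'value': [21.5, 21.7, 21.6, 21.9],                                       # -> multi-measure column
})
df.to_parquet('value.parquet', index=False)  # requires pyarrow (or fastparquet)
```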

The Python code can be modified to process more than one measure in multi-measure format.
To adapt the code to your data, change the following functions:

## Parquet data extraction:

### Function `load_parquet`

This function extracts the data needed for each record; simple transformations can also be applied here (see the sketch after the code block):

```python
def load_parquet(max_threads, folder_name):

    ...
    for df_record in df_records:
        buffer_index = record_count % max_threads
        time = unix_time_millis(df_record['time'])
        signal = df_record['signal']
        value = df_record['value']
        source = df_record['source']
        # print(df_record)
        row = {
            'time': time,
            'signal': signal,
            'value': value,
            'source': source
        }
        buffer[buffer_index].append(row)
        record_count += 1

    return buffer
```
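
For example, a simple transformation at this point could scale the measure or normalize a dimension value before the row is buffered; the conversion below is purely illustrative and not part of the sample:

```python
# Hypothetical adjustments inside the loop above
value = float(df_record['value']) * 0.001           # e.g. convert millivolts to volts
signal = str(df_record['signal']).strip().lower()   # normalize the dimension value
```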

## Parquet data to Timestream record mapping:

### Function `create_record`

This function takes the record above and maps it to Amazon Timestream attributes:

```python
def create_record(self, item):
    current_time = str(item['time'])
    source = item['source']
    value = item['value']
    signal = item['signal']

    record = self.prepare_record(current_time)

    record['Dimensions'].append(self.prepare_dimension('source', source))
    record['Dimensions'].append(self.prepare_dimension('signal', signal))
    # add more Dimensions from item as needed

    record['MeasureValues'].append(self.prepare_measure('value', value))
    # append more MeasureValues as measure columns as needed

    return record
```
Append more dimensions or measure values as needed.
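
For example, if your rows also carried a `site` dimension and a `quality` measure (hypothetical fields that are not in the sample file), `create_record` could be extended like this:

```python
# Hypothetical additional attributes; 'site' and 'quality' are not in the sample data
record['Dimensions'].append(self.prepare_dimension('site', item['site']))
record['MeasureValues'].append(self.prepare_measure('quality', item['quality']))
```

Note that `prepare_measure` emits every value with type `DOUBLE`; adjust the `Type` field there if you add non-numeric measures.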

## Parameters to run:

The handler (and the local `__main__` entry point) accepts the following event parameters:

| Parameter | Usage |
|-----------|-------|
| threads   | Number of threads to run; should be at least 1 |
| region    | AWS Region of the Timestream database |
| database  | Database containing the target table |
| table     | Target table |
| folder    | Folder that contains the Parquet files; a sample file is included in this example |

```json
{
    "threads": 4,
    "region": "us-east-1",
    "database": "tools-sandbox",
    "table": "ingestion-parquet",
    "folder": "./"
}
```
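
The script can also be run locally with `python parquet_multi_process_lambda_function.py`, which uses the same event. If the code is deployed as a Lambda function, one way to pass this event is through boto3 (a sketch; the function name is hypothetical, and the Parquet files must be readable from the function's file system):

```python
import json

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')
response = lambda_client.invoke(
    FunctionName='parquet-writer',   # hypothetical function name
    Payload=json.dumps({
        'threads': 4,
        'region': 'us-east-1',
        'database': 'tools-sandbox',
        'table': 'ingestion-parquet',
        'folder': './'
    }),
)
print(response['StatusCode'])
```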
246 changes: 246 additions & 0 deletions tools/python/parquet-writer/parquet_multi_process_lambda_function.py
@@ -0,0 +1,246 @@
import datetime
import glob
import time

import boto3
import pandas as pd
from botocore.config import Config

from multiprocessing import Process


def unix_time_millis(dt):
    # Convert a pandas Timestamp / datetime64 value to Unix epoch milliseconds
    return int((dt - pd.Timestamp("1970-01-01")).total_seconds() * 1000.0)


def load_parquet(max_threads, folder_name):
    # Read the Parquet files in folder_name and distribute the rows
    # round-robin across one buffer per worker process.

    # create one buffer per thread
    buffer = []

    for i in range(0, max_threads):
        buffer.append([])

    record_count = 0

    # The glob pattern matches the bundled sample file; widen it to '*.parquet'
    # to pick up every Parquet file in the folder.
    for file_name in glob.glob(folder_name + '/value.parquet'):
        df = pd.read_parquet(file_name)
        print(df)
        df_records = df.to_records()

        for df_record in df_records:
            buffer_index = record_count % max_threads
            time = unix_time_millis(df_record['time'])
            signal = df_record['signal']
            value = df_record['value']
            source = df_record['source']
            # print(df_record)
            row = {
                'time': time,
                'signal': signal,
                'value': value,
                'source': source
            }
            buffer[buffer_index].append(row)
            record_count += 1

    return buffer


class Generator:
    INTERVAL = 0.001  # seconds to pause between writes
    INTERVAL_MILLI = 1
    BATCH_SIZE = 100  # records per WriteRecords call

    table_name = ''
    database_name = ''

    def prepare_common_attributes(self):
        # Attributes shared by every record in a WriteRecords call; dimensions that are
        # identical for all records could also be added here.
        common_attributes = {
            'MeasureName': self.measure_name,
            'MeasureValueType': 'MULTI'
        }
        print(common_attributes)
        return common_attributes

    def prepare_record(self, current_time):
        record = {
            'Time': str(current_time),
            'MeasureValues': [],
            'Dimensions': []
        }
        return record

    def prepare_measure(self, measure_name, measure_value):
        measure = {
            'Name': measure_name,
            'Value': str(measure_value),
            'Type': 'DOUBLE'
        }
        return measure

    def prepare_dimension(self, name, value):
        dimension = {
            'Name': name,
            'Value': str(value)
        }
        return dimension

    def write_records(self, records, common_attributes):
        try:
            result = self.write_client.write_records(DatabaseName=self.database_name,
                                                     TableName=self.table_name,
                                                     CommonAttributes=common_attributes,
                                                     Records=records)
            status = result['ResponseMetadata']['HTTPStatusCode']
            # print("Processed %d records. WriteRecords HTTPStatusCode: %s" %
            #       (len(records), status))
        except Exception as err:
            print("Error:", err)
            print(records)

    def unix_time_millis(self, dt):
        epoch = datetime.datetime.utcfromtimestamp(0)
        return (dt - epoch).total_seconds() * 1000.0

    # Users can change this based on their record dimensions / measure values
    def create_record(self, item):
        current_time = str(item['time'])
        source = item['source']
        value = item['value']
        signal = item['signal']

        record = self.prepare_record(current_time)

        record['Dimensions'].append(self.prepare_dimension('source', source))
        record['Dimensions'].append(self.prepare_dimension('signal', signal))
        # add more Dimensions from item as needed

        record['MeasureValues'].append(self.prepare_measure('value', value))
        # append more MeasureValues as measure columns as needed

        return record

    def generate_data(self, pid, region, database_name, table_name, buffer):
        # Write the rows assigned to this worker (buffer[pid]) to Timestream in batches.
        self.data = buffer[pid]

        self.database_name = database_name
        self.table_name = table_name
        print(f'writing data to database {self.database_name} table {self.table_name}')

        session = boto3.Session(region_name=region)
        self.write_client = session.client('timestream-write', config=Config(
            read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10}))

        self.measure_name = 'metric-' + str(pid % 8192)
        common_attributes = self.prepare_common_attributes()

        records = []

        total_records = 0

        launch_time = time.time()

        for item in self.data:
            record = self.create_record(item)

            records.append(record)

            if len(records) == self.BATCH_SIZE:
                total_records += len(records)

                self.write_records(records, common_attributes)

                records = []

            if self.INTERVAL > 0.0:
                time.sleep(self.INTERVAL)

        # flush the remaining partial batch
        if len(records) > 0:
            total_records += len(records)

            self.write_records(records, common_attributes)

        total_time = time.time() - launch_time

        if total_time == 0:
            total_time = 0.00001
        rps = total_records / total_time
        print(f'Total records in process: {total_records:,} at {rps:.0f} records/second')

        return total_records


def lambda_handler(event, context):
    # Entry point: load the Parquet data, then fan out one writer process per thread.
    max_threads = event['threads']
    folder_name = event['folder']
    records = load_parquet(max_threads, folder_name)

    lambda_time = time.time()

    pid = 1

    processes = []

    for i in range(0, max_threads):
        process = Process(target=thread_handler, args=(i, event, context, records))
        process.start()
        print(
            f'[{pid}] process_record: Started process #{i} with pid={process.pid} to dump data chunk to timestream')
        processes.append(process)

    # Wait for all processes to complete
    for process in processes:
        process.join()

    end_time = time.time()
    total_time = end_time - lambda_time

    s_lambda_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(lambda_time))
    s_end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print(f'{s_lambda_time} - {s_end_time} ({total_time:.1f}s)')

    return


def thread_handler(pid, event, context, records):
    # Runs in a child process: write the buffer slice assigned to this pid.
    generator = Generator()
    region = event['region']
    database = event['database']
    table = event['table']
    generator.generate_data(pid, region, database, table, records)

    return


if __name__ == '__main__':
    # Local test run using the same event shape the Lambda handler expects
    event = {
        'threads': 4,
        'region': 'us-east-1',
        'database': 'tools-sandbox',
        'table': 'ingestion-parquet',
        'folder': './'
    }
    context = {}
    lambda_handler(event, context)


Binary file added tools/python/parquet-writer/value.parquet
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
