diff --git a/tools/README.md b/tools/README.md index f5234ad2..e2faec2d 100644 --- a/tools/README.md +++ b/tools/README.md @@ -1,11 +1,14 @@ ## Amazon Timestream data ingestion and query tools To understand the performance and scale capabilities of Amazon Timestream, you can run the following workload: -* [Running large scale workloads with Amazon Timestream](perf-scale-workload) +* [Running large scale workloads with Amazon Timestream](python/perf-scale-workload) The following tools can be used to continuously send data into Amazon Timestream: -* [Publishing data with Amazon Kinesis](kinesis_ingestor) -* [Writing data using a multi-thread Python DevOps data generator](continuous-ingestor) +* [Publishing data with Amazon Kinesis](python/kinesis_ingestor) +* [Writing data using a multi-thread Python DevOps data generator](python/continuous-ingestor) + +The following tools show examples of writing data from common file formats: +* [Processing Apache Parquet files](python/parquet-writer) The following tool shows how to use multiple threads to write to Amazon Timestream with Java, while collecting important operational metrics. 
It includes samples which shows: -* [Local CSV file ingestion to Amazon Timestream](multithreaded-writer#Local-CSV-file-ingestion-to-Timestream) -* [Lambda function ingesting S3 CSV file to Amazon Timestream](multithreaded-writer#Lambda-function-ingesting-S3-CSV-file-to-Timestream) \ No newline at end of file +* [Local CSV file ingestion to Amazon Timestream](java/multithreaded-writer#Local-CSV-file-ingestion-to-Timestream) +* [Lambda function ingesting S3 CSV file to Amazon Timestream](java/multithreaded-writer#Lambda-function-ingesting-S3-CSV-file-to-Timestream) \ No newline at end of file diff --git a/tools/multithreaded-writer/.gitignore b/tools/java/multithreaded-writer/.gitignore similarity index 100% rename from tools/multithreaded-writer/.gitignore rename to tools/java/multithreaded-writer/.gitignore diff --git a/tools/multithreaded-writer/README.md b/tools/java/multithreaded-writer/README.md similarity index 100% rename from tools/multithreaded-writer/README.md rename to tools/java/multithreaded-writer/README.md diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/README.md b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/README.md similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/README.md rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/README.md diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/.gitignore b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/.gitignore similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/.gitignore rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/.gitignore diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/1-create-bucket.sh b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/1-create-bucket.sh similarity index 100% rename from 
tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/1-create-bucket.sh rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/1-create-bucket.sh diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/2-build-upload.sh b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/2-build-upload.sh similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/2-build-upload.sh rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/2-build-upload.sh diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/cloudformation-lambda.yaml b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/cloudformation-lambda.yaml similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/cloudformation-lambda.yaml rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/deployment/cloudformation-lambda.yaml diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/pom.xml b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/pom.xml similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/pom.xml rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/pom.xml diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EnvVariablesHelper.java b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EnvVariablesHelper.java similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EnvVariablesHelper.java rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EnvVariablesHelper.java diff --git 
a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EventInfo.java b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EventInfo.java similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EventInfo.java rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/EventInfo.java diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/Handler.java b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/Handler.java similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/Handler.java rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/Handler.java diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/HandlerState.java b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/HandlerState.java similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/HandlerState.java rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/java/com/amazonaws/sample/lambda/HandlerState.java diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/resources/log4j2.xml b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/resources/log4j2.xml similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/resources/log4j2.xml rename to 
tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/main/resources/log4j2.xml diff --git a/tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/test/java/com/amazonaws/sample/lambda/HandlerTest.java b/tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/test/java/com/amazonaws/sample/lambda/HandlerTest.java similarity index 100% rename from tools/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/test/java/com/amazonaws/sample/lambda/HandlerTest.java rename to tools/java/multithreaded-writer/SampleLambdaFromS3CsvIngestion/src/test/java/com/amazonaws/sample/lambda/HandlerTest.java diff --git a/tools/multithreaded-writer/SampleLocalCsvIngestion/README.md b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/README.md similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/README.md rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/README.md diff --git a/tools/multithreaded-writer/SampleLocalCsvIngestion/pom.xml b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/pom.xml similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/pom.xml rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/pom.xml diff --git a/tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/Main.java b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/Main.java similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/Main.java rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/Main.java diff --git a/tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/SampleCsvIngestion.java 
b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/SampleCsvIngestion.java similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/SampleCsvIngestion.java rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/ingestion/SampleCsvIngestion.java diff --git a/tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvMapper.java b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvMapper.java similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvMapper.java rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvMapper.java diff --git a/tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvRow.java b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvRow.java similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvRow.java rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/java/com/amazonaws/sample/csv/mapping/SampleCsvRow.java diff --git a/tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/log4j2.xml b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/log4j2.xml similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/log4j2.xml rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/log4j2.xml diff --git 
a/tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/sample_withHeader.csv b/tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/sample_withHeader.csv similarity index 100% rename from tools/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/sample_withHeader.csv rename to tools/java/multithreaded-writer/SampleLocalCsvIngestion/src/main/resources/sample_withHeader.csv diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/pom.xml b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/pom.xml similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/pom.xml rename to tools/java/multithreaded-writer/TimestreamMultithreadedWriter/pom.xml diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriter.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriter.java similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriter.java rename to tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriter.java diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterConfig.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterConfig.java similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterConfig.java rename to 
tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterConfig.java diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterImpl.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterImpl.java similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterImpl.java rename to tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterImpl.java diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterWorker.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterWorker.java similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterWorker.java rename to tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/TimestreamWriterWorker.java diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamInsertionMetrics.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamInsertionMetrics.java similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamInsertionMetrics.java rename to 
tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamInsertionMetrics.java diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamWriterMetrics.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamWriterMetrics.java similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamWriterMetrics.java rename to tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/metrics/TimestreamWriterMetrics.java diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/LogMetricsPublisher.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/LogMetricsPublisher.java similarity index 100% rename from tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/LogMetricsPublisher.java rename to tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/LogMetricsPublisher.java diff --git a/tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/TimestreamInitializer.java b/tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/TimestreamInitializer.java similarity index 100% rename from 
tools/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/TimestreamInitializer.java rename to tools/java/multithreaded-writer/TimestreamMultithreadedWriter/src/main/java/com/amazonaws/sample/timestream/multithreaded/util/TimestreamInitializer.java diff --git a/tools/multithreaded-writer/pom.xml b/tools/java/multithreaded-writer/pom.xml similarity index 100% rename from tools/multithreaded-writer/pom.xml rename to tools/java/multithreaded-writer/pom.xml diff --git a/tools/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.drawio b/tools/java/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.drawio similarity index 100% rename from tools/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.drawio rename to tools/java/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.drawio diff --git a/tools/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.png b/tools/java/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.png similarity index 100% rename from tools/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.png rename to tools/java/multithreaded-writer/sample-lambda-from-s3-csv-ingestion.png diff --git a/tools/multithreaded-writer/sample-local-csv-ingestion-diagram.drawio b/tools/java/multithreaded-writer/sample-local-csv-ingestion-diagram.drawio similarity index 100% rename from tools/multithreaded-writer/sample-local-csv-ingestion-diagram.drawio rename to tools/java/multithreaded-writer/sample-local-csv-ingestion-diagram.drawio diff --git a/tools/multithreaded-writer/sample-local-csv-ingestion-diagram.png b/tools/java/multithreaded-writer/sample-local-csv-ingestion-diagram.png similarity index 100% rename from tools/multithreaded-writer/sample-local-csv-ingestion-diagram.png rename to tools/java/multithreaded-writer/sample-local-csv-ingestion-diagram.png diff --git a/tools/multithreaded-writer/timestream-multithreaded-writer-diagram.drawio 
b/tools/java/multithreaded-writer/timestream-multithreaded-writer-diagram.drawio similarity index 100% rename from tools/multithreaded-writer/timestream-multithreaded-writer-diagram.drawio rename to tools/java/multithreaded-writer/timestream-multithreaded-writer-diagram.drawio diff --git a/tools/multithreaded-writer/timestream-multithreaded-writer-diagram.png b/tools/java/multithreaded-writer/timestream-multithreaded-writer-diagram.png similarity index 100% rename from tools/multithreaded-writer/timestream-multithreaded-writer-diagram.png rename to tools/java/multithreaded-writer/timestream-multithreaded-writer-diagram.png diff --git a/tools/continuous-ingestor/README.md b/tools/python/continuous-ingestor/README.md similarity index 100% rename from tools/continuous-ingestor/README.md rename to tools/python/continuous-ingestor/README.md diff --git a/tools/continuous-ingestor/TestRunbook.ipynb b/tools/python/continuous-ingestor/TestRunbook.ipynb similarity index 100% rename from tools/continuous-ingestor/TestRunbook.ipynb rename to tools/python/continuous-ingestor/TestRunbook.ipynb diff --git a/tools/continuous-ingestor/__init__.py b/tools/python/continuous-ingestor/__init__.py similarity index 100% rename from tools/continuous-ingestor/__init__.py rename to tools/python/continuous-ingestor/__init__.py diff --git a/tools/continuous-ingestor/requirements.txt b/tools/python/continuous-ingestor/requirements.txt similarity index 100% rename from tools/continuous-ingestor/requirements.txt rename to tools/python/continuous-ingestor/requirements.txt diff --git a/tools/continuous-ingestor/sinsawsnr.png b/tools/python/continuous-ingestor/sinsawsnr.png similarity index 100% rename from tools/continuous-ingestor/sinsawsnr.png rename to tools/python/continuous-ingestor/sinsawsnr.png diff --git a/tools/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py b/tools/python/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py 
similarity index 100% rename from tools/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py rename to tools/python/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py diff --git a/tools/kinesis_ingestor/README.md b/tools/python/kinesis_ingestor/README.md similarity index 100% rename from tools/kinesis_ingestor/README.md rename to tools/python/kinesis_ingestor/README.md diff --git a/tools/kinesis_ingestor/__init__.py b/tools/python/kinesis_ingestor/__init__.py similarity index 100% rename from tools/kinesis_ingestor/__init__.py rename to tools/python/kinesis_ingestor/__init__.py diff --git a/tools/kinesis_ingestor/timestream_kinesis_data_gen.py b/tools/python/kinesis_ingestor/timestream_kinesis_data_gen.py similarity index 100% rename from tools/kinesis_ingestor/timestream_kinesis_data_gen.py rename to tools/python/kinesis_ingestor/timestream_kinesis_data_gen.py diff --git a/tools/python/parquet-writer/README.md b/tools/python/parquet-writer/README.md new file mode 100644 index 00000000..bb412138 --- /dev/null +++ b/tools/python/parquet-writer/README.md @@ -0,0 +1,100 @@ +# Loading Apache Parquet files into Amazon Timestream + +This example illustrates how to load Apache Parquet files and write the data to Amazon Timestream. Some of the characteristics are: + +* Loading multiple files in a folder +* Multiple threads can be configured for higher ingestion speed. The parameters are preconfigured for 4 threads + +## Getting started + +This example contains a sample Parquet file with the following data structure: + +| Field | Example content | Mapping to Timestream attribute | +|---------|-----------------|---------------------------------| +| signal | data channel | Dimension | +| source | device identifier | Dimension | +| time | Timestamp | time | +| value | example measure value | Multi Measure column | + +The Python code can be modified to process more than one measure in multi-measure format. 
+To adapt the code to your data, change the following functions: + +## Parquet data extraction: + +### Function `load_parquet` + +This function extracts the data needed for a record; simple transformations can be done here: + +```python +def load_parquet(max_threads, folder_name): + +... + for df_record in df_records: + buffer_index = record_count % max_threads + time = unix_time_millis(df_record['time']) + signal = df_record['signal'] + value = df_record['value'] + source = df_record['source'] + # print(df_record) + row = { + 'time': time, + 'signal': signal, + 'value': value, + 'source': source + } + buffer[buffer_index].append(row) + record_count += 1 + + return buffer +``` + +### Function `create_record` + +## Parquet data to Timestream record mapping: + + +### Function `create_record` + +This function takes the row built above and maps it to Amazon Timestream attributes: + +```python + def create_record(self, item): + current_time = str(item['time']) + source = item['source'] + value = item['value'] + signal = item['signal'] + + record = self.prepare_record(current_time) + + record['Dimensions'].append(self.prepare_dimension('source', source)) + record['Dimensions'].append(self.prepare_dimension('signal', signal)) + # add more Dimensions from item as needed + + record['MeasureValues'].append(self.prepare_measure('value', value)) + # append more MeasureValues as measure columns as needed + + return record +``` +Please append more dimensions or more measure values as needed. + +## Parameters to run: + +The main function allows the following parameters: + +| Parameter | Usage| +|-----------|------| +| threads | Number of threads to run, should be at least 1 | +| region | AWS Region of Timestream database | +| database | Database containing the target table | +| table | Target table | +| folder | folder that contains parquet files. 
A sample file is included in this example | + +```json + { + "threads": 4, + "region": "us-east-1", + "database": "tools-sandbox", + "table": "ingestion-parquet", + "folder": "./" + } +``` \ No newline at end of file diff --git a/tools/python/parquet-writer/parquet_multi_process_lambda_function.py b/tools/python/parquet-writer/parquet_multi_process_lambda_function.py new file mode 100644 index 00000000..fe016c1a --- /dev/null +++ b/tools/python/parquet-writer/parquet_multi_process_lambda_function.py @@ -0,0 +1,246 @@ +import random +import time +import boto3 +import datetime +import glob +import numpy as np + +import pandas as pd + +data_frame = None + +# from multiprocessing.sharedctypes import Value, Array +from multiprocessing import Process, Lock + +from botocore.config import Config + +epoch = datetime.datetime.utcfromtimestamp(0) + +def unix_time_millis(dt): + # epoch = datetime.datetime.utcfromtimestamp(0) + return int((dt - pd.Timestamp("1970-01-01")).total_seconds() * 1000.0) + +def load_parquet(max_threads, folder_name): + + # create buffer for each thread + buffer = [] + + for i in range(0, max_threads): + buffer.append([]) + + record_count = 0 + + for file_name in glob.glob(folder_name + '/value.parquet'): + df = pd.read_parquet(file_name) + print(df) + df_records = df.to_records() + + for df_record in df_records: + buffer_index = record_count % max_threads + time = unix_time_millis(df_record['time']) + signal = df_record['signal'] + value = df_record['value'] + source = df_record['source'] + # print(df_record) + row = { + 'time': time, + 'signal': signal, + 'value': value, + 'source': source + } + buffer[buffer_index].append(row) + record_count += 1 + + return buffer + +class Generator: + INTERVAL = 0.001 # Seconds + INTERVAL_MILLI = 1 + BATCH_SIZE = 100 + + table_name = '' + database_name = '' + + def prepare_common_attributes(self): + common_attributes = { + #'Dimensions': [ + # {'Name': 'country', 'Value': COUNTRY.replace('${', '$ {')}, + # {'Name': 
'country', 'Value': COUNTRY} #, + #{'Name': 'city', 'Value': CITY}, + #{'Name': 'hostname', 'Value': HOSTNAME} + #], + 'MeasureName': self.measure_name, + 'MeasureValueType': 'MULTI' + } + print(common_attributes) + #print(COUNTRY) + #self.variable_test(COUNTRY) + return common_attributes + + def prepare_record(self, current_time): + record = { + 'Time': str(current_time), + 'MeasureValues': [], + 'Dimensions':[] + } + return record + + def prepare_measure(self, measure_name, measure_value): + measure = { + 'Name': measure_name, + 'Value': str(measure_value), + 'Type': 'DOUBLE' + } + return measure + + def prepare_dimension(self, name, value): + dimension = { + 'Name': name, + 'Value': str(value) #, + #'DimensionValueType': 'VARCHAR' + } + return dimension + + def write_records(self, records, common_attributes): + try: + result = self.write_client.write_records(DatabaseName=self.database_name, + TableName=self.table_name, + CommonAttributes=common_attributes, + Records=records) + status = result['ResponseMetadata']['HTTPStatusCode'] + #print("Processed %d records. 
WriteRecords HTTPStatusCode: %s" % + # (len(records), status)) + except Exception as err: + print("Error:", err) + print(records) + + def unix_time_millis(self, dt): + epoch = datetime.datetime.utcfromtimestamp(0) + return (dt - epoch).total_seconds() * 1000.0 + + # Users can change this based on their record's dimensions and measure values + def create_record(self, item): + current_time = str(item['time']) + source = item['source'] + value = item['value'] + signal = item['signal'] + + record = self.prepare_record(current_time) + + record['Dimensions'].append(self.prepare_dimension('source', source)) + record['Dimensions'].append(self.prepare_dimension('signal', signal)) + # add more Dimensions from item as needed + + record['MeasureValues'].append(self.prepare_measure('value', value)) + # append more MeasureValues as measure columns as needed + + return record + + def generate_data(self, pid, region, database_name, table_name, buffer): + self.data = buffer[pid] + + self.database_name = database_name + self.table_name = table_name + print(f'writing data to database {self.database_name} table {self.table_name}') + + session = boto3.Session(region_name=region) + self.write_client = session.client('timestream-write', config=Config( + read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10})) + + self.measure_name = 'metric-' + str(pid % 8192) + common_attributes = self.prepare_common_attributes() + + records = [] + + total_records = 0 + + launch_time = time.time() + + for item in self.data: + # print(item) + record = self.create_record(item) + + records.append(record) + + if len(records) == self.BATCH_SIZE: + total_records += len(records) + + self.write_records(records, common_attributes) + + records = [] + + if self.INTERVAL > 0.0: + time.sleep(self.INTERVAL) + + if len(records) > 0: + total_records += len(records) + + self.write_records(records, common_attributes) + + total_time = time.time() - launch_time + + if total_time == 0: + total_time = 0.00001 + rps 
= total_records / total_time + print(f'Total Records in thread: {total_records:,} in {rps} rps') + + return total_records + + +def lambda_handler(event, context): + + max_threads = event['threads'] + folder_name = event['folder'] + records = load_parquet(max_threads, folder_name) + + lambda_time = time.time() + + pid = 1 + + + processes = [] + record_counts = [] + + for i in range(0, max_threads): + id = i + process = Process(target=thread_handler, args=(id, event, context, records)) + process.start() + print( + f'[{pid}] process_record: Started process #{i} with pid={process.pid} to dump data chunk to timestream') + processes.append(process) + + # Wait for all processes to complete + for process in processes: + process.join() + + end_time = time.time() + total_time = end_time - lambda_time + + s_lambda_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(lambda_time)) + s_end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)) + print(f'{s_lambda_time} - {s_end_time}') + + return + +def thread_handler(pid, event, context, records): + generator = Generator() + region = event['region'] + database = event['database'] + table = event['table'] + threads = int(event['threads']) + generator.generate_data(pid, region, database, table, records) + + return + +if __name__ == '__main__': + event = { + 'threads': 4, + 'region': 'us-east-1', + 'database': 'tools-sandbox', + 'table': 'ingestion-parquet', + 'folder': './' + } + context = {} + lambda_handler(event, context) + + diff --git a/tools/python/parquet-writer/value.parquet b/tools/python/parquet-writer/value.parquet new file mode 100644 index 00000000..41558567 Binary files /dev/null and b/tools/python/parquet-writer/value.parquet differ diff --git a/tools/perf-scale-workload/README.md b/tools/python/perf-scale-workload/README.md similarity index 100% rename from tools/perf-scale-workload/README.md rename to tools/python/perf-scale-workload/README.md diff --git a/tools/perf-scale-workload/config.ini 
b/tools/python/perf-scale-workload/config.ini similarity index 100% rename from tools/perf-scale-workload/config.ini rename to tools/python/perf-scale-workload/config.ini diff --git a/tools/perf-scale-workload/config_row_count.ini b/tools/python/perf-scale-workload/config_row_count.ini similarity index 100% rename from tools/perf-scale-workload/config_row_count.ini rename to tools/python/perf-scale-workload/config_row_count.ini diff --git a/tools/perf-scale-workload/continuous_ingester.py b/tools/python/perf-scale-workload/continuous_ingester.py similarity index 100% rename from tools/perf-scale-workload/continuous_ingester.py rename to tools/python/perf-scale-workload/continuous_ingester.py diff --git a/tools/perf-scale-workload/devops_cleanup_resources.py b/tools/python/perf-scale-workload/devops_cleanup_resources.py similarity index 100% rename from tools/perf-scale-workload/devops_cleanup_resources.py rename to tools/python/perf-scale-workload/devops_cleanup_resources.py diff --git a/tools/perf-scale-workload/devops_ingestion_driver.py b/tools/python/perf-scale-workload/devops_ingestion_driver.py similarity index 100% rename from tools/perf-scale-workload/devops_ingestion_driver.py rename to tools/python/perf-scale-workload/devops_ingestion_driver.py diff --git a/tools/perf-scale-workload/devops_query_driver.py b/tools/python/perf-scale-workload/devops_query_driver.py similarity index 100% rename from tools/perf-scale-workload/devops_query_driver.py rename to tools/python/perf-scale-workload/devops_query_driver.py diff --git a/tools/perf-scale-workload/model.py b/tools/python/perf-scale-workload/model.py similarity index 100% rename from tools/perf-scale-workload/model.py rename to tools/python/perf-scale-workload/model.py diff --git a/tools/perf-scale-workload/query_executer.py b/tools/python/perf-scale-workload/query_executer.py similarity index 100% rename from tools/perf-scale-workload/query_executer.py rename to 
tools/python/perf-scale-workload/query_executer.py diff --git a/tools/perf-scale-workload/query_execution_utils.py b/tools/python/perf-scale-workload/query_execution_utils.py similarity index 100% rename from tools/perf-scale-workload/query_execution_utils.py rename to tools/python/perf-scale-workload/query_execution_utils.py diff --git a/tools/perf-scale-workload/requirements.txt b/tools/python/perf-scale-workload/requirements.txt similarity index 100% rename from tools/perf-scale-workload/requirements.txt rename to tools/python/perf-scale-workload/requirements.txt diff --git a/tools/perf-scale-workload/summarize_results.py b/tools/python/perf-scale-workload/summarize_results.py similarity index 100% rename from tools/perf-scale-workload/summarize_results.py rename to tools/python/perf-scale-workload/summarize_results.py diff --git a/tools/perf-scale-workload/timestreamquery.py b/tools/python/perf-scale-workload/timestreamquery.py similarity index 100% rename from tools/perf-scale-workload/timestreamquery.py rename to tools/python/perf-scale-workload/timestreamquery.py diff --git a/tools/perf-scale-workload/timestreamwrite.py b/tools/python/perf-scale-workload/timestreamwrite.py similarity index 100% rename from tools/perf-scale-workload/timestreamwrite.py rename to tools/python/perf-scale-workload/timestreamwrite.py
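
Review note on the new `tools/python/parquet-writer` sample: the data flow it adds (round-robin buffering in `load_parquet`, the row-to-record mapping in `create_record`, and the batches of `BATCH_SIZE = 100` in `generate_data`) can be tried in isolation without AWS access. The following is a minimal sketch, not the code from this diff: the helper names `split_round_robin`, `to_record`, and `batches` are hypothetical, and the rows are synthetic stand-ins for the contents of `value.parquet`.

```python
from typing import Any, Dict, Iterator, List

BATCH_SIZE = 100  # mirrors Generator.BATCH_SIZE in the diff

Row = Dict[str, Any]


def split_round_robin(rows: List[Row], workers: int) -> List[List[Row]]:
    """Spread rows over one buffer per worker (hypothetical helper).

    Same idea as `record_count % max_threads` in load_parquet.
    """
    if workers < 1:
        raise ValueError("workers must be at least 1")
    buffers: List[List[Row]] = [[] for _ in range(workers)]
    for index, row in enumerate(rows):
        buffers[index % workers].append(row)
    return buffers


def to_record(row: Row) -> Dict[str, Any]:
    """Build a dict shaped like the output of create_record (hypothetical helper)."""
    return {
        "Time": str(row["time"]),
        "Dimensions": [
            {"Name": "source", "Value": str(row["source"])},
            {"Name": "signal", "Value": str(row["signal"])},
        ],
        "MeasureValues": [
            {"Name": "value", "Value": str(row["value"]), "Type": "DOUBLE"},
        ],
    }


def batches(records: List[Dict[str, Any]], size: int = BATCH_SIZE) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` records (hypothetical helper)."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


# Synthetic rows standing in for the sample Parquet file.
rows = [
    {"time": 1_700_000_000_000 + i, "signal": "temp", "source": "dev-1", "value": i * 0.5}
    for i in range(250)
]

buffers = split_round_robin(rows, 4)
print([len(b) for b in buffers])            # [63, 63, 62, 62]

records = [to_record(r) for r in rows]
print([len(b) for b in batches(records)])   # [100, 100, 50]
```

Each yielded batch corresponds to one `write_records` call in the sample; this sketch stops short of the call itself so it runs offline.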