Merge pull request #195 from awslabs/tcu-testing
TCU testing scripts which will be referenced in the TCU blog
sethusrinivasan authored Aug 8, 2024
2 parents a292ee2 + ba777a8 commit 657e61b
Showing 7 changed files with 796 additions and 0 deletions.
37 changes: 37 additions & 0 deletions tools/python/timestream-compute-units-testing/README.md
@@ -0,0 +1,37 @@
# Timestream Compute Units Performance Testing

## Performance testing

We provide scripts to create the Timestream for LiveAnalytics resources (database and table), an ingestion script to load data, and query scripts to test a TCU configuration with different queries running concurrently.

## Requirements

1. Make sure you have the latest pip package installed
```bash
python3 -m ensurepip --upgrade
```
2. Install the required Python packages if testing outside of SageMaker notebooks
```bash
python3 -m pip install boto3 numpy matplotlib
```
3. Tested on Python 3.8

## How to use it

1. Create the Timestream for LiveAnalytics database and table. Depending on your preference, you can use the create_timestream_resource [Python script](./create_timestream_resource.py), [CloudFormation template](./create_timestream_resource.yaml), or [Terraform configuration](./create_timestream_resource.tf).
   - You can change the database name, table name, and memory and magnetic store retention within the script or template.
2. After the resources are created, execute the [ingestion script](./ingestion.py) with `python3.8 ingestion.py`. If required, [you can change the database name, table name, and region](https://github.com/awslabs/amazon-timestream-tools/blob/tcu-testing/tools/python/timestream-compute-units-testing/ingestion.py#L183). The script ingests 100 DevOps metrics into the Timestream for LiveAnalytics table every second. Let the ingestion run for at least a couple of hours before you start querying.
3. [Configure the Timestream Compute Units (TCU)](https://docs.aws.amazon.com/timestream/latest/developerguide/tcu.html). We tested with 4 and 8 TCUs and shared the results and insights in the **blog** (once the blog is published this will become a hyperlink; the blog and this repository reference each other, so this page must be published first so the GitHub link can be included in the blog). A scripted alternative to the console configuration is sketched just after this list.
4. Run the [lastpoint-query.ipynb](./lastpoint-query.ipynb) and [single-groupby-orderby.ipynb](./single-groupby-orderby.ipynb) notebooks to capture the performance metrics for each TCU configuration. Each notebook runs for one minute at each worker count (7, 14, 21, 28, 42, 50, 60 concurrent workers) and captures p50, p90, and p99 latencies, the total number of queries per minute, and throttle counts, then plots three graphs (latency percentiles, queries per minute, and throttling counts, each against the number of workers). A simplified sketch of that measurement loop also follows this list.
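
The TCU documentation linked in step 3 covers configuration through the console. If you prefer to script the change, a minimal boto3 sketch is shown below; it assumes the Timestream Query `UpdateAccountSettings` API and TCU-based pricing are available for your account and region, so verify the parameter names against the current boto3 documentation.
```python
# Sketch only: sets the account-level maximum TCU used by the tests (4, then 8).
# Assumes the timestream-query UpdateAccountSettings API is available in your
# boto3 version and that TCU pricing can be enabled for the account.
import boto3

query_client = boto3.client('timestream-query', region_name='us-east-1')

response = query_client.update_account_settings(
    MaxQueryTCU=4,                      # repeat with 8 for the second test run
    QueryPricingModel='COMPUTE_UNITS'   # opt into TCU-based query pricing
)
print(response)
```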
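
For reference, the measurement loop in step 4 boils down to running one of the queries below from a pool of worker threads for one minute and summarizing the observed latencies. The following is a simplified sketch of that idea, not the notebook code itself; the query text and worker counts come from this README, while the function names and error handling are illustrative.
```python
# Simplified sketch of the concurrency test, not the notebook code itself.
import time
from concurrent.futures import ThreadPoolExecutor

import boto3
import numpy as np
from botocore.config import Config
from botocore.exceptions import ClientError

QUERY = ('select memory from "devops"."sample_devops" '
         "where time > ago(10m) and hostname='host1' order by time desc limit 1")

# allow enough HTTP connections for the largest worker count
query_client = boto3.client('timestream-query', region_name='us-east-1',
                            config=Config(max_pool_connections=64))

def worker(deadline):
    """Issue the query repeatedly until the deadline, recording latencies and throttles."""
    latencies, throttles = [], 0
    while time.time() < deadline:
        start = time.time()
        try:
            query_client.query(QueryString=QUERY)
            latencies.append(time.time() - start)
        except ClientError as err:
            if err.response['Error']['Code'] == 'ThrottlingException':
                throttles += 1
            else:
                raise
    return latencies, throttles

def run_for_one_minute(workers):
    deadline = time.time() + 60
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(worker, deadline) for _ in range(workers)]
    latencies, throttles = [], 0
    for future in futures:
        lat, thr = future.result()
        latencies.extend(lat)
        throttles += thr
    p50, p90, p99 = np.percentile(latencies, [50, 90, 99])
    print(f'{workers} workers: p50={p50:.3f}s p90={p90:.3f}s p99={p99:.3f}s '
          f'queries/min={len(latencies)} throttles={throttles}')

for workers in (7, 14, 21, 28, 42, 50, 60):
    run_for_one_minute(workers)
```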

## lastpoint-query
Retrieves the most recent memory utilization for a given host.
```sql
select memory from "devops"."sample_devops" where time > ago(10m) and hostname='host1' order by time desc limit 1
```
## single-groupby-orderby
Binning, grouping, and ordering for a given host. This is a relatively more resource-intensive query than the lastpoint-query.
```sql
select bin(time, 1m) AS binned_time, max(cpu_utilization) as max_cpu_utilization from "devops"."sample_devops" where time > ago(10m) and hostname='host2' group by bin(time, 1m) order by binned_time asc
```
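
To run either of these queries outside the notebooks, a minimal boto3 sketch is shown below. It assumes the `query` paginator is available for the timestream-query client and that all returned columns are scalars; null, array, or timeseries values would need extra handling.
```python
# Sketch: run one of the queries above and print each row as a dict.
# Assumes scalar columns only; adjust the database, table, and region as needed.
import boto3

query_client = boto3.client('timestream-query', region_name='us-east-1')

query = ('select memory from "devops"."sample_devops" '
         "where time > ago(10m) and hostname='host1' order by time desc limit 1")

paginator = query_client.get_paginator('query')
for page in paginator.paginate(QueryString=query):
    columns = [col['Name'] for col in page['ColumnInfo']]
    for row in page['Rows']:
        values = [datum.get('ScalarValue') for datum in row['Data']]
        print(dict(zip(columns, values)))
```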
54 changes: 54 additions & 0 deletions tools/python/timestream-compute-units-testing/create_timestream_resource.py
@@ -0,0 +1,54 @@
import boto3

database = "devops"
table = "sample_devops"
region = "us-east-1"
memory_store_retention_in_hours = 24
magnetic_store_retention_in_days = 365
partition_key = "hostname"

timestream_client = boto3.client('timestream-write', region_name=region)

# create database
try:
    timestream_client.create_database(DatabaseName=database)
    print(f"Database {database} created successfully")
except timestream_client.exceptions.ConflictException:
    print(f"Database {database} exists. Skipping database creation")
except Exception as err:
    print(f"Create database failed with error : {err}")
    raise

# create table
print("Creating table")
retention_properties = {
    'MemoryStoreRetentionPeriodInHours': memory_store_retention_in_hours,
    'MagneticStoreRetentionPeriodInDays': magnetic_store_retention_in_days
}
magnetic_store_write_properties = {
    'EnableMagneticStoreWrites': True
}

# require every record to carry the partition key dimension
schema = {
    "CompositePartitionKey": [
        {
            "EnforcementInRecord": "REQUIRED",
            "Name": partition_key,
            "Type": "DIMENSION"
        }
    ]
}

try:
    timestream_client.create_table(DatabaseName=database, TableName=table,
                                   RetentionProperties=retention_properties,
                                   MagneticStoreWriteProperties=magnetic_store_write_properties,
                                   Schema=schema)
    print(f"Table {table} successfully created")
except timestream_client.exceptions.ConflictException:
    print(f"Table {table} exists on database {database}. Skipping table creation")
except Exception as err:
    print(f"Create table failed: {err}")
    raise
22 changes: 22 additions & 0 deletions tools/python/timestream-compute-units-testing/create_timestream_resource.tf
@@ -0,0 +1,22 @@
resource "aws_timestreamwrite_database" "tcu_testing" {
database_name = "devops"
}


resource "aws_timestreamwrite_table" "tcu_testing" {
database_name = aws_timestreamwrite_database.tcu_testing.database_name
table_name = "sample_devops"

retention_properties {
magnetic_store_retention_period_in_days = 365
memory_store_retention_period_in_hours = 24
}

schema {
composite_partition_key {
enforcement_in_record = "REQUIRED"
name = "hostname"
type = "DIMENSION"
}
}
}
26 changes: 26 additions & 0 deletions tools/python/timestream-compute-units-testing/create_timestream_resource.yaml
@@ -0,0 +1,26 @@
AWSTemplateFormatVersion: 2010-09-09
Description: Create Timestream Resources


Resources:
  MyDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: "devops1"

  MyTable:
    DependsOn: MyDatabase
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref MyDatabase
      TableName: "sample_devops"
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: "24"
        MagneticStoreRetentionPeriodInDays: "7300"
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true
      Schema:
        CompositePartitionKey:
          - EnforcementInRecord: "REQUIRED"
            Name: "hostname"
            Type: "DIMENSION"
191 changes: 191 additions & 0 deletions tools/python/timestream-compute-units-testing/ingestion.py
@@ -0,0 +1,191 @@
import random
import time
import boto3
import datetime
import threading
from botocore.config import Config

class Generator:
    INTERVAL = 1  # Seconds
    INTERVAL_MILLI = 100
    BATCH_SIZE = 100


    def __init__(self):
        self.time_lock = threading.Lock()


    def prepare_common_attributes(self):
        common_attributes = {
            'MeasureName': self.measure_name,
            'MeasureValueType': 'MULTI'
        }
        print(common_attributes)
        return common_attributes

    def prepare_record(self, current_time):
        record = {
            'Time': str(current_time),
            'TimeUnit': 'SECONDS',
            'MeasureValues': [],
            'Dimensions': []
        }
        return record

    def prepare_measure(self, measure_name, measure_value):
        measure = {
            'Name': measure_name,
            'Value': str(measure_value),
            'Type': 'DOUBLE'
        }
        return measure

    def prepare_dimension(self, name, value):
        dimension = {
            'Name': name,
            'Value': str(value),
        }
        return dimension

    def write_records(self, records, common_attributes):
        try:
            result = self.write_client.write_records(DatabaseName=self.database_name,
                                                     TableName=self.table_name,
                                                     CommonAttributes=common_attributes,
                                                     Records=records)
            status = result['ResponseMetadata']['HTTPStatusCode']
            print("Processed %d records. WriteRecords HTTPStatusCode: %s" %
                  (len(records), status))
            print(result)
        except Exception as err:
            print("Error:", err)
            if hasattr(err, 'response') and 'RejectedRecords' in err.response:
                print(f'Error ingesting data for : {str(err.response["RejectedRecords"])}')

    def unix_time_millis(self, dt):
        epoch = datetime.datetime.utcfromtimestamp(0)
        return (dt - epoch).total_seconds() * 1000.0

    def write_buffer(self, buffer, common_attributes):
        start_time = time.time()
        total_records = 0
        for records in buffer:
            elapsed_time = time.time() - start_time
            self.write_records(records, common_attributes)
            total_records += len(records)
            if elapsed_time == 0.0:
                elapsed_time = 0.00001
            rps = total_records / elapsed_time
            print(f'{total_records} written rps = {rps}')

    def generate_data(self, pid, region, database_name, table_name, max_records):

        self.database_name = database_name
        self.table_name = table_name
        print(f'writing data to database {self.database_name} table {self.table_name}')

        session = boto3.Session(region_name=region)
        self.write_client = session.client('timestream-write', config=Config(
            read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10}))

        self.measure_name = f"metric{pid % 8192}"
        common_attributes = self.prepare_common_attributes()

        records = []

        total_records = 0

        launch_time = time.time()
        current_time_seconds = int(datetime.datetime.now().timestamp())
        current_time_nanoseconds = current_time_seconds * 10**9
        time_delta_seconds = 365 * 24 * 60 * 60  # 365 days in seconds
        host_number = 0


        for i in range(0, int(max_records)):
            current_time = int(time.time())
            record = self.prepare_record(current_time)
            host_number += 1

            record['Dimensions'].append(self.prepare_dimension('hostname', f"host{host_number}"))
            record['Dimensions'].append(self.prepare_dimension('region', "us-east-1"))
            record['Dimensions'].append(self.prepare_dimension('az', f"az{int(random.randint(1, 6))}"))
            record['MeasureValues'].append(self.prepare_measure('cpu_utilization', float(random.randint(0, 1000)) / 10.0))
            record['MeasureValues'].append(self.prepare_measure('memory', float(random.randint(0, 1000)) / 10.0))
            record['MeasureValues'].append(self.prepare_measure('network_in', float(random.randint(0, 1000)) / 10.0))
            record['MeasureValues'].append(self.prepare_measure('network_out', float(random.randint(0, 1000)) / 10.0))
            record['MeasureValues'].append(self.prepare_measure('disk_read_ops', float(random.randint(0, 1000)) / 10.0))
            record['MeasureValues'].append(self.prepare_measure('disk_write_iops', float(random.randint(0, 1000)) / 10.0))
            records.append(record)


            # write a full batch of BATCH_SIZE records, pause for INTERVAL seconds,
            # then start a new batch with host numbering reset to host1
            if len(records) == self.BATCH_SIZE:
                total_records += len(records)

                self.write_records(records, common_attributes)

                if self.INTERVAL > 0.0:
                    time.sleep(self.INTERVAL)
                host_number = 0
                records = []

        if len(records) > 0:
            total_records += len(records)
            self.write_records(records, common_attributes)

        total_time = time.time() - launch_time
        rps = total_records / total_time
        print(f'Total Records in thread: {total_records:,} in {rps} rps')

        return total_records


def lambda_handler(event, context):
    lambda_time = time.time()
    pid = 1
    max_threads = event['threads']
    threads = []
    record_counts = []

    for i in range(0, max_threads):
        id = i
        thread = threading.Thread(target=thread_handler, args=(id, event, context))
        thread.start()
        print(
            f'[{pid}] process_record: Started process #{i} with pid={thread} to dump data chunk to timestream')
        time.sleep(0.1)
        threads.append(thread)

    for thread in threads:
        thread.join()

    end_time = time.time()
    total_time = end_time - lambda_time
    total_records = int(event['records'])
    rps = total_records / total_time
    s_lambda_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(lambda_time))
    s_end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print(f'{s_lambda_time} - {s_end_time}')
    print(f'Total Records in lambda: {total_records:,} in {rps} rps')

    return

def thread_handler(pid, event, context):
    generator = Generator()
    region = event['region']
    database = event['database']
    table = event['table']
    threads = int(event['threads'])
    max_records_per_thread = int(event['records']) / threads
    generator.generate_data(pid, region, database, table, max_records_per_thread)
    return

if __name__ == '__main__':
    event = {
        'threads': 1,
        'records': 1000000000,
        'region': 'us-east-2',
        'database': 'devops',
        'table': 'sample_devops'
    }
    context = {}
    lambda_handler(event, context)