From fccdf3d814f9df61e8195a356249c4b10bfcb64b Mon Sep 17 00:00:00 2001 From: Norbert Funke Date: Fri, 17 Nov 2023 16:16:20 -0500 Subject: [PATCH] added CDPK to table creation --- .../sql/last_value_fill_forward/README.md | 4 ++- .../sql/utils/create_batch_load_task.py | 27 ++++++++++++++++--- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/sample_apps/sql/last_value_fill_forward/README.md b/sample_apps/sql/last_value_fill_forward/README.md index b78cc20c..fba7c700 100644 --- a/sample_apps/sql/last_value_fill_forward/README.md +++ b/sample_apps/sql/last_value_fill_forward/README.md @@ -20,7 +20,8 @@ python3 ./create_batch_load_task.py \ object_key_prefix= \ data_file=../last_value_fill_forward/sensor_with_gaps.csv \ database=amazon-timestream-tools \ - table=sensordata + table=sensordata \ + partition_key=gpio ``` | **⚠ Note**: | @@ -38,3 +39,4 @@ Parameter | Description **data_file** | CSV file for this example | `sensor_with_gaps.csv` **database** | Database in region. Database will be created if not exists. | `amazon-timestream-tools` **table** | Table where data is loaded. If this table does not exist, the table will be created | `sensordata` +**partition_key**| Custome defined partition key (CDPK) used for this data example | `gpio` diff --git a/sample_apps/sql/utils/create_batch_load_task.py b/sample_apps/sql/utils/create_batch_load_task.py index 2b13a7be..9f4538cf 100644 --- a/sample_apps/sql/utils/create_batch_load_task.py +++ b/sample_apps/sql/utils/create_batch_load_task.py @@ -71,7 +71,7 @@ def load_mapping(file_name): return None return data -def create_resources(client, region, database_name, table_name, input_bucket_name, input_object_key_prefix, data_file): +def create_resources(client, region, database_name, table_name, input_bucket_name, input_object_key_prefix, data_file, partition_key): # Upload data file s3 = boto3.resource('s3') @@ -103,10 +103,23 @@ def create_resources(client, region, database_name, table_name, input_bucket_nam magnetic_store_write_properties = { 'EnableMagneticStoreWrites': True } + + schema = { + "CompositePartitionKey": [ + { + "EnforcementInRecord": "REQUIRED", + "Name": partition_key, + "Type": "DIMENSION" + } + ] + } + try: client.create_table(DatabaseName=database_name, TableName=table_name, RetentionProperties=retention_properties, - MagneticStoreWriteProperties=magnetic_store_write_properties) + MagneticStoreWriteProperties=magnetic_store_write_properties, + Schema=schema + ) print("Table [%s] successfully created." % table_name) except client.exceptions.ConflictException: print("Table [%s] exists on database [%s]. Skipping table creation" % ( @@ -165,7 +178,15 @@ def create_batch_load_task(client, database_name, table_name, input_bucket_name, print(write_client.meta.config.user_agent) print(write_client._service_model.operation_names) - success = create_resources(write_client, parameters['region'], parameters['database'], parameters['table'], parameters['input_bucket'], parameters['object_key_prefix'], parameters['data_file']) + success = create_resources(write_client, + parameters['region'], + parameters['database'], + parameters['table'], + parameters['input_bucket'], + parameters['object_key_prefix'], + parameters['data_file'], + parameters['partition_key'] + )