From cbdd70270e382323f1f44f0bf4413b5282d92445 Mon Sep 17 00:00:00 2001 From: Aayush Wadhwa Date: Wed, 12 Jun 2024 19:55:28 -0700 Subject: [PATCH 1/2] Add sample for eventbridge pipes integration --- integrations/eventbridge_pipes/README.md | 51 ++++++++ integrations/eventbridge_pipes/command.sh | 4 + integrations/eventbridge_pipes/template.yaml | 131 +++++++++++++++++++ 3 files changed, 186 insertions(+) create mode 100644 integrations/eventbridge_pipes/README.md create mode 100644 integrations/eventbridge_pipes/command.sh create mode 100644 integrations/eventbridge_pipes/template.yaml diff --git a/integrations/eventbridge_pipes/README.md b/integrations/eventbridge_pipes/README.md new file mode 100644 index 00000000..dd517934 --- /dev/null +++ b/integrations/eventbridge_pipes/README.md @@ -0,0 +1,51 @@ +# Amazon Timestream integration with AWS EventBridge Pipes + +This sample demostrates the process of ingesting data into Amazon Timestream from Kinesis Streams via EventBridge Pipes. + +Learn more about this integration and steps to map Kinesis record to Timestream data model from [this blog post](https://aws.amazon.com/blogs/database/build-time-series-applications-faster-with-amazon-eventbridge-pipes-and-timestream-for-liveanalytics/) + +## Requirements + +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) (AWS SAM) installed + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/awslabs/amazon-timestream-tools.git + ``` +1. Change directory: + ``` + cd integrations/eventbridge_pipes/ + ``` +1. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file: + ``` + sam deploy --guided + ``` +1. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Allow SAM CLI to create IAM roles with the required permissions. + +## How it works + +The template will create a Timestream table, Kinesis Stream, SQS Queue, CW log group and an IAM Role, and connect it together with a new EventBridge Pipe. + +When records are sent to a Kinesis stream, the Pipe will convert into a valid Timestream record and ingest it to the Timestream table. + +Please follow [this documentation](https://docs.aws.amazon.com/timestream/latest/developerguide/Kinesis.html#Kinesis-via-pipes) for more information. + +## Testing + +``` +sh command.sh +``` + +## Cleanup + +1. Delete the stack + ```bash + aws cloudformation delete-stack --stack-name STACK_NAME + ``` \ No newline at end of file diff --git a/integrations/eventbridge_pipes/command.sh b/integrations/eventbridge_pipes/command.sh new file mode 100644 index 00000000..d387ba1b --- /dev/null +++ b/integrations/eventbridge_pipes/command.sh @@ -0,0 +1,4 @@ + +DATE_TIME=$(date -u '+%Y-%m-%d %H:%M:%S.000') +DATA=$(echo "{\"dimension_1\": \"dimension1\", \"measure_1\": \"1.0\", \"time\": \"${DATE_TIME}\", \"version\": \"1\" }" | base64) +aws kinesis put-record --stream-name "$1" --data "$DATA" --partition-key "$RANDOM" --region "$2" \ No newline at end of file diff --git a/integrations/eventbridge_pipes/template.yaml b/integrations/eventbridge_pipes/template.yaml new file mode 100644 index 00000000..5f0aee8b --- /dev/null +++ b/integrations/eventbridge_pipes/template.yaml @@ -0,0 +1,131 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Template to connect Kinesis streams to Timestream + +Resources: + # Kinesis Data Stream + Source: + Type: AWS::Kinesis::Stream + Properties: + Name: !Sub ${AWS::StackName}-source + RetentionPeriodHours: 168 + ShardCount: 10 + # DLQ for source + SourceQueueDLQ: + Type: AWS::SQS::Queue + Properties: + QueueName: !Sub ${AWS::StackName}-dlq + # Target + TimestreamTargetDatabase: + Type: AWS::Timestream::Database + Properties: + DatabaseName: !Sub ${AWS::StackName}-db + TimestreamTargetTable: + Type: AWS::Timestream::Table + Properties: + DatabaseName: !Ref TimestreamTargetDatabase + TableName: !Sub ${AWS::StackName}-tbl + RetentionProperties: + MemoryStoreRetentionPeriodInHours: "168" + MagneticStoreRetentionPeriodInDays: "7" + # Cloudwatch log group for debugging + CloudwatchLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub ${AWS::StackName}-pipe-logs + # IAM Role for Pipe + PipeRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - pipes.amazonaws.com + Action: + - sts:AssumeRole + Policies: + - PolicyName: SourcePolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - kinesis:DescribeStream + - kinesis:DescribeStreamSummary + - kinesis:GetRecords + - kinesis:GetShardIterator + - kinesis:ListStreams + - kinesis:ListShards + Resource: !GetAtt Source.Arn + - PolicyName: TargetPolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - timestream:WriteRecords + Resource: !GetAtt TimestreamTargetTable.Arn + - PolicyName: DescribeEndpointsPolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - timestream:DescribeEndpoints + Resource: '*' + - PolicyName: SourceQueueDLQPolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - sqs:SendMessage + Resource: !GetAtt SourceQueueDLQ.Arn + - PolicyName: CloudwatchLogPolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - logs:PutLogEvents + Resource: !GetAtt CloudwatchLogGroup.Arn + # Pipe connecting Kinesis to Timestream + TimestreamPipe: + Type: AWS::Pipes::Pipe + Properties: + Name: !Sub ${AWS::StackName}-pipe + RoleArn: !GetAtt PipeRole.Arn + Source: !GetAtt Source.Arn + SourceParameters: + KinesisStreamParameters: + BatchSize: 10 + DeadLetterConfig: + Arn: !GetAtt SourceQueueDLQ.Arn + MaximumBatchingWindowInSeconds: 5 + MaximumRecordAgeInSeconds: 300 + MaximumRetryAttempts: 10 + StartingPosition: LATEST + Target: !GetAtt TimestreamTargetTable.Arn + TargetParameters: + TimestreamParameters: + DimensionMappings: + - DimensionName: dimension_1 + DimensionValue: $.data.dimension_1 + DimensionValueType: VARCHAR + SingleMeasureMappings: + - MeasureName: measure_1 + MeasureValue: $.data.measure_1 + MeasureValueType: DOUBLE + TimeFieldType: TIMESTAMP_FORMAT + TimeValue: $.data.time + TimestampFormat: yyyy-MM-dd HH:mm:ss.SSS + VersionValue: $.data.version + LogConfiguration: + CloudwatchLogsLogDestination: + LogGroupArn: !GetAtt CloudwatchLogGroup.Arn + Level: TRACE + IncludeExecutionData: + - ALL \ No newline at end of file From 1d6dcb31cf4d78f95116da4c3d39f77a773fe0c3 Mon Sep 17 00:00:00 2001 From: Aayush Wadhwa Date: Mon, 5 Aug 2024 19:56:49 -0700 Subject: [PATCH 2/2] Incorporate suggested changes in PR --- integrations/eventbridge_pipes/README.md | 19 ++++ integrations/eventbridge_pipes/template.yaml | 93 ++++++++++++++++++-- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/integrations/eventbridge_pipes/README.md b/integrations/eventbridge_pipes/README.md index dd517934..f72fcc72 100644 --- a/integrations/eventbridge_pipes/README.md +++ b/integrations/eventbridge_pipes/README.md @@ -12,6 +12,8 @@ Learn more about this integration and steps to map Kinesis record to Timestream ## Deployment Instructions +### Via CLI + 1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: ``` git clone https://github.com/awslabs/amazon-timestream-tools.git @@ -29,6 +31,23 @@ Learn more about this integration and steps to map Kinesis record to Timestream * Enter the desired AWS Region * Allow SAM CLI to create IAM roles with the required permissions. +### Via Console +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/awslabs/amazon-timestream-tools.git + ``` +1. Open the AWS CloudFormation console at https://console.aws.amazon.com/cloudformation + +1. On the Stacks page, choose Create stack at top right, and then choose With new resources + +1. In Prepare template section, select `Choose an existing template` and then select `Upload a template file` from specify template. + +1. Upload the template.yaml file available in integrations/eventbridge_pipes/ of the clone git repository + +1. Add the name of the stack and update the names of resources created by the template. + +1. Make any other changes, if necessary and create the stack + ## How it works The template will create a Timestream table, Kinesis Stream, SQS Queue, CW log group and an IAM Role, and connect it together with a new EventBridge Pipe. diff --git a/integrations/eventbridge_pipes/template.yaml b/integrations/eventbridge_pipes/template.yaml index 5f0aee8b..92168a69 100644 --- a/integrations/eventbridge_pipes/template.yaml +++ b/integrations/eventbridge_pipes/template.yaml @@ -2,29 +2,82 @@ AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Template to connect Kinesis streams to Timestream + +Metadata: + AWS::CloudFormation::Interface: + ParameterGroups: + - + Label: + default: "Configuration for Timeatream and EventBridge Pipes" + Parameters: + - KinesisStreamName + - CWGroupName + - DLQName + - TimestreamDatabaseName + - TimestreamTableName + - PipeName + +Parameters: + KinesisStreamName: + Description: A name to identify the Kinesis stream. + Type: String + Default: samplekinesisstream + MinLength: 1 + MaxLength: 128 + CWGroupName: + Description: A name for the cloudwatch log group + Type: String + Default: sampleloggroup + MinLength: 1 + MaxLength: 128 + DLQName: + Description: The name of the new dead-letter queue + Type: String + Default: sampledlq + MinLength: 1 + MaxLength: 64 + TimestreamDatabaseName: + Description: The name of the database in Timestream + Type: String + Default: sampledatabase + MinLength: 1 + MaxLength: 64 + TimestreamTableName: + Description: The name of the table in Timestream + Type: String + Default: sampletable + MinLength: 1 + MaxLength: 64 + PipeName: + Description: The name of the EventBridge Pipe + Type: String + Default: samplepipe + MinLength: 1 + MaxLength: 64 + Resources: # Kinesis Data Stream Source: Type: AWS::Kinesis::Stream Properties: - Name: !Sub ${AWS::StackName}-source + Name: !Ref KinesisStreamName RetentionPeriodHours: 168 ShardCount: 10 # DLQ for source SourceQueueDLQ: Type: AWS::SQS::Queue Properties: - QueueName: !Sub ${AWS::StackName}-dlq + QueueName: !Ref DLQName # Target TimestreamTargetDatabase: Type: AWS::Timestream::Database Properties: - DatabaseName: !Sub ${AWS::StackName}-db + DatabaseName: !Ref TimestreamDatabaseName TimestreamTargetTable: Type: AWS::Timestream::Table Properties: DatabaseName: !Ref TimestreamTargetDatabase - TableName: !Sub ${AWS::StackName}-tbl + TableName: !Ref TimestreamTableName RetentionProperties: MemoryStoreRetentionPeriodInHours: "168" MagneticStoreRetentionPeriodInDays: "7" @@ -32,7 +85,7 @@ Resources: CloudwatchLogGroup: Type: AWS::Logs::LogGroup Properties: - LogGroupName: !Sub ${AWS::StackName}-pipe-logs + LogGroupName: !Ref CWGroupName # IAM Role for Pipe PipeRole: Type: AWS::IAM::Role @@ -96,7 +149,7 @@ Resources: TimestreamPipe: Type: AWS::Pipes::Pipe Properties: - Name: !Sub ${AWS::StackName}-pipe + Name: !Ref PipeName RoleArn: !GetAtt PipeRole.Arn Source: !GetAtt Source.Arn SourceParameters: @@ -128,4 +181,30 @@ Resources: LogGroupArn: !GetAtt CloudwatchLogGroup.Arn Level: TRACE IncludeExecutionData: - - ALL \ No newline at end of file + - ALL +Outputs: + Kinesis: + Description: Source stream to send data to Timestream + Value: !Ref Source + DLQ: + Description: SQS Queue used as Dead letter queue + Value: !Ref SourceQueueDLQ + Database: + Description: Database created in Timestream + Value: !Ref TimestreamTargetDatabase + Table: + Description: Table created in Timestream for storing the ingested records + Value: !Ref TimestreamTargetTable + CWLogs: + Description: Cloudwatch log group to store execution logs from pipes + Value: !Ref CloudwatchLogGroup + Role: + Description: IAM Role used for executing the pipe + Value: !Ref PipeRole + Pipe: + Description: EventBrigde Pipe connecting Kinesis to Timestream + Value: !Ref TimestreamPipe + + + +