Merge pull request #190 from aayushwadhwa/eventbridge-pipes-integration
Add sample for eventbridge pipes integration
sethusrinivasan authored Aug 7, 2024
2 parents 236113e + 1d6dcb3 commit fc1ec5a
Showing 3 changed files with 284 additions and 0 deletions.
integrations/eventbridge_pipes/README.md (70 additions)
# Amazon Timestream integration with AWS EventBridge Pipes

This sample demonstrates how to ingest data into Amazon Timestream from Amazon Kinesis Data Streams via Amazon EventBridge Pipes.

Learn more about this integration and the steps to map Kinesis records to the Timestream data model in [this blog post](https://aws.amazon.com/blogs/database/build-time-series-applications-faster-with-amazon-eventbridge-pipes-and-timestream-for-liveanalytics/).

## Requirements

* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) installed
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) (AWS SAM) installed

## Deployment Instructions

### Via CLI

1. Create a new directory, navigate to that directory in a terminal, and clone the GitHub repository:
   ```bash
   git clone https://github.com/awslabs/amazon-timestream-tools.git
   ```
1. Change into the sample directory:
   ```bash
   cd amazon-timestream-tools/integrations/eventbridge_pipes/
   ```
1. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yaml file:
   ```bash
   sam deploy --guided
   ```
1. During the prompts:
* Enter a stack name
* Enter the desired AWS Region
* Allow SAM CLI to create IAM roles with the required permissions.

### Via Console

1. Create a new directory, navigate to that directory in a terminal, and clone the GitHub repository:
   ```bash
   git clone https://github.com/awslabs/amazon-timestream-tools.git
   ```
1. Open the AWS CloudFormation console at https://console.aws.amazon.com/cloudformation
1. On the Stacks page, choose Create stack at the top right, and then choose With new resources.
1. In the Prepare template section, select `Choose an existing template`, and then select `Upload a template file` under Specify template.
1. Upload the template.yaml file available in integrations/eventbridge_pipes/ of the cloned Git repository.
1. Enter a stack name and, if desired, update the names of the resources created by the template.
1. Make any other changes as necessary, then create the stack.

## How it works

The template creates a Timestream database and table, a Kinesis data stream, an SQS queue (used as a dead-letter queue), a CloudWatch log group, and an IAM role, and connects them together with a new EventBridge Pipe.
When records are sent to the Kinesis stream, the pipe converts each record into a valid Timestream record and ingests it into the Timestream table.
See [this documentation](https://docs.aws.amazon.com/timestream/latest/developerguide/Kinesis.html#Kinesis-via-pipes) for more information.
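To make the mapping concrete, here is an illustrative Python sketch of the transformation the pipe applies, mirroring the JSONPath mappings in template.yaml (`$.data.dimension_1`, `$.data.measure_1`, `$.data.time`, `$.data.version`). This is not the pipe's actual implementation; the payload below is a hypothetical example matching the shape command.sh sends.

```python
import base64
import json

# Illustrative sketch (not the pipe's actual implementation): build the
# Timestream record that the pipe's DimensionMappings / SingleMeasureMappings
# would produce from one base64-encoded Kinesis record.
def to_timestream_record(kinesis_data_b64: str) -> dict:
    data = json.loads(base64.b64decode(kinesis_data_b64))
    return {
        "Dimensions": [{"Name": "dimension_1", "Value": data["dimension_1"]}],
        "MeasureName": "measure_1",
        "MeasureValue": data["measure_1"],  # typed as DOUBLE by the pipe
        "Time": data["time"],               # format: yyyy-MM-dd HH:mm:ss.SSS
        "Version": int(data["version"]),
    }

# Hypothetical payload, same shape as the record command.sh sends.
payload = {"dimension_1": "dimension1", "measure_1": "1.0",
           "time": "2024-08-07 12:00:00.000", "version": "1"}
encoded = base64.b64encode(json.dumps(payload).encode()).decode()
record = to_timestream_record(encoded)
```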

## Testing

Send a sample record to the Kinesis stream with the included script:

```bash
sh command.sh <Kinesis Stream Name> <Region>
```
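After sending a record, ingestion can be verified by querying the table. A minimal sketch, assuming boto3 is installed, AWS credentials are configured, and the default resource names from template.yaml are in use:

```python
# Build and (optionally) run a verification query against the Timestream table.
# Database/table names below are the template defaults; adjust if you changed them.
def build_query(database: str = "sampledatabase", table: str = "sampletable",
                limit: int = 10) -> str:
    return (f'SELECT * FROM "{database}"."{table}" '
            f'ORDER BY time DESC LIMIT {limit}')

def run_query(region: str) -> dict:
    import boto3  # requires credentials with Timestream query permissions
    client = boto3.client("timestream-query", region_name=region)
    return client.query(QueryString=build_query())
```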

## Cleanup

1. Delete the stack:
   ```bash
   aws cloudformation delete-stack --stack-name STACK_NAME
   ```
integrations/eventbridge_pipes/command.sh (4 additions)

#!/bin/bash
# Send one sample record to the Kinesis stream named in $1, in region $2.
# Note: $RANDOM requires a shell that supports it, such as bash.
DATE_TIME=$(date -u '+%Y-%m-%d %H:%M:%S.000')
DATA=$(echo "{\"dimension_1\": \"dimension1\", \"measure_1\": \"1.0\", \"time\": \"${DATE_TIME}\", \"version\": \"1\" }" | base64)
aws kinesis put-record --stream-name "$1" --data "$DATA" --partition-key "$RANDOM" --region "$2"
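The same test record can be sent from Python. A hedged sketch equivalent to command.sh, assuming boto3 is installed and credentials are configured (boto3 base64-encodes the data blob on the wire, so no manual base64 step is needed):

```python
import json
import random
from datetime import datetime, timezone

# Build the same JSON payload that command.sh sends.
def build_payload(now=None):
    now = now or datetime.now(timezone.utc)
    return {
        "dimension_1": "dimension1",
        "measure_1": "1.0",
        "time": now.strftime("%Y-%m-%d %H:%M:%S.000"),
        "version": "1",
    }

def put_record(stream_name: str, region: str) -> None:
    import boto3  # requires credentials with kinesis:PutRecord permissions
    boto3.client("kinesis", region_name=region).put_record(
        StreamName=stream_name,
        Data=json.dumps(build_payload()).encode(),
        PartitionKey=str(random.randint(0, 32767)),
    )
```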
integrations/eventbridge_pipes/template.yaml (210 additions)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Template to connect Kinesis streams to Timestream

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Configuration for Timestream and EventBridge Pipes"
        Parameters:
          - KinesisStreamName
          - CWGroupName
          - DLQName
          - TimestreamDatabaseName
          - TimestreamTableName
          - PipeName

Parameters:
  KinesisStreamName:
    Description: A name to identify the Kinesis stream.
    Type: String
    Default: samplekinesisstream
    MinLength: 1
    MaxLength: 128
  CWGroupName:
    Description: A name for the CloudWatch log group.
    Type: String
    Default: sampleloggroup
    MinLength: 1
    MaxLength: 128
  DLQName:
    Description: The name of the new dead-letter queue.
    Type: String
    Default: sampledlq
    MinLength: 1
    MaxLength: 64
  TimestreamDatabaseName:
    Description: The name of the database in Timestream.
    Type: String
    Default: sampledatabase
    MinLength: 1
    MaxLength: 64
  TimestreamTableName:
    Description: The name of the table in Timestream.
    Type: String
    Default: sampletable
    MinLength: 1
    MaxLength: 64
  PipeName:
    Description: The name of the EventBridge Pipe.
    Type: String
    Default: samplepipe
    MinLength: 1
    MaxLength: 64

Resources:
  # Kinesis data stream (pipe source)
  Source:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Ref KinesisStreamName
      RetentionPeriodHours: 168
      ShardCount: 10
  # Dead-letter queue for the source
  SourceQueueDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Ref DLQName
  # Timestream database and table (pipe target)
  TimestreamTargetDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: !Ref TimestreamDatabaseName
  TimestreamTargetTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TimestreamTargetDatabase
      TableName: !Ref TimestreamTableName
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: "168"
        MagneticStoreRetentionPeriodInDays: "7"
  # CloudWatch log group for debugging
  CloudwatchLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Ref CWGroupName
  # IAM role assumed by the pipe
  PipeRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - pipes.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: SourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:DescribeStreamSummary
                  - kinesis:GetRecords
                  - kinesis:GetShardIterator
                  - kinesis:ListStreams
                  - kinesis:ListShards
                Resource: !GetAtt Source.Arn
        - PolicyName: TargetPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - timestream:WriteRecords
                Resource: !GetAtt TimestreamTargetTable.Arn
        - PolicyName: DescribeEndpointsPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - timestream:DescribeEndpoints
                Resource: '*'
        - PolicyName: SourceQueueDLQPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt SourceQueueDLQ.Arn
        - PolicyName: CloudwatchLogPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource: !GetAtt CloudwatchLogGroup.Arn
  # Pipe connecting Kinesis to Timestream
  TimestreamPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: !Ref PipeName
      RoleArn: !GetAtt PipeRole.Arn
      Source: !GetAtt Source.Arn
      SourceParameters:
        KinesisStreamParameters:
          BatchSize: 10
          DeadLetterConfig:
            Arn: !GetAtt SourceQueueDLQ.Arn
          MaximumBatchingWindowInSeconds: 5
          MaximumRecordAgeInSeconds: 300
          MaximumRetryAttempts: 10
          StartingPosition: LATEST
      Target: !GetAtt TimestreamTargetTable.Arn
      TargetParameters:
        TimestreamParameters:
          DimensionMappings:
            - DimensionName: dimension_1
              DimensionValue: $.data.dimension_1
              DimensionValueType: VARCHAR
          SingleMeasureMappings:
            - MeasureName: measure_1
              MeasureValue: $.data.measure_1
              MeasureValueType: DOUBLE
          TimeFieldType: TIMESTAMP_FORMAT
          TimeValue: $.data.time
          TimestampFormat: yyyy-MM-dd HH:mm:ss.SSS
          VersionValue: $.data.version
      LogConfiguration:
        CloudwatchLogsLogDestination:
          LogGroupArn: !GetAtt CloudwatchLogGroup.Arn
        Level: TRACE
        IncludeExecutionData:
          - ALL

Outputs:
  Kinesis:
    Description: Source stream used to send data to Timestream
    Value: !Ref Source
  DLQ:
    Description: SQS queue used as the dead-letter queue
    Value: !Ref SourceQueueDLQ
  Database:
    Description: Database created in Timestream
    Value: !Ref TimestreamTargetDatabase
  Table:
    Description: Table created in Timestream for storing the ingested records
    Value: !Ref TimestreamTargetTable
  CWLogs:
    Description: CloudWatch log group storing execution logs from the pipe
    Value: !Ref CloudwatchLogGroup
  Role:
    Description: IAM role used for executing the pipe
    Value: !Ref PipeRole
  Pipe:
    Description: EventBridge Pipe connecting Kinesis to Timestream
    Value: !Ref TimestreamPipe