diff --git a/integrations/kafka_connector/README.md b/integrations/kafka_connector/README.md index 11db0b29..87d478f9 100644 --- a/integrations/kafka_connector/README.md +++ b/integrations/kafka_connector/README.md @@ -3,10 +3,13 @@ The Timestream Kafka Sink Connector is designed to work with Kafka Connect and to be deployed to a Kafka Connect cluster. The deployed connector polls for events from a source Kafka topic, and ingests them as records to a target Timestream table. -[Amazon Timestream](https://aws.amazon.com/timestream/) is a fast, scalable, and serverless time series database service that makes it straightforward to store and analyze trillions +[Amazon Timestream for LiveAnalytics](https://aws.amazon.com/timestream/) is a fast, scalable, and serverless time series database service that makes it straightforward to store and analyze trillions of events per day for use cases like monitoring hundreds of millions of Internet of Things (IoT) devices, industrial equipment, gaming sessions, streaming video sessions, and more. +With [Amazon Timestream for InfluxDB](https://aws.amazon.com/timestream/), you can easily run open source InfluxDB databases on AWS for time-series applications, +such as real-time alerting and monitoring infrastructure reliability, with millisecond response times. Timestream for InfluxDB provides up to 99.9% availability. + You can securely scale your streaming data platform with hundreds and thousands of Kafka clusters using [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/) (Amazon MSK), a fully managed service to build and run applications to process streaming data, which simplifies the setup, scaling, and management of clusters running Kafka. And [Amazon MSK Connect](https://aws.amazon.com/msk/features/msk-connect/) enables you to deploy, monitor, and automatically scale connectors that move data between your MSK or Kafka clusters and external systems. @@ -28,26 +31,36 @@ As data arrives, an instance of the Timestream Sink Connector for Apache Kafka v The following table lists the complete set of the Timestream Kafka Sink Connector configuration properties -| # | Key |
Description
|
Remarks
| Required | Default | -|----|----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------| -| 1 | connector.class | Specifies the name of the connector class | Must be mentioned as "software.amazon.timestream.TimestreamSinkConnector" | Yes | NONE | -| 2 | tasks.max | The maximum number of active tasks for a sink connector | Non negative number | Yes | NONE | -| 3 | aws.region | The region in which the AWS service resources are provisioned | Example: "us-east-1"; see [here](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) for the list of regions | Yes | NONE | -| 4 | topics | Name of the Kafka topic which needs to be polled for messages | | Yes | NONE | -| 5 | timestream.schema.s3.bucket.name | Name of the Amazon S3 bucket in which the target Timestream table's schema definition is present | | Yes | NONE | -| 6 | timestream.schema.s3.key | S3 object key of the targeted Timestream table schema | | Yes | NONE | -| 7 | timestream.database.name | Name of the Timestream database where the table exists | See [Create a database](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.db.using-console) for details | Yes | NONE | -| 8 | timestream.table.name | Name of the Timestream table where the events will be ingested as records | See [Create a table](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.table.using-console) for details | Yes | NONE | -| 9 | timestream.ingestion.endpoint | Ingestion endpoint for Timestream, in URI format | Example: https://ingest-cell1.timestream.ap-southeast-2.amazonaws.com; see [here](https://docs.aws.amazon.com/timestream/latest/developerguide/VPCEndpoints.html) for details | Yes | NONE | -| 10 | timestream.connections.max | The maximum number of allowed concurrently opened HTTP connections to the Timestream service. | See [Write SDK client](https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write-client.html) for further details | No | 5000 | -| 11 | timestream.connections.timeoutseconds | The time in seconds the AWS SDK will wait for a query request before timing out. Non-positive value disables request timeout. | See [Write SDK client](https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write-client.html) for recommended values | No | 20 | -| 12 | timestream.connections.retries | The maximum number of retry attempts for retryable errors with 5XX error codes in the SDK. The value must be non-negative. | See [Write SDK client](https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write-client.html) for recommended values | No | 10 | -| 13 | timestream.record.batch.size | The maximum number of records in a WriteRecords API request. | | No | 100 | -| 14 | timestream.record.versioning.auto | Enable if upserts are required. 
By default the version is set to 1 | See [WriteRecords](https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html) for further details | No | false | -| 15 | timestream.record.dimension.skip.empty | When a dimension value is not present/ empty, only that dimension would be skipped by default. | If disabled, it would be logged as error and the whole record would be skipped. See [Amazon Timestream concepts](https://docs.aws.amazon.com/timestream/latest/developerguide/concepts.html) for further details | No | true | -| 16 | timestream.record.measure.skip.empty | When a measure value is not present/ empty, only that measure would be skipped by default. | If disabled, it would be logged as error and the whole record would be skipped. See [Amazon Timestream concepts](https://docs.aws.amazon.com/timestream/latest/developerguide/concepts.html) for further details | No | true | - -#### Sample: Connector Configuration +| # | Key |
Description
|
Remarks
| Required | Default | +|----|----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------| +| 1 | connector.class | Specifies the name of the connector class | Must be mentioned as "software.amazon.timestream.TimestreamSinkConnector" | Yes | NONE | +| 2 | tasks.max | The maximum number of active tasks for a sink connector | Non negative number | Yes | NONE | +| 3 | aws.region | The region in which the AWS service resources are provisioned | Example: "us-east-1"; see [here](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) for the list of regions | Yes | NONE | +| 4 | topics | Name of the Kafka topic which needs to be polled for messages | | Yes | NONE | +| 5 | timestream.schema.s3.bucket.name | Name of the Amazon S3 bucket in which the target Timestream table's schema definition is present | | Yes | NONE | +| 6 | timestream.schema.s3.key | S3 object key of the targeted Timestream table schema | | Yes | NONE | +| 7 | timestream.target.liveanalytics | Enable or disable ingestion of data into Timestream for LiveAnalytics | true / false | Yes | false | +| 8 | timestream.target.influxdb | Enable or disable ingestion of data into Timestream for InfluxDB | true / false | Yes | false | +| 9 | timestream.database.name | Name of the Timestream database where the table exists | See [Create a database](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.db.using-console) for details | Yes | NONE | +| 10 | timestream.table.name | Name of the Timestream table where the events will be ingested as records | See [Create a table](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.table.using-console) for details | Yes | NONE | +| 11 | timestream.ingestion.endpoint | Ingestion endpoint for Timestream, in URI format | Example: https://ingest-cell1.timestream.ap-southeast-2.amazonaws.com; see [here](https://docs.aws.amazon.com/timestream/latest/developerguide/VPCEndpoints.html) for details | Yes | NONE | +| 12 | timestream.connections.max | The maximum number of allowed concurrently opened HTTP connections to the Timestream service. | See [Write SDK client](https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write-client.html) for further details | No | 5000 | +| 13 | timestream.connections.timeoutseconds | The time in seconds the AWS SDK will wait for a query request before timing out. Non-positive value disables request timeout. | See [Write SDK client](https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write-client.html) for recommended values | No | 20 | +| 14 | timestream.connections.retries | The maximum number of retry attempts for retryable errors with 5XX error codes in the SDK. The value must be non-negative. | See [Write SDK client](https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write-client.html) for recommended values | No | 10 | +| 15 | timestream.record.batch.size | The maximum number of records in a WriteRecords API request. 
| | No | 100 | +| 16 | timestream.record.versioning.auto | Enable if upserts are required. By default the version is set to 1 | See [WriteRecords](https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html) for further details | No | false | +| 17 | timestream.record.dimension.skip.empty | When a dimension value is not present/ empty, only that dimension would be skipped by default. | If disabled, it would be logged as error and the whole record would be skipped. See [Amazon Timestream concepts](https://docs.aws.amazon.com/timestream/latest/developerguide/concepts.html) for further details | No | true | +| 18 | timestream.record.measure.skip.empty | When a measure value is not present/ empty, only that measure would be skipped by default. | If disabled, it would be logged as error and the whole record would be skipped. See [Amazon Timestream concepts](https://docs.aws.amazon.com/timestream/latest/developerguide/concepts.html) for further details | No | true | +| 19 | timestream.influxdb.bucket | InfluxDB bucket to write data to; the bucket must be created in InfluxDB before writing data. | Required when the InfluxDB target is enabled. An [InfluxDB bucket](https://docs.influxdata.com/influxdb/v2/admin/buckets/) is a named location where time series data is stored locally on the instance. Note that these buckets are unrelated to Amazon S3 buckets. | Yes | NONE | +| 20 | timestream.influxdb.org | Organization of the InfluxDB instance | Required when the InfluxDB target is enabled | Yes | NONE | +| 21 | timestream.influxdb.url | URL of the InfluxDB API endpoint to write data to | Required when the InfluxDB target is enabled | Yes | NONE | +| 22 | timestream.influxdb.token | API token for the InfluxDB instance | Required when the InfluxDB target is enabled | Yes | NONE | + +--- +**NOTE:** +For the connector to write data to Amazon Timestream, either `timestream.target.liveanalytics` or `timestream.target.influxdb` must be set to `true`. +--- +#### Sample: Connector Configuration for Amazon Timestream for LiveAnalytics ```properties aws.region=ap-southeast-2 @@ -56,11 +69,29 @@ tasks.max=2 topics=purchase-history timestream.schema.s3.bucket.name=msk-timestream-ap-southeast-2-plugins-bucket timestream.schema.s3.key=purchase_history.json +timestream.target.liveanalytics=true timestream.database.name=kafkastream timestream.ingestion.endpoint=https://ingest-cell1.timestream.ap-southeast-2.amazonaws.com timestream.table.name=purchase-history ``` +#### Sample: Connector Configuration for Amazon Timestream for InfluxDB + +```properties +aws.region=us-east-1 +connector.class=software.amazon.timestream.TimestreamSinkConnector +tasks.max=2 +topics=purchase-history +timestream.schema.s3.bucket.name=msk-timestream-us-east-1-plugins-bucket +timestream.schema.s3.key=purchase_history.json +timestream.target.influxdb=true +timestream.influxdb.bucket=purchase-history +timestream.influxdb.org=wwso-sa +timestream.influxdb.url=https://<instance-identifier>.us-east-1.timestream-influxdb.amazonaws.com:8086 +timestream.influxdb.token= +``` + + ### Worker Configuration parameters A worker is a Java virtual machine (JVM) process that runs the connector logic. See [Workers](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html) for additional details. 
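The following minimal sketch is not part of the connector itself; it assumes the placeholder endpoint and token shown in the sample configuration above, along with the sample organization and bucket, and uses the same `influxdb-client-java` library that the connector declares in its `pom.xml`. It can help verify the `timestream.influxdb.*` values before deploying the connector to MSK Connect.

```java
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;

import java.time.Instant;

/** Hypothetical smoke test for the timestream.influxdb.* settings; not part of the connector. */
public final class InfluxDbSmokeTest {
    public static void main(String[] args) {
        // Values mirror the sample configuration above; replace the placeholders with real ones.
        String url = "https://<instance-identifier>.us-east-1.timestream-influxdb.amazonaws.com:8086";
        char[] token = "<influxdb-api-token>".toCharArray();
        String org = "wwso-sa";
        String bucket = "purchase-history";

        InfluxDBClient client = InfluxDBClientFactory.create(url, token, org, bucket);
        try {
            WriteApiBlocking writeApi = client.getWriteApiBlocking();
            // Write a single throwaway point into the configured bucket.
            Point point = Point.measurement("connector_smoke_test")
                    .addTag("source", "readme-example")
                    .addField("ok", true)
                    .time(Instant.now(), WritePrecision.MS);
            writeApi.writePoint(point);
            System.out.println("Wrote test point: " + point.toLineProtocol());
        } finally {
            client.close();
        }
    }
}
```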
diff --git a/integrations/kafka_connector/pom.xml b/integrations/kafka_connector/pom.xml index 773fbd4e..1e0a6036 100644 --- a/integrations/kafka_connector/pom.xml +++ b/integrations/kafka_connector/pom.xml @@ -124,6 +124,13 @@ 5.8.0 test + + com.influxdb + influxdb-client-java + + 6.6.0 + + @@ -136,6 +143,11 @@ true true + + + diff --git a/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java b/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java index 7a49ea73..3c467215 100644 --- a/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java +++ b/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java @@ -137,7 +137,40 @@ public static ConfigDef conf() { ConfigDef.Type.STRING, "org.apache.kafka.common.serialization.StringSerializer", ConfigDef.Importance.LOW, - "Serializer class for Value"); + "Serializer class for Value") + /// INFLUXDB + .define(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.HIGH, + "LiveAnalytics Ingestion Enabled") + .define(TimestreamSinkConstants.INFLUXDB_ENABLE, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.HIGH, + "InfluxDB Ingestion Enabled") + .define(TimestreamSinkConstants.INFLUXDB_BUCKET, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + "InfluxDB Target Bucket") + .define(TimestreamSinkConstants.INFLUXDB_URL, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + "InfluxDB API URL") + .define(TimestreamSinkConstants.INFLUXDB_TOKEN, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + "InfluxDB API Token") + .define(TimestreamSinkConstants.INFLUXDB_ORG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + "InfluxDB Organization") + //////////////////////////////// + ; } /** @@ -166,6 +199,69 @@ public Region getAWSRegion() { } } + ////// INFLUXDB Configs + public boolean isLiveAnalyticsEnabled() { + try { + return getBoolean(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE); + } catch (final ConfigException ex) { + final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG, + TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE, ex); + throw new TimestreamSinkConnectorException(error, ex); + } + } + + public boolean isInfluxDBEnabled() { + try { + return getBoolean(TimestreamSinkConstants.INFLUXDB_ENABLE); + } catch (final ConfigException ex) { + final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG, + TimestreamSinkConstants.INFLUXDB_ENABLE, ex); + throw new TimestreamSinkConnectorException(error, ex); + } + } + + public String getInfluxDBBucket() { + try { + return getString(TimestreamSinkConstants.INFLUXDB_BUCKET); + } catch (final ConfigException ex) { + final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG, + TimestreamSinkConstants.INFLUXDB_BUCKET, ex); + throw new TimestreamSinkConnectorException(error, ex); + } + } + + public String getInfluxDBUrl() { + try { + return getString(TimestreamSinkConstants.INFLUXDB_URL); + } catch (final ConfigException ex) { + final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG, + TimestreamSinkConstants.INFLUXDB_URL, ex); + throw new TimestreamSinkConnectorException(error, ex); + } + } + + 
public String getInfluxDBToken() { + try { + return getString(TimestreamSinkConstants.INFLUXDB_TOKEN); + } catch (final ConfigException ex) { + final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG, + TimestreamSinkConstants.INFLUXDB_TOKEN, ex); + throw new TimestreamSinkConnectorException(error, ex); + } + } + public String getInfluxDBOrg() { + try { + return getString(TimestreamSinkConstants.INFLUXDB_ORG); + } catch (final ConfigException ex) { + final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG, + TimestreamSinkConstants.INFLUXDB_ORG, ex); + throw new TimestreamSinkConnectorException(error, ex); + } + } + + + //////////////////////////////////////////////////////// + /** * * @return The maximum number of retry attempts for retryable errors. diff --git a/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConstants.java b/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConstants.java index 256cd2ac..3da2f6d1 100644 --- a/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConstants.java +++ b/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConstants.java @@ -128,4 +128,15 @@ public class TimestreamSinkConstants { */ public static final String CONST_DLQ_CLIENT_ID = "DLQPublisher"; + public static final String LIVE_ANALYTICS_ENABLE = "timestream.target.liveanalytics"; + + public static final String INFLUXDB_ENABLE = "timestream.target.influxdb"; + + public static final String INFLUXDB_BUCKET = "timestream.influxdb.bucket"; + + public static final String INFLUXDB_URL = "timestream.influxdb.url"; + + public static final String INFLUXDB_TOKEN = "timestream.influxdb.token"; + + public static final String INFLUXDB_ORG = "timestream.influxdb.org"; } \ No newline at end of file diff --git a/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java b/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java index d1e96410..d293df18 100644 --- a/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java +++ b/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java @@ -120,4 +120,11 @@ public class TimestreamSinkErrorCodes { */ public static final String INVALID_MEASURE_VALUE ="invalid.measure.value"; + /** + * Error code: no target engine specified (InfluxDB or LiveAnalytics) + */ + public static final String NO_INGESTION_TARGET ="No ingestion engine specified, enable either LiveAnalytics or InfluxDB"; + + //// INFLUX DB + // #TODO fix later to add detailed errors } diff --git a/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java b/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java index 11763a17..67375cf9 100644 --- a/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java +++ b/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java @@ -1,5 +1,6 @@ package software.amazon.timestream.utility; +import com.influxdb.client.WriteApiBlocking; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,6 
+18,16 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +// Influx: +import com.influxdb.annotations.Column; +import com.influxdb.annotations.Measurement; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxTable; + /** * Class that receives the non-empty Kafka messages as {@link SinkRecord} * objects and writes to Timestream table as records @@ -60,6 +71,18 @@ public class TimestreamWriter { */ private final DataModel schemaDefinition; + //// INFLUXDB + private final Boolean liveAnalyticsEnabled; + private final Boolean influxDBEnabled; + private final String influxDBBucket; + private final String influxDBUrl; + private final String influxDBToken; + private final String influxDBOrg; + private static InfluxDBClient influxDBClient = null; + private static WriteApiBlocking influxWriteApi = null; + + //////////////////////// + /** * * @param schemaDefinition table schema @@ -73,6 +96,46 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo this.skipDimension = config.isSkipEmptyDimensions(); this.skipMeasure = config.isSkipEmptyMeasures(); this.schemaDefinition = schemaDefinition; + + // InfluxDB + this.liveAnalyticsEnabled = config.isLiveAnalyticsEnabled(); + this.influxDBEnabled = config.isInfluxDBEnabled(); + this.influxDBBucket = config.getInfluxDBBucket(); + this.influxDBUrl = config.getInfluxDBUrl(); + this.influxDBToken = config.getInfluxDBToken(); + this.influxDBOrg = config.getInfluxDBOrg(); + + if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) { + LOGGER.error("ERROR::TimeStreamWriter:: initialization failed on : [{}]", TimestreamSinkErrorCodes.NO_INGESTION_TARGET); + + } + + if (this.influxDBEnabled) { + influxDBClient = getInfluxDBClient( + this.influxDBUrl, this.influxDBToken, this.influxDBBucket, this.influxDBOrg + ); + if (influxDBClient != null) { + LOGGER.info("INFO::TimeStreamWriter:: influxDB client successfully created: [{}] [{}] [{}]", + this.influxDBUrl, this.influxDBBucket, influxDBOrg + ); + influxWriteApi = getInfluxDBWriteApi(influxDBClient); + if (influxWriteApi == null) { + LOGGER.error("ERROR::TimeStreamWriter:: influxDB write API initialization failed: [{}] [{}] [{}]", + this.influxDBUrl, this.influxDBBucket, influxDBOrg + ); + } + else { + LOGGER.info("INFO::TimeStreamWriter:: influxDB write API successfully initialized: [{}] [{}] [{}]", + this.influxDBUrl, this.influxDBBucket, influxDBOrg + ); + } + } + else { + LOGGER.error("ERROR::TimeStreamWriter:: getInfluxDBClient failed on : [{}]", this.influxDBUrl); + } + } + + ///////////////////////// } /** @@ -81,7 +144,6 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo * @param sinkRecords List of incoming records from the source Kafka topic */ public List<RejectedRecord> writeRecords(final AWSServiceClientFactory clientFactory, final Collection<SinkRecord> sinkRecords) { - LOGGER.trace("Begin::TimeStreamWriter::writeRecords"); final List<RejectedRecord> rejectedRecords = new ArrayList<>(); final List<Record> records = getTimestreamRecordsFromSinkRecords(sinkRecords, rejectedRecords); @@ -89,26 +151,55 @@ public List writeRecords(final AWSServiceClientFactory clientFac final int batchSize = records.size() / TimestreamSinkConstants.DEFAULT_BATCHSIZE + 1; List<Record> batchRecords = null; for (int currentBatch = 0; currentBatch < batchSize; currentBatch ++) { - 
try { + if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) { + // no target specified, cannot write, send records to DLQ batchRecords = getBatchRecords(records, currentBatch); - if (batchRecords != null && !batchRecords.isEmpty()) { - final WriteRecordsRequest writeRequest = WriteRecordsRequest.builder() - .databaseName(databaseName) - .tableName(tableName) - .records(batchRecords) - .build(); - final WriteRecordsResponse writeResponse = clientFactory.getTimestreamClient().writeRecords(writeRequest); - LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: batch size [{}], status [{}] ", batchRecords.size(), writeResponse.sdkHttpResponse().statusCode()); - } else { - LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: Batch ingestion is complete for the records of size [{}] ", records.size()); + for (Record record : batchRecords) { + RejectedRecord rejectedRecord = new RejectedRecord(record,TimestreamSinkErrorCodes.NO_INGESTION_TARGET); + rejectedRecords.add(rejectedRecord); } - } catch (RejectedRecordsException e) { - LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Few records have been rejected in the batch [{}] , due to [{}]", currentBatch, e.getLocalizedMessage()); - if (e.hasRejectedRecords()) { - rejectedRecords.addAll(getRejectedTimestreamRecords(e.rejectedRecords(), batchRecords)); + LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Records have been rejected in the batch [{}] , due to [{}]", currentBatch, TimestreamSinkErrorCodes.NO_INGESTION_TARGET); + } + else { + try { + batchRecords = getBatchRecords(records, currentBatch); + if (batchRecords != null && !batchRecords.isEmpty()) { + if (this.liveAnalyticsEnabled) { + + final WriteRecordsRequest writeRequest = WriteRecordsRequest.builder() + .databaseName(databaseName) + .tableName(tableName) + .records(batchRecords) + .build(); + final WriteRecordsResponse writeResponse = clientFactory.getTimestreamClient().writeRecords(writeRequest); + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: batch size [{}], status [{}] ", batchRecords.size(), writeResponse.sdkHttpResponse().statusCode()); + } else { + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: LiveAnalytics disabled"); + } + if (this.influxDBEnabled && influxWriteApi != null) { + LOGGER.info("INFO::TimeStreamWriter::writeRecords: InfluxDB writing {} records", batchRecords.size()); + + final ArrayList pointList = convertLiveAnalyticsRecord(batchRecords); + // enhance here + influxWriteApi.writePoints(pointList); + + /* // writing one record at time only + convertAndWriteLiveAnalyticsRecord(batchRecords); + */ + } else { + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: InfluxDB disabled"); + } + } else { + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: Batch ingestion is complete for the records of size [{}] ", records.size()); + } + } catch (RejectedRecordsException e) { + LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Few records have been rejected in the batch [{}] , due to [{}]", currentBatch, e.getLocalizedMessage()); + if (e.hasRejectedRecords()) { + rejectedRecords.addAll(getRejectedTimestreamRecords(e.rejectedRecords(), batchRecords)); + } + } catch (SdkException e) { + LOGGER.error("ERROR::TimeStreamWriter::writeRecords", e); } - } catch (SdkException e) { - LOGGER.error("ERROR::TimeStreamWriter::writeRecords", e); } } } @@ -373,4 +464,103 @@ private List getBatchRecords(final List allRecords, final int cu } return batch; } + + // InfluxDB enhancements + + public static InfluxDBClient getInfluxDBClient(String url, String token, 
String bucket, String org) { + if (influxDBClient != null) return influxDBClient; + + InfluxDBClient client = InfluxDBClientFactory.create( + url, + token.toCharArray(), org, bucket); + + influxDBClient = client; + + return influxDBClient; + } + + public static WriteApiBlocking getInfluxDBWriteApi(InfluxDBClient client) { + if (influxWriteApi == null) { + influxWriteApi = client.getWriteApiBlocking(); + } + return influxWriteApi; + } + + private ArrayList<Point> convertLiveAnalyticsRecord(List<Record> recordList){ + ArrayList<Point> pointList = new ArrayList<>(); + for (final Record record : recordList) { + // LOGGER.trace("Sink Record: {} ", record); + + List<Dimension> dimensions = record.dimensions(); + List<MeasureValue> measureValues = record.measureValues(); + String measureName = record.measureName(); + Long time = Long.parseLong(record.time()); + + Point point = Point + .measurement(measureName) + .time(time, WritePrecision.MS); + + for (Dimension dimension : dimensions) { + String key = dimension.name(); + String value = dimension.value(); + point = point.addTag(key, value); + } + + for (MeasureValue measureValue : measureValues) { + String key = measureValue.name(); + String value = measureValue.value(); + MeasureValueType type = measureValue.type(); + + switch (type) { + case TIMESTAMP: // not existent in InfluxDB, use Long for now + case BIGINT: + Long value_l = Long.parseLong(value); + point = point.addField(key, value_l); + break; + case DOUBLE: + Double value_d = Double.parseDouble(value); + point = point.addField(key, value_d); + break; + case BOOLEAN: + Boolean value_b = Boolean.parseBoolean(value); + point = point.addField(key, value_b); + break; + case VARCHAR: + point = point.addField(key, value); + break; + } + } + + LOGGER.info("Complete::TimeStreamWriter::convertLiveAnalyticsRecord: line protocol [{}]", point.toLineProtocol()); + + pointList.add(point); + } + return pointList; + } + + private void convertAndWriteLiveAnalyticsRecord(List<Record> recordList){ + // Alternative path: converts and writes one point at a time instead of batching via writePoints + final WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking(); + for (final Point point : convertLiveAnalyticsRecord(recordList)) { + writeApi.writePoint(influxDBBucket, influxDBOrg, point); + } + } + + ////////////////// + } \ No newline at end of file
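For reference, a standalone sketch of the dimension-to-tag and measure-to-field mapping that `convertLiveAnalyticsRecord` performs, using the AWS SDK v2 Timestream Write model and the InfluxDB `Point` API. The class name and sample values are illustrative only, and the exact line-protocol output may vary with the client version.

```java
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;

import software.amazon.awssdk.services.timestreamwrite.model.Dimension;
import software.amazon.awssdk.services.timestreamwrite.model.MeasureValue;
import software.amazon.awssdk.services.timestreamwrite.model.MeasureValueType;
import software.amazon.awssdk.services.timestreamwrite.model.Record;

/** Illustrative only: mirrors the Record-to-Point mapping used by the connector's InfluxDB path. */
public final class RecordToPointExample {
    public static void main(String[] args) {
        // A LiveAnalytics-style record, as the connector would build it from a Kafka message.
        Record record = Record.builder()
                .measureName("purchase")
                .time("1714180752000") // epoch milliseconds as a string
                .dimensions(Dimension.builder().name("host").value("host1").build())
                .measureValues(MeasureValue.builder()
                        .name("used_percent").value("23.43234543")
                        .type(MeasureValueType.DOUBLE).build())
                .build();

        // Dimensions become InfluxDB tags; measure values become fields.
        Point point = Point.measurement(record.measureName())
                .time(Long.parseLong(record.time()), WritePrecision.MS);
        for (Dimension dimension : record.dimensions()) {
            point.addTag(dimension.name(), dimension.value());
        }
        for (MeasureValue measureValue : record.measureValues()) {
            point.addField(measureValue.name(), Double.parseDouble(measureValue.value()));
        }

        // Roughly: purchase,host=host1 used_percent=23.43234543 1714180752000
        System.out.println(point.toLineProtocol());
    }
}
```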