diff --git a/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java b/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java index 14457a6e..3c467215 100644 --- a/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java +++ b/integrations/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnectorConfig.java @@ -141,7 +141,7 @@ public static ConfigDef conf() { /// INFLUXDB .define(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE, ConfigDef.Type.BOOLEAN, - true, + false, ConfigDef.Importance.HIGH, "LiveAnalytics Ingestion Enabled") .define(TimestreamSinkConstants.INFLUXDB_ENABLE, diff --git a/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java b/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java index eea20fd7..d293df18 100644 --- a/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java +++ b/integrations/kafka_connector/src/main/java/software/amazon/timestream/exception/TimestreamSinkErrorCodes.java @@ -120,6 +120,11 @@ public class TimestreamSinkErrorCodes { */ public static final String INVALID_MEASURE_VALUE ="invalid.measure.value"; + /** + * Error code: no target engine specified (InfluxDB or LiveAnalytics) + */ + public static final String NO_INGESTION_TARGET ="No ingestion engine specified, enable either LiveAnalytics or InfluxDB"; + //// INFLUX DB // #TODO fix later to add detailed errors } diff --git a/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java b/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java index 8bf7b15e..5356d024 100644 --- a/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java +++ b/integrations/kafka_connector/src/main/java/software/amazon/timestream/utility/TimestreamWriter.java @@ -125,6 +125,11 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo } } + if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) { + LOGGER.error("ERROR::TimeStreamWriter:: initialization failed on : [{}]", TimestreamSinkErrorCodes.NO_INGESTION_TARGET); + + } + ///////////////////////// } @@ -144,47 +149,55 @@ public List writeRecords(final AWSServiceClientFactory clientFac final int batchSize = records.size() / TimestreamSinkConstants.DEFAULT_BATCHSIZE + 1; List batchRecords = null; for (int currentBatch = 0; currentBatch < batchSize; currentBatch ++) { - try { + if (!writeToInfluxDB && !writeToLiveAnalytics) { + // no target specified, cannot write, send records to DLQ batchRecords = getBatchRecords(records, currentBatch); - if (batchRecords != null && !batchRecords.isEmpty()) { - if (writeToLiveAnalytics) { - - final WriteRecordsRequest writeRequest = WriteRecordsRequest.builder() - .databaseName(databaseName) - .tableName(tableName) - .records(batchRecords) - .build(); - final WriteRecordsResponse writeResponse = clientFactory.getTimestreamClient().writeRecords(writeRequest); - LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: batch size [{}], status [{}] ", batchRecords.size(), writeResponse.sdkHttpResponse().statusCode()); - } - else { - LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: LiveAnalytics disabled"); - } - if (writeToInfluxDB && influxWriteApi != null) { - LOGGER.info("INFO::TimeStreamWriter::writeRecords: InfluxDB writing {} records", batchRecords.size()); - - final ArrayList pointList = convertLiveAnalyticsRecord(batchRecords); - // enhance here - influxWriteApi.writePoints(pointList); - - /* // writing one record at time only - convertAndWriteLiveAnalyticsRecord(batchRecords); - */ + for (Record record : batchRecords) { + RejectedRecord rejectedRecord = new RejectedRecord(record,TimestreamSinkErrorCodes.NO_INGESTION_TARGET); + rejectedRecords.add(rejectedRecord); + } + LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Records have been rejected in the batch [{}] , due to [{}]", currentBatch, TimestreamSinkErrorCodes.NO_INGESTION_TARGET); + } + else { + try { + batchRecords = getBatchRecords(records, currentBatch); + if (batchRecords != null && !batchRecords.isEmpty()) { + if (writeToLiveAnalytics) { + + final WriteRecordsRequest writeRequest = WriteRecordsRequest.builder() + .databaseName(databaseName) + .tableName(tableName) + .records(batchRecords) + .build(); + final WriteRecordsResponse writeResponse = clientFactory.getTimestreamClient().writeRecords(writeRequest); + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: batch size [{}], status [{}] ", batchRecords.size(), writeResponse.sdkHttpResponse().statusCode()); + } else { + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: LiveAnalytics disabled"); + } + if (writeToInfluxDB && influxWriteApi != null) { + LOGGER.info("INFO::TimeStreamWriter::writeRecords: InfluxDB writing {} records", batchRecords.size()); + + final ArrayList pointList = convertLiveAnalyticsRecord(batchRecords); + // enhance here + influxWriteApi.writePoints(pointList); + + /* // writing one record at time only + convertAndWriteLiveAnalyticsRecord(batchRecords); + */ + } else { + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: InfluxDB disabled"); + } + } else { + LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: Batch ingestion is complete for the records of size [{}] ", records.size()); } - else { - LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: InfluxDB disabled"); + } catch (RejectedRecordsException e) { + LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Few records have been rejected in the batch [{}] , due to [{}]", currentBatch, e.getLocalizedMessage()); + if (e.hasRejectedRecords()) { + rejectedRecords.addAll(getRejectedTimestreamRecords(e.rejectedRecords(), batchRecords)); } + } catch (SdkException e) { + LOGGER.error("ERROR::TimeStreamWriter::writeRecords", e); } - else { - LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: Batch ingestion is complete for the records of size [{}] ", records.size()); - } - } catch (RejectedRecordsException e) { - LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Few records have been rejected in the batch [{}] , due to [{}]", currentBatch, e.getLocalizedMessage()); - if (e.hasRejectedRecords()) { - rejectedRecords.addAll(getRejectedTimestreamRecords(e.rejectedRecords(), batchRecords)); - } - } catch (SdkException e) { - LOGGER.error("ERROR::TimeStreamWriter::writeRecords", e); } } }