Skip to content

Commit

Permalink
Added error messages when no ingestion target is enabled (InfluxDB or LiveAnalytics)
Browse files Browse the repository at this point in the history
  • Loading branch information
nfunke committed May 13, 2024
1 parent e711859 commit 2b0da1a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public static ConfigDef conf() {
/// INFLUXDB
.define(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE,
ConfigDef.Type.BOOLEAN,
true,
false,
ConfigDef.Importance.HIGH,
"LiveAnalytics Ingestion Enabled")
.define(TimestreamSinkConstants.INFLUXDB_ENABLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public class TimestreamSinkErrorCodes {
*/
public static final String INVALID_MEASURE_VALUE ="invalid.measure.value";

/**
 * Error message used when no ingestion target is enabled (neither LiveAnalytics nor InfluxDB).
 * NOTE(review): sibling constants in this class hold dotted error codes (e.g. "invalid.measure.value");
 * this one holds a human-readable message — confirm whether a code plus a separate message is intended.
 */
public static final String NO_INGESTION_TARGET ="No ingestion engine specified, enable either LiveAnalytics or InfluxDB";

//// INFLUX DB
// TODO: add detailed InfluxDB-specific error codes
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo
}
}

if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) {
LOGGER.error("ERROR::TimeStreamWriter:: initialization failed on : [{}]", TimestreamSinkErrorCodes.NO_INGESTION_TARGET);

}

/////////////////////////
}

Expand All @@ -144,47 +149,55 @@ public List<RejectedRecord> writeRecords(final AWSServiceClientFactory clientFac
final int batchSize = records.size() / TimestreamSinkConstants.DEFAULT_BATCHSIZE + 1;
List<Record> batchRecords = null;
for (int currentBatch = 0; currentBatch < batchSize; currentBatch ++) {
try {
if (!writeToInfluxDB && !writeToLiveAnalytics) {
// no target specified, cannot write, send records to DLQ
batchRecords = getBatchRecords(records, currentBatch);
if (batchRecords != null && !batchRecords.isEmpty()) {
if (writeToLiveAnalytics) {

final WriteRecordsRequest writeRequest = WriteRecordsRequest.builder()
.databaseName(databaseName)
.tableName(tableName)
.records(batchRecords)
.build();
final WriteRecordsResponse writeResponse = clientFactory.getTimestreamClient().writeRecords(writeRequest);
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: batch size [{}], status [{}] ", batchRecords.size(), writeResponse.sdkHttpResponse().statusCode());
}
else {
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: LiveAnalytics disabled");
}
if (writeToInfluxDB && influxWriteApi != null) {
LOGGER.info("INFO::TimeStreamWriter::writeRecords: InfluxDB writing {} records", batchRecords.size());

final ArrayList<Point> pointList = convertLiveAnalyticsRecord(batchRecords);
// enhance here
influxWriteApi.writePoints(pointList);

/* // writing one record at time only
convertAndWriteLiveAnalyticsRecord(batchRecords);
*/
for (Record record : batchRecords) {
RejectedRecord rejectedRecord = new RejectedRecord(record,TimestreamSinkErrorCodes.NO_INGESTION_TARGET);
rejectedRecords.add(rejectedRecord);
}
LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Records have been rejected in the batch [{}] , due to [{}]", currentBatch, TimestreamSinkErrorCodes.NO_INGESTION_TARGET);
}
else {
try {
batchRecords = getBatchRecords(records, currentBatch);
if (batchRecords != null && !batchRecords.isEmpty()) {
if (writeToLiveAnalytics) {

final WriteRecordsRequest writeRequest = WriteRecordsRequest.builder()
.databaseName(databaseName)
.tableName(tableName)
.records(batchRecords)
.build();
final WriteRecordsResponse writeResponse = clientFactory.getTimestreamClient().writeRecords(writeRequest);
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: batch size [{}], status [{}] ", batchRecords.size(), writeResponse.sdkHttpResponse().statusCode());
} else {
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: LiveAnalytics disabled");
}
if (writeToInfluxDB && influxWriteApi != null) {
LOGGER.info("INFO::TimeStreamWriter::writeRecords: InfluxDB writing {} records", batchRecords.size());

final ArrayList<Point> pointList = convertLiveAnalyticsRecord(batchRecords);
// enhance here
influxWriteApi.writePoints(pointList);

/* // writing one record at time only
convertAndWriteLiveAnalyticsRecord(batchRecords);
*/
} else {
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: InfluxDB disabled");
}
} else {
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: Batch ingestion is complete for the records of size [{}] ", records.size());
}
else {
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: InfluxDB disabled");
} catch (RejectedRecordsException e) {
LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Few records have been rejected in the batch [{}] , due to [{}]", currentBatch, e.getLocalizedMessage());
if (e.hasRejectedRecords()) {
rejectedRecords.addAll(getRejectedTimestreamRecords(e.rejectedRecords(), batchRecords));
}
} catch (SdkException e) {
LOGGER.error("ERROR::TimeStreamWriter::writeRecords", e);
}
else {
LOGGER.debug("DEBUG::TimeStreamWriter::writeRecords: Batch ingestion is complete for the records of size [{}] ", records.size());
}
} catch (RejectedRecordsException e) {
LOGGER.error("ERROR::TimeStreamWriter::writeRecords: Few records have been rejected in the batch [{}] , due to [{}]", currentBatch, e.getLocalizedMessage());
if (e.hasRejectedRecords()) {
rejectedRecords.addAll(getRejectedTimestreamRecords(e.rejectedRecords(), batchRecords));
}
} catch (SdkException e) {
LOGGER.error("ERROR::TimeStreamWriter::writeRecords", e);
}
}
}
Expand Down

0 comments on commit 2b0da1a

Please sign in to comment.