Kafka influx #187
Conversation
this.influxDBUrl, this.influxDBToken, this.influxDBBucket, this.influxDBOrg
);
if (this.influxDBClient != null) {
LOGGER.error("INFO::TimeStreamWriter:: influxDB client successfull connected: [{}] [{}] [{}]",
LOGGER.info?
addressed in latest commit
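The point of the review comment is that a successful connection should be logged at INFO, not ERROR. A minimal sketch of matching the log level to the outcome, using the JDK's own `java.util.logging` here for illustration (the connector itself uses a different logging facade, and the logger name and message are stand-ins):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class ConnectionLogging {
    private static final Logger LOGGER = Logger.getLogger("TimeStreamWriter");

    // Pick the level that matches the outcome: INFO on success, SEVERE on failure,
    // instead of logging an "INFO::" message through the error channel.
    public static Level levelFor(boolean clientConnected) {
        return clientConnected ? Level.INFO : Level.SEVERE;
    }

    public static void main(String[] args) {
        Object influxDBClient = new Object(); // stand-in for the real client handle
        boolean connected = (influxDBClient != null);
        LOGGER.log(levelFor(connected),
                "TimeStreamWriter: influxDB client connected=" + connected);
    }
}
```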
);
this.influxWriteApi = getInfluxDBWriteApi(this.influxDBClient);
if (this.influxWriteApi != null) {
LOGGER.error("INFO::TimeStreamWriter:: influxDB writer API successfull connected: [{}] [{}] [{}]",
LOGGER.info
addressed in latest commit
LOGGER.error("INFO::TimeStreamWriter:: influxDB writer API successfull connected: [{}] [{}] [{}]",
this.influxDBUrl, this.influxDBBucket, influxDBOrg
);
}
LOGGER.error if this is null
addressed in latest commit
}
}

if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) {
this can be the first check in this function
addressed in latest commit
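The suggestion is to make this the first check so the function bails out before any client setup when neither destination is enabled. A minimal sketch of that guard, with the flags passed in as plain booleans for illustration (the real connector reads them from its configuration):

```java
public class SinkGuard {
    // Hypothetical stand-ins for this.influxDBEnabled / this.liveAnalyticsEnabled.
    public static boolean shouldInitialize(boolean influxDBEnabled,
                                           boolean liveAnalyticsEnabled) {
        // First check: if neither sink is enabled there is nothing to set up,
        // so return before constructing any clients.
        if (!influxDBEnabled && !liveAnalyticsEnabled) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldInitialize(false, false)); // nothing to do
        System.out.println(shouldInitialize(true, false));  // InfluxDB only
    }
}
```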
@@ -81,6 +139,8 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo
* @param sinkRecords List of incoming records from the source Kafka topic
*/
public List<RejectedRecord> writeRecords(final AWSServiceClientFactory clientFactory, final Collection<SinkRecord> sinkRecords) {
Boolean writeToLiveAnalytics = true;
Are these configurable?
Good catch. This was old code; it has been replaced with the configurable settings that were already implemented, also in the latest commit.
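A sketch of what driving the write targets from connector configuration rather than a hard-coded flag could look like. The property names below are assumptions for illustration only, not the connector's actual configuration keys:

```java
import java.util.Properties;

public class SinkTargets {
    // Hypothetical keys; the connector's real config names may differ.
    static final String LIVE_ANALYTICS_KEY = "timestream.liveanalytics.enabled";
    static final String INFLUXDB_KEY = "timestream.influxdb.enabled";

    public static boolean writeToLiveAnalytics(Properties config) {
        // Default to true to preserve the previous always-on behaviour.
        return Boolean.parseBoolean(config.getProperty(LIVE_ANALYTICS_KEY, "true"));
    }

    public static boolean writeToInfluxDB(Properties config) {
        return Boolean.parseBoolean(config.getProperty(INFLUXDB_KEY, "false"));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(INFLUXDB_KEY, "true");
        System.out.println(writeToLiveAnalytics(config)); // default applies
        System.out.println(writeToInfluxDB(config));      // read from config
    }
}
```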
}
else {
try {
batchRecords = getBatchRecords(records, currentBatch);
why not generate influxdb write records directly from 'records'
That was a possibility, but I wanted to keep the existing max-records logic as simple as it already was; generating InfluxDB write records directly from 'records' would require separate logic to limit the total number of records sent to InfluxDB. Certainly open to revisiting.
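The trade-off being discussed is that reusing the existing batching keeps one upper bound on records per write for both sinks. A minimal sketch of that chunking idea (this is a generic stand-in, not the connector's actual `getBatchRecords` implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // Split the incoming records into batches of at most maxPerBatch elements,
    // so the same size limit applies regardless of which sink consumes them.
    public static <T> List<List<T>> chunk(List<T> records, int maxPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxPerBatch) {
            batches.add(records.subList(i, Math.min(i + maxPerBatch, records.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5);
        System.out.println(chunk(records, 2)); // [[1, 2], [3, 4], [5]]
    }
}
```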
Description of changes: Enhanced Connector to ingest records from Kafka Topic into LiveAnalytics and/or InfluxDB
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.