Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka influx #187

Merged
merged 9 commits into from
Jun 1, 2024
Merged

Kafka influx #187

merged 9 commits into from
Jun 1, 2024

Conversation

nfunke
Copy link
Contributor

@nfunke nfunke commented May 14, 2024

Description of changes: Enhanced Connector to ingest records from Kafka Topic into LiveAnalytics and/or InfluxDB

  • Added configuration parameter for InfluxDB and flags for Target LiveAnalytics/InfluxDB or both
  • Updated documentation to describe new parameters

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

this.influxDBUrl, this.influxDBToken, this.influxDBBucket, this.influxDBOrg
);
if (this.influxDBClient != null) {
LOGGER.error("INFO::TimeStreamWriter:: influxDB client successfull connected: [{}] [{}] [{}]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOGGER.info?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in latest commit

);
this.influxWriteApi = getInfluxDBWriteApi(this.influxDBClient);
if (this.influxWriteApi != null) {
LOGGER.error("INFO::TimeStreamWriter:: influxDB writer API successfull connected: [{}] [{}] [{}]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOGGER.info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in latest commit

LOGGER.error("INFO::TimeStreamWriter:: influxDB writer API successfull connected: [{}] [{}] [{}]",
this.influxDBUrl, this.influxDBBucket, influxDBOrg
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOGGER.error if this is null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in latest commit

}
}

if (!this.influxDBEnabled && !this.liveAnalyticsEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be the first check in this function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in latest commit

@@ -81,6 +139,8 @@ public TimestreamWriter(final DataModel schemaDefinition, final TimestreamSinkCo
* @param sinkRecords List of incoming records from the source Kafka topic
*/
public List<RejectedRecord> writeRecords(final AWSServiceClientFactory clientFactory, final Collection<SinkRecord> sinkRecords) {
Boolean writeToLiveAnalytics = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, this was old code, replaced with configurable settings that were already implemented, also in latest commit

}
else {
try {
batchRecords = getBatchRecords(records, currentBatch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not generate influxdb write records directly from 'records'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was a possible thought, but wanted to keep logic of max records simple as already done before, otherwise another logic needs to apply of how to limit total number of records to InfluxDB. Certainly open to revisit

@nfunke nfunke merged commit e3670c4 into mainline Jun 1, 2024
4 checks passed
@nfunke nfunke deleted the kafka-influx branch June 1, 2024 18:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants