Skip to content

Commit

Permalink
Merge pull request #187 from awslabs/kafka-influx
Browse files Browse the repository at this point in the history
Kafka connector modified to also ingest into Timestream for InfluxDB
  • Loading branch information
nfunke authored Jun 1, 2024
2 parents e9abacb + b298cf5 commit e3670c4
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 40 deletions.
73 changes: 52 additions & 21 deletions integrations/kafka_connector/README.md

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions integrations/kafka_connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@
<version>5.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<!-- <version>6.6.0</version> -->
<version>6.6.0</version>
</dependency>

</dependencies>

<build>
Expand All @@ -136,6 +143,11 @@
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
<!-- for debugging only -->
<!-- <addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.mypackage.MyClass</mainClass> -->
<!-- end for debugging -->
</manifest>
</archive>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,40 @@ public static ConfigDef conf() {
ConfigDef.Type.STRING,
"org.apache.kafka.common.serialization.StringSerializer",
ConfigDef.Importance.LOW,
"Serializer class for Value");
"Serializer class for Value")
/// INFLUXDB
.define(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.HIGH,
"LiveAnalytics Ingestion Enabled")
.define(TimestreamSinkConstants.INFLUXDB_ENABLE,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.HIGH,
"InfluxDB Ingestion Enabled")
.define(TimestreamSinkConstants.INFLUXDB_BUCKET,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB Target Bucket")
.define(TimestreamSinkConstants.INFLUXDB_URL,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB API URL")
.define(TimestreamSinkConstants.INFLUXDB_TOKEN,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB API Token")
.define(TimestreamSinkConstants.INFLUXDB_ORG,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"InfluxDB Organization")
////////////////////////////////
;
}

/**
Expand Down Expand Up @@ -166,6 +199,69 @@ public Region getAWSRegion() {
}
}

////// INFLUXDB Configs
public boolean isLiveAnalyticsEnabled() {
try {
return getBoolean(TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.LIVE_ANALYTICS_ENABLE, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public boolean isInfluxDBEnabled() {
try {
return getBoolean(TimestreamSinkConstants.INFLUXDB_ENABLE);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_ENABLE, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public String getInfluxDBBucket() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_BUCKET);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_BUCKET, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public String getInfluxDBUrl() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_URL);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_URL, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}

public String getInfluxDBToken() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_TOKEN);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_TOKEN, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}
public String getInfluxDBOrg() {
try {
return getString(TimestreamSinkConstants.INFLUXDB_ORG);
} catch (final ConfigException ex) {
final TimestreamSinkConnectorError error = new TimestreamSinkConnectorError(TimestreamSinkErrorCodes.UNKNOWN_CONFIG,
TimestreamSinkConstants.INFLUXDB_ORG, ex);
throw new TimestreamSinkConnectorException(error, ex);
}
}


////////////////////////////////////////////////////////

/**
*
* @return The maximum number of retry attempts for retryable errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,15 @@ public class TimestreamSinkConstants {
*/
public static final String CONST_DLQ_CLIENT_ID = "DLQPublisher";

public static final String LIVE_ANALYTICS_ENABLE = "timestream.target.liveanalytics";

public static final String INFLUXDB_ENABLE = "timestream.target.influxdb";

public static final String INFLUXDB_BUCKET = "timestream.influxdb.bucket";

public static final String INFLUXDB_URL = "timestream.influxdb.url";

public static final String INFLUXDB_TOKEN = "timestream.influxdb.token";

public static final String INFLUXDB_ORG = "timestream.influxdb.org";
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,11 @@ public class TimestreamSinkErrorCodes {
*/
public static final String INVALID_MEASURE_VALUE ="invalid.measure.value";

/**
* Error code: no target engine specified (InfluxDB or LiveAnalytics)
*/
public static final String NO_INGESTION_TARGET ="No ingestion engine specified, enable either LiveAnalytics or InfluxDB";

//// INFLUX DB
// #TODO fix later to add detailed errors
}
Loading

0 comments on commit e3670c4

Please sign in to comment.