-
Notifications
You must be signed in to change notification settings - Fork 178
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #174 from awslabs/kafka_connector
Initial commit: Timestream Kafka Sink Connector
- Loading branch information
Showing
23 changed files
with
2,242 additions
and
0 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>software.amazon.timestream</groupId> | ||
<artifactId>kafka-connector-timestream</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
<packaging>jar</packaging> | ||
|
||
<name>kafka-connector-timestream</name> | ||
|
||
<properties> | ||
<kafka.version>2.8.1</kafka.version> | ||
<junit.version>4.13.1</junit.version> | ||
<java.version>1.8</java.version> | ||
<awssdk.version>2.20.121</awssdk.version> | ||
<slf4j.version>1.7.32</slf4j.version> | ||
<lombok.version>1.18.30</lombok.version> | ||
<jacoco.version>0.8.7</jacoco.version> | ||
<maven.compiler.source>${java.version}</maven.compiler.source> | ||
<maven.compiler.target>${java.version}</maven.compiler.target> | ||
</properties> | ||
<dependencyManagement> | ||
<dependencies> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>bom</artifactId> | ||
<version>${awssdk.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>core</artifactId> | ||
<version>2.20.151</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>sts</artifactId> | ||
<version>2.20.151</version> | ||
</dependency> | ||
</dependencies> | ||
</dependencyManagement> | ||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>connect-api</artifactId> | ||
<version>${kafka.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>${junit.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
<version>${slf4j.version}</version> | ||
</dependency> | ||
|
||
<!-- Lombok --> | ||
<dependency> | ||
<groupId>org.projectlombok</groupId> | ||
<artifactId>lombok</artifactId> | ||
<version>${lombok.version}</version> | ||
<scope>compile</scope> | ||
</dependency> | ||
<!-- aws dependencies--> | ||
<dependency> | ||
<groupId>com.google.code.gson</groupId> | ||
<artifactId>gson</artifactId> | ||
<version>2.10.1</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>auth</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>apache-client</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>s3</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>sts</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>timestreamwrite</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>regions</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>aws-crt-client</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.mockito</groupId> | ||
<artifactId>mockito-core</artifactId> | ||
<version>3.12.4</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.amazonaws</groupId> | ||
<artifactId>aws-java-sdk-core</artifactId> | ||
<version>1.12.524</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-api</artifactId> | ||
<version>5.8.0</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-engine</artifactId> | ||
<version>5.8.0</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-jar-plugin</artifactId> | ||
<configuration> | ||
<archive> | ||
<manifest> | ||
<addDefaultImplementationEntries>true</addDefaultImplementationEntries> | ||
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries> | ||
</manifest> | ||
</archive> | ||
</configuration> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
<version>2.5.1</version> | ||
<inherited>true</inherited> | ||
<configuration> | ||
<source>8</source> | ||
<target>8</target> | ||
</configuration> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-assembly-plugin</artifactId> | ||
<version>3.3.0</version> | ||
<configuration> | ||
<descriptorRefs> | ||
<descriptorRef>jar-with-dependencies</descriptorRef> | ||
</descriptorRefs> | ||
</configuration> | ||
<executions> | ||
<execution> | ||
<id>make-assembly</id> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>single</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.jacoco</groupId> | ||
<artifactId>jacoco-maven-plugin</artifactId> | ||
<version>0.8.4</version> | ||
<executions> | ||
<execution> | ||
<goals> | ||
<goal>prepare-agent</goal> | ||
</goals> | ||
</execution> | ||
<execution> | ||
<id>report</id> | ||
<phase>prepare-package</phase> | ||
<goals> | ||
<goal>report</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
<resources> | ||
<resource> | ||
<directory>src/main/resources</directory> | ||
<filtering>true</filtering> | ||
</resource> | ||
</resources> | ||
</build> | ||
</project> |
Binary file added
BIN
+34.8 KB
integrations/kafka_connector/resources/Data Flow - Timestream Sink Connector.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
40 changes: 40 additions & 0 deletions
40
integrations/kafka_connector/src/main/assembly/package.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 | ||
http://maven.apache.org/xsd/assembly-1.1.2.xsd"> | ||
<!-- Assembles a packaged version targeting OS installation. --> | ||
<id>package</id> | ||
<formats> | ||
<format>dir</format> | ||
</formats> | ||
<includeBaseDirectory>false</includeBaseDirectory> | ||
<fileSets> | ||
<fileSet> | ||
<directory>${project.basedir}</directory> | ||
<outputDirectory>share/doc/${project.name}/</outputDirectory> | ||
<includes> | ||
<include>README*</include> | ||
<include>LICENSE*</include> | ||
<include>NOTICE*</include> | ||
<include>licenses/</include> | ||
</includes> | ||
</fileSet> | ||
<fileSet> | ||
<directory>${project.basedir}/config</directory> | ||
<outputDirectory>etc/${project.name}</outputDirectory> | ||
<includes> | ||
<include>*</include> | ||
</includes> | ||
</fileSet> | ||
</fileSets> | ||
<dependencySets> | ||
<dependencySet> | ||
<outputDirectory>share/java/${project.name}</outputDirectory> | ||
<useProjectArtifact>true</useProjectArtifact> | ||
<useTransitiveFiltering>true</useTransitiveFiltering> | ||
<excludes> | ||
<exclude>org.apache.kafka:connect-api</exclude> | ||
</excludes> | ||
</dependencySet> | ||
</dependencySets> | ||
</assembly> |
71 changes: 71 additions & 0 deletions
71
...ons/kafka_connector/src/main/java/software/amazon/timestream/TimestreamSinkConnector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package software.amazon.timestream; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import org.apache.kafka.common.config.ConfigDef; | ||
import org.apache.kafka.connect.connector.Task; | ||
import org.apache.kafka.connect.sink.SinkConnector; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Class that implements {@link SinkConnector} - meant for starting the connector, | ||
* loading configurations and stopping the connector | ||
*/ | ||
public class TimestreamSinkConnector extends SinkConnector { | ||
/** | ||
* Logger object | ||
*/ | ||
private static final Logger LOGGER = LoggerFactory.getLogger(TimestreamSinkConnector.class); | ||
/** | ||
* Timestream Sink Connector Config values | ||
*/ | ||
private TimestreamSinkConnectorConfig connectorConfig; | ||
|
||
@Override | ||
public String version() { | ||
return "1.0"; | ||
} | ||
|
||
@Override | ||
public void start(final Map<String, String> props) { | ||
LOGGER.info("Begin::TimestreamSinkConnector::start"); | ||
connectorConfig = new TimestreamSinkConnectorConfig(props); | ||
LOGGER.info("Complete::TimestreamSinkConnector::start"); | ||
} | ||
|
||
@Override | ||
public Class<? extends Task> taskClass() { | ||
return TimestreamSinkTask.class; | ||
} | ||
|
||
@Override | ||
public List<Map<String, String>> taskConfigs(final int maxTasks) { | ||
LOGGER.trace("Begin::TimestreamSinkConnector::taskConfigs"); | ||
if (maxTasks == 0) { | ||
LOGGER.warn("No Connector tasks have been configured."); | ||
} | ||
final List<Map<String, String>> configs = new ArrayList<>(); | ||
for (int i = 0; i < maxTasks; i++) { | ||
configs.add(new ConcurrentHashMap<>(connectorConfig.originalsStrings())); | ||
} | ||
LOGGER.trace("Complete::TimestreamSinkConnector::taskConfigs"); | ||
return configs; | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
LOGGER.info("Begin::TimestreamSinkConnector::stop"); | ||
} | ||
|
||
@Override | ||
public ConfigDef config() { | ||
LOGGER.trace("Begin::TimestreamSinkConnector::config"); | ||
final ConfigDef conf = TimestreamSinkConnectorConfig.conf(); | ||
LOGGER.trace("Complete::TimestreamSinkConnector::config"); | ||
return conf; | ||
} | ||
} |
Oops, something went wrong.