OZ-163 + OZ-323: Follow-up updates connected to Flink jobs & Analytics refactoring. (#11)

* OZ-323: Updates from testing against https://github.com/ozone-his/ozone-analytics

* OZ-323: Update JdbcCatalog

* OZ-323: Update JdbcCatalog

* Update src/main/java/com/ozonehis/data/pipelines/streaming/StreamingETLJob.java

---------

Co-authored-by: Kipchumba Bett <kipchubett@gmail.com>
enyachoke and corneliouzbett authored Oct 19, 2023
1 parent 2a52f2d commit 929e7c2
Showing 7 changed files with 56 additions and 41 deletions.
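A note on the change repeated across all three job classes below: with Flink 1.17 the jobs depend on the externalized flink-connector-jdbc 3.1.x, whose JdbcCatalog constructor takes a user ClassLoader as its first argument. A minimal sketch of the migration, reusing the variable names from these jobs (URL and credential values are illustrative):

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Flink <= 1.16 style: no ClassLoader argument.
// JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);

// Flink 1.17 / flink-connector-jdbc 3.1.x: the catalog takes a ClassLoader,
// which it uses to resolve the JDBC dialect and driver at runtime.
JdbcCatalog catalog = new JdbcCatalog(
        ClassLoader.getSystemClassLoader(),
        name,             // catalog name, registered below as "analytics"
        defaultDatabase,  // e.g. "analytics"
        username,
        password,
        baseUrl);         // e.g. "jdbc:postgresql://localhost:5432"
tableEnv.registerCatalog("analytics", catalog);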
16 changes: 9 additions & 7 deletions Dockerfile
@@ -1,22 +1,24 @@
-FROM maven:3.8.6-eclipse-temurin-8 as builder
+FROM maven:3.8.6-eclipse-temurin-11 as builder
# add pom.xml and source code
ADD ./pom.xml pom.xml
#cache dependencies
RUN mvn dependency:go-offline
ADD ./src src/
RUN mvn clean package

-FROM flink:1.14.5-scala_2.12-java8
+FROM flink:1.17.1-scala_2.12-java11
ARG JAR_VERSION=1.0.0-SNAPSHOT
-RUN wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.5/flink-connector-jdbc_2.12-1.14.5.jar -O /opt/flink/lib/flink-connector-jdbc_2.12-1.14.5.jar
-RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet_2.12/1.14.5/flink-parquet_2.12-1.14.5.jar -O /opt/flink/lib/flink-parquet_2.12-1.14.5.jar
+RUN wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar -O /opt/flink/lib/flink-connector-jdbc-3.1.1-1.17.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet/1.17.1/flink-parquet-1.17.1.jar -O /opt/flink/lib/flink-parquet-1.17.1.jar
+RUN wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar -O /opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
RUN wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -O /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-RUN wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop/1.12.2/parquet-hadoop-1.12.2.jar -O /opt/flink/lib/parquet-hadoop-1.12.2.jar
-RUN wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-common/1.12.2/parquet-common-1.12.2.jar -O /opt/flink/lib/parquet-common-1.12.2.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop/1.13.1/parquet-hadoop-1.13.1.jar -O /opt/flink/lib/parquet-hadoop-1.13.1.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-common/1.13.1/parquet-common-1.13.1.jar -O /opt/flink/lib/parquet-common-1.13.1.jar
RUN wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar -O /opt/flink/lib/httpclient-4.5.13.jar
RUN wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.15/httpcore-4.4.15.jar -O /opt/flink/lib/httpcore-4.4.15.jar
RUN wget https://repo1.maven.org/maven2/com/ecwid/consul/consul-api/1.4.5/consul-api-1.4.5.jar -O /opt/flink/lib/consul-api-1.4.5.jar
RUN wget https://repo1.maven.org/maven2/com/google/code/gson/gson/2.9.0/gson-2.9.0.jar -O /opt/flink/lib/gson-2.9.0.jar
+RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.17.1/flink-json-1.17.1.jar
+RUN wget -P /opt/flink/lib/ https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
COPY --from=builder target/flink-jobs-${JAR_VERSION}-etl-streaming.jar /opt/flink/usrlib/streaming-etl-job.jar
COPY run.sh /run.sh
RUN chmod +x /run.sh
4 changes: 2 additions & 2 deletions Dockerfile_batch
@@ -1,11 +1,11 @@
-FROM maven:3.8.6-eclipse-temurin-8 as builder
+FROM maven:3.8.6-eclipse-temurin-11 as builder
# add pom.xml and source code
ADD ./pom.xml pom.xml
RUN mvn dependency:go-offline
ADD ./src src/
RUN mvn clean install -Pbatch

-FROM eclipse-temurin:8-jre
+FROM eclipse-temurin:11-jre
ARG JAR_VERSION=1.0.0-SNAPSHOT
ENV OUTPUT_DIR=/parquet
COPY --from=builder target/flink-jobs-${JAR_VERSION}-etl-batch.jar etl-batch.jar
4 changes: 2 additions & 2 deletions Dockerfile_parquet_export
@@ -1,11 +1,11 @@
-FROM maven:3.8.6-eclipse-temurin-8 as builder
+FROM maven:3.8.6-eclipse-temurin-11 as builder
# add pom.xml and source code
ADD ./pom.xml pom.xml
RUN mvn dependency:go-offline
ADD ./src src/
RUN mvn clean install -Pbatch

-FROM eclipse-temurin:8-jre
+FROM eclipse-temurin:11-jre
ARG JAR_VERSION=1.0.0-SNAPSHOT
ENV OUTPUT_DIR=/parquet
COPY --from=builder target/flink-jobs-${JAR_VERSION}-etl-export.jar etl-export.jar
41 changes: 23 additions & 18 deletions pom.xml
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>

-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

@@ -16,7 +17,7 @@
</organization>
<developers>
<developer>
-<name>Mekom Solutions</name>
+  <name>Mekom Solutions</name>
<url>https://www.mekomsolutions.com</url>
</developer>
</developers>
@@ -27,7 +28,7 @@
<junit.version>4.11</junit.version>
<log4j.version>2.20.0</log4j.version>
<flink.version>1.17.1</flink.version>
-<target.java.version>1.8</target.java.version>
+<target.java.version>11</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
@@ -42,7 +43,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
-<scope>provided</scope>
+<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -54,19 +55,19 @@
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
-<scope>provided</scope>
+<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
-<scope>provided</scope>
+<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
-<scope>provided</scope>
+<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -84,7 +85,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
-<version>3.0.0-1.16</version>
+<version>3.1.1-1.17</version>
<scope>${scope}</scope>
</dependency>
<dependency>
@@ -109,16 +110,16 @@
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
-<dependency>
-	<groupId>org.postgresql</groupId>
-	<artifactId>postgresql</artifactId>
-	<version>42.5.4</version>
-</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
+<dependency>
+	<groupId>org.postgresql</groupId>
+	<artifactId>postgresql</artifactId>
+	<version>42.6.0</version>
+</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -217,11 +218,13 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-<mainClass>com.ozonehis.data.pipelines.streaming.StreamingETLJob
+<mainClass>
+	com.ozonehis.data.pipelines.streaming.StreamingETLJob
</mainClass>
</transformer>
</transformers>
-<finalName>${project.artifactId}-${project.version}-etl-streaming</finalName>
+<finalName>
+	${project.artifactId}-${project.version}-etl-streaming</finalName>
</configuration>
</execution>
</executions>
@@ -276,7 +279,8 @@
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-<mainClass>com.ozonehis.data.pipelines.export.BatchParquetExport</mainClass>
+<mainClass>
+	com.ozonehis.data.pipelines.export.BatchParquetExport</mainClass>
</transformer>
</transformers>
<finalName>${project.artifactId}-${project.version}-etl-export</finalName>
@@ -315,7 +319,7 @@
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.ozonehis.data.pipelines.batch.BatchETLJob
-</mainClass>
+  </mainClass>
</transformer>
</transformers>
<finalName>${project.artifactId}-${project.version}-etl-batch</finalName>
Expand Down Expand Up @@ -369,7 +373,8 @@

<pluginManagement>
<plugins>
-<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+<!-- This improves the out-of-the-box experience in Eclipse by resolving some
+warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
10 changes: 7 additions & 3 deletions src/main/java/com/ozonehis/data/pipelines/batch/BatchETLJob.java
@@ -30,6 +30,10 @@
import com.ozonehis.data.pipelines.utils.ConnectorUtils;
import com.ozonehis.data.pipelines.utils.Environment;

+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -75,9 +79,9 @@ public static void main(String[] args) throws Exception {
String odooDBurl = String.format("jdbc:postgresql://%s:%s/%s?sslmode=disable", odooDBhost, odooDBport, odooDBName);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);
-JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
+JdbcCatalog catalog = new JdbcCatalog(ClassLoader.getSystemClassLoader(),name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("analytics", catalog);
-Stream<QueryFile> tables = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_SOURCE_TABLES_PATH", "")).stream();
+Stream<QueryFile> tables = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_SOURCE_TABLES_PATH", "/analytics/source_tables")).stream();
tables.forEach(s -> {
Map<String, String> connectorOptions = null;
if (s.parent.equals("openmrs")) {
@@ -95,7 +99,7 @@ public static void main(String[] args) throws Exception {
+ ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")";
tableEnv.executeSql(queryDSL);
});
-List<QueryFile> queries = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_QUERIES_PATH", ""));
+List<QueryFile> queries = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_QUERIES_PATH", "/analytics/queries"));
StatementSet stmtSet = tableEnv.createStatementSet();
for (QueryFile query : queries) {
String queryDSL = "INSERT INTO `analytics`.`analytics`.`" + query.fileName + "`\n" + query.content;
12 changes: 8 additions & 4 deletions src/main/java/com/ozonehis/data/pipelines/export/BatchParquetExport.java
@@ -5,8 +5,12 @@
import com.ozonehis.data.pipelines.utils.QueryFile;
import com.ozonehis.data.pipelines.utils.Environment;

+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -30,23 +34,23 @@ public static void main(String[] args) throws Exception {
String baseUrl = String.format("jdbc:postgresql://%s:%s", Environment.getEnv("ANALYTICS_DB_HOST", "localhost"),
Environment.getEnv("ANALYTICS_DB_PORT", "5432"));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);
-JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
+JdbcCatalog catalog = new JdbcCatalog(ClassLoader.getSystemClassLoader(),name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("analytics", catalog);
-Stream<QueryFile> tables = CommonUtils.getSQL(Environment.getEnv("EXPORT_DESTINATION_TABLES_PATH", "")).stream();
+Stream<QueryFile> tables = CommonUtils.getSQL(Environment.getEnv("EXPORT_DESTINATION_TABLES_PATH", "/export/destination-tables")).stream();
tables.forEach(s -> {
Map<String, String> connectorOptions = Stream
.of(new String[][] { { "connector", "filesystem" }, { "format", "parquet" },
{ "sink.rolling-policy.file-size", "10MB" },
{ "path",
Environment.getEnv("EXPORT_OUTPUT_PATH", "/tmp") + "/" + s.fileName + "/"
Environment.getEnv("EXPORT_OUTPUT_PATH", "/parquet") + "/" + s.fileName + "/"
+ Environment.getEnv("EXPORT_OUTPUT_TAG", "location1") + "/"
+ DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) }, })
.collect(Collectors.toMap(data -> data[0], data -> data[1]));
String queryDSL = s.content + "\n" + " WITH (\n"
+ ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")";
tableEnv.executeSql(queryDSL);
});
-List<QueryFile> queries = CommonUtils.getSQL(Environment.getEnv("EXPORT_SOURCE_QUERIES_PATH", ""));
+List<QueryFile> queries = CommonUtils.getSQL(Environment.getEnv("EXPORT_SOURCE_QUERIES_PATH", "/export/queries"));
StatementSet stmtSet = tableEnv.createStatementSet();
for (QueryFile query : queries) {
// String queryDSL = "INSERT INTO `analytics`.`analytics`.`" + query.fileName +
10 changes: 5 additions & 5 deletions src/main/java/com/ozonehis/data/pipelines/streaming/StreamingETLJob.java
@@ -59,21 +59,21 @@ public static void main(String[] args) {
String baseUrl = String.format("jdbc:postgresql://%s:%s", Environment.getEnv("ANALYTICS_DB_HOST", "localhost"),
Environment.getEnv("ANALYTICS_DB_PORT", "5432"));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);
-JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
+JdbcCatalog catalog = new JdbcCatalog(ClassLoader.getSystemClassLoader(),name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("analytics", catalog);
-Stream<QueryFile> tables = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_SOURCE_TABLES_PATH", "")).stream();
+Stream<QueryFile> tables = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_SOURCE_TABLES_PATH", "/analytics/source-tables")).stream();
tables.forEach(s -> {
Map<String, String> connectorOptions = null;
if (s.parent.equals("openmrs")) {
connectorOptions = Stream.of(
new String[][] { { "connector", "kafka" }, { "properties.bootstrap.servers", "localhost:29092" },
new String[][] { { "connector", "kafka" }, { "properties.bootstrap.servers", Environment.getEnv("ANALYTICS_KAFKA_URL", "localhost:29092") },
{ "properties.group.id", "flink" }, { "topic", String.format("openmrs.openmrs.%s", s.fileName) },
{ "scan.startup.mode", "earliest-offset" },
{ "value.debezium-json.ignore-parse-errors", "true" }, { "value.format", "debezium-json" }, })
.collect(Collectors.toMap(data -> data[0], data -> data[1]));
} else if (s.parent.equals("odoo")) {
connectorOptions = Stream.of(new String[][] { { "connector", "kafka" },
{ "properties.bootstrap.servers", "localhost:29092" }, { "properties.group.id", "flink" },
{ "properties.bootstrap.servers", Environment.getEnv("ANALYTICS_KAFKA_URL", "localhost:29092") }, { "properties.group.id", "flink" },
{ "topic", String.format("odoo.public.%s", s.fileName) }, { "scan.startup.mode", "earliest-offset" },
{ "value.debezium-json.ignore-parse-errors", "true" }, { "value.format", "debezium-json" }, })
.collect(Collectors.toMap(data -> data[0], data -> data[1]));
@@ -82,7 +82,7 @@
+ ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")";
tableEnv.executeSql(queryDSL);
});
-List<QueryFile> queries = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_QUERIES_PATH", ""));
+List<QueryFile> queries = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_QUERIES_PATH", "/analytics/queries"));
StatementSet stmtSet = tableEnv.createStatementSet();
for (QueryFile query : queries) {
String queryDSL = "INSERT INTO `analytics`.`analytics`.`" + query.fileName + "`\n" + query.content;
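For readability, here is the shape of one Kafka source definition after this change, with the broker address resolved from ANALYTICS_KAFKA_URL rather than hardcoded. This is a sketch using the repo's Environment.getEnv(key, fallback) helper; tableName is a hypothetical stand-in for the s.fileName value the job derives from each source-table SQL file:

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Connector options for one OpenMRS CDC topic; the broker list now comes from
// the ANALYTICS_KAFKA_URL environment variable, falling back to localhost:29092.
String tableName = "patient"; // hypothetical example table
Map<String, String> connectorOptions = Stream.of(new String[][] {
        { "connector", "kafka" },
        { "properties.bootstrap.servers", Environment.getEnv("ANALYTICS_KAFKA_URL", "localhost:29092") },
        { "properties.group.id", "flink" },
        { "topic", String.format("openmrs.openmrs.%s", tableName) },
        { "scan.startup.mode", "earliest-offset" },
        { "value.debezium-json.ignore-parse-errors", "true" },
        { "value.format", "debezium-json" } })
    .collect(Collectors.toMap(data -> data[0], data -> data[1]));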
