Skip to content

Commit

Permalink
feat/replace old properties by fetch type (#380)
Browse files Browse the repository at this point in the history
* feat(fetchType): refactor fetch, store, and fetchOne to use fetchType

Updated tests class

Updated .yml flows sample

Updated class of type Batch, Trigger, and Query

Clean unused imports on some class

updated doc messages, use setter and getters for old properties, FetchType.NONE as default

close #374
  • Loading branch information
mgabelle authored Sep 13, 2024
1 parent a36337e commit 5d966ea
Show file tree
Hide file tree
Showing 89 changed files with 262 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
username: db_user
password: db_password
sql: select * FROM departments
fetch: true
fetchType: FETCH
"""
),
@Example(
Expand All @@ -59,7 +59,7 @@
username: dremio_user
password: dremio_password
sql: select * FROM departments
fetch: true
fetchType: FETCH
"""
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
url: jdbc:arrow-flight-sql://dremio-coordinator:32010/?schema=postgres.public
interval: "PT5M"
sql: "SELECT * FROM my_table"
fetch: true
fetchType: FETCH
"""
)
}
Expand All @@ -69,6 +69,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.store(this.isStore())
.fetch(this.isFetch())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
username: as400_user
password: as400_password
sql: select * from as400_types
fetchOne: true
fetchType: FETCH_ONE
"""
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
username: as400_user
password: as400_password
sql: "SELECT * FROM my_table"
fetch: true
fetchType: FETCH
"""
)
}
Expand All @@ -70,6 +70,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
username: ch_user
password: ch_password
sql: select * from clickhouse_types
store: true
fetchType: STORE
"""
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
username: ch_user
password: ch_password
sql: "SELECT * FROM my_table"
fetch: true
fetchType: FETCH
"""
)
}
Expand All @@ -68,6 +68,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
package io.kestra.plugin.jdbc.clickhouse;

import com.clickhouse.client.internal.google.protobuf.UInt32Value;
import com.clickhouse.client.internal.google.protobuf.UInt64Value;
import com.clickhouse.client.internal.google.type.Decimal;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.value.ClickHouseNestedValue;
import com.clickhouse.data.value.ClickHouseTupleValue;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.jdbc.AbstractJdbcBatch;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import org.h2.value.ValueTinyint;
import org.junit.jupiter.api.Test;
import reactor.util.function.Tuple2;

import java.io.*;
import java.math.BigDecimal;
Expand All @@ -26,10 +19,10 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;

import static io.kestra.core.models.tasks.common.FetchType.FETCH;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

Expand All @@ -44,7 +37,7 @@ void select() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FetchType.FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("select * from clickhouse_types")
.build();
Expand Down Expand Up @@ -82,7 +75,7 @@ void update() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FetchType.FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("ALTER TABLE clickhouse_types UPDATE String = 'D' WHERE String = 'four'")
.build();
Expand All @@ -96,7 +89,7 @@ void update() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FetchType.FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("select String from clickhouse_types")
.build();
Expand Down Expand Up @@ -140,7 +133,7 @@ void updateBatch() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetch(true)
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("select String from clickhouse_types")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ triggers:
type: io.kestra.plugin.jdbc.clickhouse.Trigger
sql: SELECT * FROM clickhouse_types
url: jdbc:clickhouse://127.0.0.1:28123/default
fetch: true
fetchType: FETCH
interval: PT10S

tasks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
Expand All @@ -14,7 +13,6 @@
import lombok.experimental.SuperBuilder;

import java.net.URI;
import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
Expand Down Expand Up @@ -44,7 +42,7 @@
username: db2inst
password: db2_password
sql: select * from db2_types
fetchOne: true
fetchType: FETCH_ONE
"""
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
username: db2inst
password: db2_password
sql: "SELECT * FROM my_table"
fetch: true
fetchType: FETCH
"""
)
}
Expand All @@ -68,6 +68,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import io.micronaut.context.annotation.Requires;
import io.kestra.core.junit.annotations.KestraTest;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import java.io.FileNotFoundException;
import java.math.BigDecimal;
Expand All @@ -20,6 +16,7 @@
import java.time.LocalDate;
import java.time.LocalTime;

import static io.kestra.core.models.tasks.common.FetchType.FETCH_ONE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

Expand All @@ -42,7 +39,7 @@ void checkInitiation() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FETCH_ONE)
.sql("SELECT 1 AS result FROM SYSIBM.SYSDUMMY1")
.build();

Expand All @@ -61,7 +58,7 @@ void select() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("select * from db2_types")
.build();
Expand Down Expand Up @@ -104,7 +101,7 @@ void update() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("update db2_types set VARCHAR_col = 'VARCHAR_col'")
.build();
Expand All @@ -115,7 +112,7 @@ void update() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("select VARCHAR_col from db2_types")
.build();
Expand All @@ -133,7 +130,7 @@ void updateBlob() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("update db2_types set BLOB_col = CAST('VARCHAR_col' AS BLOB)")
.build();
Expand All @@ -144,7 +141,7 @@ void updateBlob() throws Exception {
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchOne(true)
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("select BLOB_col from db2_types")
.build();
Expand Down
2 changes: 1 addition & 1 deletion plugin-jdbc-db2/src/test/resources/flows/db2-listen.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ triggers:
username: db2inst1
password: password
sql: select * from db2_types
fetch: true
fetchType: FETCH
interval: PT10S

tasks:
Expand Down
2 changes: 1 addition & 1 deletion plugin-jdbc-db2/src/test/resources/flows/read_db2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ tasks:
username: db2inst1
password: password
sql: select * from db2_types
fetchOne: true
fetchType: FETCH_ONE
- id: flow-id
type: io.kestra.plugin.core.debug.Return
format: "{{outputs.update.row}}"
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
username: dremio_token
password: samplePersonalAccessToken
sql: select * FROM source.database.table
fetchOne: true
fetchType: FETCH_ONE
"""
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
username: dremio_token
password: samplePersonalAccessToken
sql: "SELECT * FROM source.database.my_table"
fetch: true
fetchType: FETCH
"""
)
}
Expand All @@ -68,6 +68,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.store(this.isStore())
.fetch(this.isFetch())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
sql: |
SELECT *
FROM wikiticker
store: true
fetchType: STORE
"""
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
interval: "PT5M"
url: jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true
sql: "SELECT * FROM my_table"
fetch: true
fetchType: FETCH
"""
)
}
Expand All @@ -68,6 +68,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static io.kestra.core.models.tasks.common.FetchType.FETCH_ONE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

Expand All @@ -28,7 +29,7 @@ void insertAndQuery() throws Exception {

Query task = Query.builder()
.url("jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true")
.fetchOne(true)
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ triggers:
from products
limit 1
url: jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true
fetch: true
fetchType: FETCH
interval: PT30S

tasks:
Expand Down
Loading

0 comments on commit 5d966ea

Please sign in to comment.