Skip to content

Commit

Permalink
refactor: migrate jdbc plugin to dynamic properties (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Dec 23, 2024
1 parent c4f8877 commit 89f8f34
Show file tree
Hide file tree
Showing 84 changed files with 1,450 additions and 1,500 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
Expand Down Expand Up @@ -32,7 +33,7 @@
code = """
id: jdbc_trigger
namespace: company.team
tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEach
Expand All @@ -41,7 +42,7 @@
- id: return
type: io.kestra.plugin.core.debug.Return
format: "{{ json(taskrun.value) }}"
triggers:
- id: watch
type: io.kestra.plugin.jdbc.arrowflight.Trigger
Expand All @@ -58,7 +59,7 @@
public class Trigger extends AbstractJdbcTrigger {
@Override
protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception {
var query = Query.builder()
Query query = Query.builder()
.id(this.id)
.type(Query.class.getName())
.url(this.getUrl())
Expand All @@ -69,7 +70,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.store(this.isStore())
.fetch(this.isFetch())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
Expand Down Expand Up @@ -31,7 +32,7 @@
code = """
id: jdbc_trigger
namespace: company.team
tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEach
Expand All @@ -40,7 +41,7 @@
- id: return
type: io.kestra.plugin.core.debug.Return
format: "{{ json(taskrun.value) }}"
triggers:
- id: watch
type: io.kestra.plugin.jdbc.as400.Trigger
Expand Down Expand Up @@ -70,7 +71,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
Expand Down Expand Up @@ -31,7 +32,7 @@
code = """
id: jdbc_trigger
namespace: company.team
tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEach
Expand All @@ -40,7 +41,7 @@
- id: return
type: io.kestra.plugin.core.debug.Return
format: "{{ json(taskrun.value) }}"
triggers:
- id: watch
type: io.kestra.plugin.jdbc.clickhouse.Trigger
Expand Down Expand Up @@ -68,7 +69,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(this.getFetchType())
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ void testMultiSelectWithParameters() throws Exception {
);

Queries taskGet = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
.url(Property.of(getUrl()))
.username(null)
.password(null)
.fetchType(Property.of(FETCH))
.timeZoneId(Property.of("Europe/Paris"))
.sql(Property.of("""
SELECT firstName, lastName, age FROM employee where age > :age and age < :age + 10;
SELECT brand, model FROM laptop where brand = :brand and cpu_frequency > :cpu_frequency;
""")
"""))
.parameters(Property.of(parameters))
.build();

Expand All @@ -71,22 +71,22 @@ void testRollback() throws Exception {

//Queries should pass in a transaction
Queries queriesPass = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
.url(Property.of(getUrl()))
.username(null)
.password(null)
.fetchType(Property.of(FETCH_ONE))
.timeZoneId(Property.of("Europe/Paris"))
.sql(Property.of("""
DROP TABLE IF EXISTS test_transaction;
CREATE TABLE test_transaction(id Int64)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;
INSERT INTO test_transaction (id) VALUES (1);
SELECT COUNT(id) as transaction_count FROM test_transaction;
""")
"""))
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = queriesPass.run(runContext);
Expand All @@ -95,30 +95,30 @@ ORDER BY (id)

//Queries should fail due to bad sql
Queries insertsFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
.url(Property.of(getUrl()))
.username(null)
.password(null)
.fetchType(Property.of(FETCH_ONE))
.timeZoneId(Property.of("Europe/Paris"))
.sql(Property.of("""
INSERT INTO test_transaction (id) VALUES (2);
INSERT INTO test_transaction (id) VALUES ('random');
""") //Try inserting before failing
""")) //Try inserting before failing
.build();

assertThrows(Exception.class, () -> insertsFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
.url(Property.of(getUrl()))
.username(null)
.password(null)
.fetchType(Property.of(FETCH))
.timeZoneId(Property.of("Europe/Paris"))
.sql(Property.of("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
SELECT * FROM test_transaction;
""") //Try inserting before failing
""")) //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
Expand All @@ -132,38 +132,38 @@ void testNonTransactionalShouldNotRollback() throws Exception {

//Queries should pass in a transaction
Queries insertOneAndFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.url(Property.of(getUrl()))
.username(null)
.password(null)
.fetchType(Property.of(FETCH_ONE))
.transaction(Property.of(false))
.timeZoneId("Europe/Paris")
.sql("""
.timeZoneId(Property.of("Europe/Paris"))
.sql(Property.of("""
DROP TABLE IF EXISTS test_transaction;
CREATE TABLE test_transaction(id Int64)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;
INSERT INTO test_transaction (id) VALUES (1);
INSERT INTO test_transaction (id) VALUES ('random');
INSERT INTO test_transaction (id) VALUES (2);
""")
"""))
.build();

assertThrows(Exception.class, () -> insertOneAndFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
.url(Property.of(getUrl()))
.username(null)
.password(null)
.fetchType(Property.of(FETCH_ONE))
.timeZoneId(Property.of("Europe/Paris"))
.sql(Property.of("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
""") //Try inserting before failing
""")) //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
Expand Down
Loading

0 comments on commit 89f8f34

Please sign in to comment.