Skip to content

Commit

Permalink
Merge pull request #409
Browse files Browse the repository at this point in the history
* feat: add queries to sqlite

* fix: fix test trigger and add new test for parameters sqlite

* test: add test for rollback

* test: add test for non transactional
  • Loading branch information
mgabelle authored Oct 24, 2024
1 parent f41de10 commit eaa8a5d
Show file tree
Hide file tree
Showing 5 changed files with 368 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package io.kestra.plugin.jdbc.sqlite;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.runners.PluginUtilsService;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcBaseQuery;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.micronaut.http.uri.UriBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.net.URI;
import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Properties;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Queries on a SQLite database."
)
@Plugin(
examples = {
@Example(
full = true,
title = "Execute multiple queries, using existing sqlite file, and pass the results to another task.",
code = """
id: sqlite_query_using_file
namespace: company.team
tasks:
- id: update
type: io.kestra.plugin.jdbc.sqlite.Queries
url: jdbc:sqlite:myfile.db
sqliteFile: {{ outputs.get.outputFiles['myfile.sqlite'] }}
sql: select * from pgsql_types
fetchType: FETCH
- id: use_fetched_data
type: io.kestra.plugin.jdbc.sqlite.Queries
url: jdbc:sqlite:myfile.db
sqliteFile: {{ outputs.get.outputFiles['myfile.sqlite'] }}
sql: |
{% for row in outputs.update.rows %}
INSERT INTO pl_store_distribute (year_month,store_code, update_date)
VALUES ({{row.play_time}}, {{row.concert_id}}, TO_TIMESTAMP('{{row.timestamp_type}}', 'YYYY-MM-DDTHH:MI:SS.US') );
{% endfor %}"
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {

@Schema(
title = "Add sqlite file.",
description = "The file must be from Kestra's internal storage"
)
@PluginProperty(dynamic = true)
protected String sqliteFile;

@Getter(AccessLevel.NONE)
protected transient Path workingDirectory;

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties properties = super.connectionProperties(runContext);

URI url = URI.create((String) properties.get("jdbc.url"));

// get file name from url scheme parts
String filename = url.getSchemeSpecificPart().split(":")[1];

Path path = runContext.workingDir().resolve(Path.of(filename));
if (path.toFile().exists()) {
url = URI.create(path.toString());

UriBuilder builder = UriBuilder.of(url);

builder.scheme("jdbc:sqlite");

properties.put("jdbc.url", builder.build().toString());
}

return properties;
}

@Override
public AbstractJdbcQueries.MultiQueryOutput run(RunContext runContext) throws Exception {
Properties properties = super.connectionProperties(runContext);

URI url = URI.create((String) properties.get("jdbc.url"));

this.workingDirectory = runContext.workingDir().path();

if (this.sqliteFile != null) {

// Get file name from url scheme parts, to be equally same as in connection url
String filename = url.getSchemeSpecificPart().split(":")[1];

PluginUtilsService.createInputFiles(
runContext,
workingDirectory,
Map.of(filename, this.sqliteFile),
additionalVars
);
}

return super.run(runContext);
}

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new SqliteCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new org.sqlite.JDBC());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package io.kestra.plugin.jdbc.sqlite;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static io.kestra.core.models.tasks.common.FetchType.FETCH;
import static io.kestra.core.models.tasks.common.FetchType.FETCH_ONE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

@KestraTest
public class SqliteQueriesTest extends AbstractRdbmsTest {

@Test
void testMultiSelectWithParameters() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

Map<String, Object> parameters = Map.of(
"age", 40,
"brand", "Apple",
"cpu_frequency", 1.5
);

Queries taskGet = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
SELECT firstName, lastName, age FROM employee where age > :age and age < :age + 10;
SELECT brand, model FROM laptop where brand = :brand and cpu_frequency > :cpu_frequency;
""")
.parameters(Property.of(parameters))
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = taskGet.run(runContext);
assertThat(runOutput.getOutputs().size(), is(2));

List<Map<String, Object>> employees = runOutput.getOutputs().getFirst().getRows();
assertThat("employees", employees, notNullValue());
assertThat("employees", employees.size(), is(1));
assertThat("employee selected", employees.getFirst().get("age"), is(45));
assertThat("employee selected", employees.getFirst().get("firstName"), is("John"));
assertThat("employee selected", employees.getFirst().get("lastName"), is("Doe"));

List<Map<String, Object>>laptops = runOutput.getOutputs().getLast().getRows();
assertThat("laptops", laptops, notNullValue());
assertThat("laptops", laptops.size(), is(1));
assertThat("selected laptop", laptops.getFirst().get("brand"), is("Apple"));
}

@Test
void testRollback() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

//Queries should pass in a transaction
Queries queriesPass = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
DROP TABLE IF EXISTS test_transaction;
CREATE TABLE test_transaction(id INTEGER PRIMARY KEY);
INSERT INTO test_transaction (id) VALUES (1);
SELECT COUNT(id) as transaction_count FROM test_transaction;
""")
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = queriesPass.run(runContext);
assertThat(runOutput.getOutputs().size(), is(1));
assertThat(runOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1));

//Queries should fail due to bad sql
Queries insertsFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
INSERT INTO test_transaction (id) VALUES (2);
INSERT INTO test_transaction (id) VALUES (3f);
""") //Try inserting before failing
.build();

assertThrows(Exception.class, () -> insertsFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
""") //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
assertThat(verifyOutput.getOutputs().size(), is(1));
assertThat(verifyOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1));
}

@Test
void testNonTransactionalShouldNotRollback() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

//Queries should pass in a transaction
Queries insertOneAndFail = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.transaction(Property.of(false))
.timeZoneId("Europe/Paris")
.sql("""
DROP TABLE IF EXISTS test_transaction;
CREATE TABLE test_transaction(id INTEGER PRIMARY KEY);
INSERT INTO test_transaction (id) VALUES (1);
INSERT INTO test_transaction (id) VALUES (1f);
INSERT INTO test_transaction (id) VALUES (2);
""")
.build();

assertThrows(Exception.class, () -> insertOneAndFail.run(runContext));

//Final query to verify the amount of updated rows
Queries verifyQuery = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
SELECT COUNT(id) as transaction_count FROM test_transaction;
""") //Try inserting before failing
.build();

AbstractJdbcQueries.MultiQueryOutput verifyOutput = verifyQuery.run(runContext);
assertThat(verifyOutput.getOutputs().size(), is(1));
assertThat(verifyOutput.getOutputs().getFirst().getRow().get("transaction_count"), is(1));
}

@Override
protected String getUrl() {
return TestUtils.url();
}

@Override
protected String getUsername() {
return TestUtils.username();
}

@Override
protected String getPassword() {
return TestUtils.password();
}

protected Connection getConnection() throws SQLException {
Properties props = new Properties();
props.put("jdbc.url", getUrl());
props.put("user", getUsername());
props.put("password", getPassword());

return DriverManager.getConnection(props.getProperty("jdbc.url"), props);
}

@Override
protected void initDatabase() throws SQLException, FileNotFoundException, URISyntaxException {
executeSqlScript("scripts/sqlite_queries.sql");
}
}
2 changes: 1 addition & 1 deletion plugin-jdbc-sqlite/src/test/resources/scripts/sqlite.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ INSERT INTO lite_types (
'{"key": "value"}',
X'0102030405060708',
NULL
);
);
36 changes: 36 additions & 0 deletions plugin-jdbc-sqlite/src/test/resources/scripts/sqlite_queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

-- Create table employee
DROP TABLE IF EXISTS employee;

CREATE TABLE employee (
employee_id INTEGER PRIMARY KEY,
firstName TEXT,
lastName TEXT,
age INTEGER
);

INSERT INTO employee (employee_id, firstName, lastName, age)
VALUES
(1, 'John', 'Doe', 45),
(2, 'Bryan', 'Grant', 33),
(3, 'Jude', 'Philips', 25),
(4, 'Michael', 'Page', 62);


-- Create table laptop
DROP TABLE IF EXISTS laptop;

CREATE TABLE laptop
(
laptop_id INTEGER PRIMARY KEY,
brand TEXT,
model TEXT,
cpu_frequency REAL
);

INSERT INTO laptop (laptop_id, brand, model, cpu_frequency)
VALUES
(1, 'Apple', 'MacBookPro M1 13', 2.2),
(2, 'Apple', 'MacBookPro M3 16', 1.5),
(3, 'LG', 'Gram', 1.95),
(4, 'Lenovo', 'ThinkPad', 1.05);
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ public AbstractJdbcQueries.MultiQueryOutput run(RunContext runContext) throws Ex
savepoint = conn.setSavepoint();

String sqlRendered = runContext.render(this.sql, this.additionalVars);
String[] queries = isTransactional ? new String[]{sqlRendered} : sqlRendered.split("(?<='\\);)");
String[] queries = sqlRendered.split(";[^']");

for(String query : queries) {
//Create statement, execute
stmt = createPreparedStatementAndPopulateParameters(runContext, conn, query);
stmt.setFetchSize(this.getFetchSize());
logger.debug("Starting query: {}", query);
boolean hasMoreResult = stmt.execute();
conn.commit();
if(!isTransactional) {
conn.commit();
}

//Create Outputs
while (hasMoreResult || stmt.getUpdateCount() != -1) {
Expand Down

0 comments on commit eaa8a5d

Please sign in to comment.