Skip to content

Commit

Permalink
feat: add queries to pinot (#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Oct 31, 2024
1 parent ad39a7d commit 245970f
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.kestra.plugin.jdbc.pinot;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.pinot.client.PinotDriver;

import java.sql.*;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Perform multiple queries on an Apache Pinot server."
)
@Plugin(
examples = {
@Example(
full = true,
code = """
id: pinot_queries
namespace: company.team
tasks:
- id: queries
type: o.kestra.plugin.jdbc.pinot.Queries
url: jdbc:pinot://localhost:9000
sql: |
SELECT * FROM airlineStats;
SELECT * FROM airlineStats;
fetchType: FETCH
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new PinotCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new PinotDriver());
}

@Override
protected PreparedStatement createPreparedStatement(Connection conn, String preparedSql) throws SQLException {
return conn.prepareStatement(preparedSql);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.kestra.plugin.jdbc.pinot;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static io.kestra.core.models.tasks.common.FetchType.FETCH_ONE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

/**
* See :
* - https://docs.pinot.apache.org/configuration-reference/schema
*/
@KestraTest
class PinotQueriesTest {
@Inject
RunContextFactory runContextFactory;

@Test
void multiSelect() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Queries task = Queries.builder()
.url("jdbc:pinot://localhost:49000")
.fetchType(FETCH_ONE)
.timeZoneId("Europe/Paris")
.sql("""
select count(*) as count from airlineStats;
""")
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = task.run(runContext);
assertThat(runOutput.getOutputs().getFirst().getRow(), notNullValue());
assertThat(runOutput.getOutputs().getFirst().getRow().get("count"), is(9746L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* - https://docs.pinot.apache.org/configuration-reference/schema
*/
@KestraTest
public class PinotTest {
class PinotTest {
@Inject
RunContextFactory runContextFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private PreparedStatement createPreparedStatementAndPopulateParameters(RunContex
Map<String, Object> namedParamsRendered = this.getParameters() == null ? null : this.getParameters().asMap(runContext, String.class, Object.class);

if(namedParamsRendered == null || namedParamsRendered.isEmpty()) {
return conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
return createPreparedStatement(conn, sql);
}

//Extract parameters in orders and replace them with '?'
Expand All @@ -184,12 +184,16 @@ private PreparedStatement createPreparedStatementAndPopulateParameters(RunContex
preparedSql = matcher.replaceFirst( " ?");
matcher = pattern.matcher(preparedSql);
}
stmt = conn.prepareStatement(preparedSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt = createPreparedStatement(conn, preparedSql);

for(int i=0; i<params.size(); i++) {
stmt.setObject(i+1, namedParamsRendered.get(params.get(i)));
}

return stmt;
}

protected PreparedStatement createPreparedStatement(Connection conn, String preparedSql) throws SQLException {
return conn.prepareStatement(preparedSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
}
}

0 comments on commit 245970f

Please sign in to comment.