diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml
index a16bf2a5..ba0da043 100644
--- a/docker-compose-ci.yml
+++ b/docker-compose-ci.yml
@@ -1,5 +1,14 @@
version: "3.6"
+volumes:
+ metadata_data: {}
+ middle_var: {}
+ historical_var: {}
+ broker_var: {}
+ coordinator_var: {}
+ router_var: {}
+ druid_shared: {}
+
services:
mysql:
image: mysql:8
@@ -73,4 +82,107 @@ services:
ports:
- "9047:9047"
- "31010:31010"
- - "45678:45678"
\ No newline at end of file
+ - "45678:45678"
+
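+ # PostgreSQL serves as Druid's metadata storage; credentials must match environment_druid.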
+ druid_postgres:
+ image: postgres:latest
+ ports:
+ - "5432:5432"
+ volumes:
+ - metadata_data:/var/lib/postgresql/data
+ environment:
+ - POSTGRES_PASSWORD=FoolishPassword
+ - POSTGRES_USER=druid
+ - POSTGRES_DB=druid
+
+ # Druid requires ZooKeeper 3.5 or later for its container nodes
+ zookeeper:
+ container_name: zookeeper
+ image: zookeeper:3.5.10
+ ports:
+ - "2181:2181"
+ environment:
+ - ZOO_MY_ID=1
+
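+ # All Druid processes below share their common configuration via the environment_druid file.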
+ druid_coordinator:
+ image: apache/druid:28.0.0
+ container_name: druid_coordinator
+ volumes:
+ - druid_shared:/opt/shared
+ - coordinator_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - druid_postgres
+ ports:
+ - "11081:8081"
+ command:
+ - coordinator
+ env_file:
+ - environment_druid
+
+ druid_broker:
+ image: apache/druid:28.0.0
+ container_name: druid_broker
+ volumes:
+ - broker_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - druid_postgres
+ - druid_coordinator
+ ports:
+ - "11082:8082"
+ command:
+ - broker
+ env_file:
+ - environment_druid
+
+ druid_historical:
+ image: apache/druid:28.0.0
+ container_name: druid_historical
+ volumes:
+ - druid_shared:/opt/shared
+ - historical_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - druid_postgres
+ - druid_coordinator
+ ports:
+ - "11083:8083"
+ command:
+ - historical
+ env_file:
+ - environment_druid
+
+ druid_middlemanager:
+ image: apache/druid:28.0.0
+ container_name: druid_middlemanager
+ volumes:
+ - druid_shared:/opt/shared
+ - middle_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - druid_postgres
+ - druid_coordinator
+ ports:
+ - "11091:8091"
+ - "11100-11105:8100-8105"
+ command:
+ - middleManager
+ env_file:
+ - environment_druid
+
+ druid_router:
+ image: apache/druid:28.0.0
+ container_name: druid_router
+ volumes:
+ - router_var:/opt/druid/var
+ depends_on:
+ - zookeeper
+ - druid_postgres
+ - druid_coordinator
+ ports:
+ - "8888:8888"
+ command:
+ - router
+ env_file:
+ - environment_druid
\ No newline at end of file
diff --git a/environment_druid b/environment_druid
new file mode 100644
index 00000000..be65030c
--- /dev/null
+++ b/environment_druid
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Java tuning
+#DRUID_XMX=1g
+#DRUID_XMS=1g
+#DRUID_MAXNEWSIZE=250m
+#DRUID_NEWSIZE=250m
+#DRUID_MAXDIRECTMEMORYSIZE=6172m
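+# DRUID_SINGLE_NODE_CONF selects a sizing profile bundled with the apache/druid image.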
+DRUID_SINGLE_NODE_CONF=micro-quickstart
+
+druid_emitter_logging_logLevel=debug
+
+druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-multi-stage-query"]
+
+druid_zk_service_host=zookeeper
+
+druid_metadata_storage_host=
+druid_metadata_storage_type=postgresql
+druid_metadata_storage_connector_connectURI=jdbc:postgresql://druid_postgres:5432/druid
+druid_metadata_storage_connector_user=druid
+druid_metadata_storage_connector_password=FoolishPassword
+
+druid_coordinator_balancer_strategy=cachingCost
+
+druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
+
+druid_storage_type=local
+druid_storage_storageDirectory=/opt/shared/segments
+druid_indexer_logs_type=file
+druid_indexer_logs_directory=/opt/shared/indexing-logs
+
+druid_processing_numThreads=2
+druid_processing_numMergeBuffers=2
+
+DRUID_LOG4J=
\ No newline at end of file
diff --git a/plugin-jdbc-druid/build.gradle b/plugin-jdbc-druid/build.gradle
new file mode 100644
index 00000000..5b311faf
--- /dev/null
+++ b/plugin-jdbc-druid/build.gradle
@@ -0,0 +1,21 @@
+project.description = 'Connect and query Apache Druid databases using Kestra\'s JDBC plugin.'
+
+jar {
+ manifest {
+ attributes(
+ "X-Kestra-Name": project.name,
+ "X-Kestra-Title": "Apache Druid",
+ "X-Kestra-Group": project.group + ".jdbc.druid",
+ "X-Kestra-Description": project.description,
+ "X-Kestra-Version": project.version
+ )
+ }
+}
+
+dependencies {
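+ // Druid's JDBC endpoint speaks the Avatica remote protocol.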
+ implementation("org.apache.calcite.avatica:avatica-core:1.23.0")
+ implementation project(':plugin-jdbc')
+
+ testImplementation project(':plugin-jdbc').sourceSets.test.output
+ testImplementation("org.json:json:20210307")
+}
diff --git a/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/DruidCellConverter.java b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/DruidCellConverter.java
new file mode 100644
index 00000000..d0352919
--- /dev/null
+++ b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/DruidCellConverter.java
@@ -0,0 +1,22 @@
+package io.kestra.plugin.jdbc.druid;
+
+import io.kestra.plugin.jdbc.AbstractCellConverter;
+import lombok.SneakyThrows;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.ZoneId;
+
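+/**
+ * Druid's Avatica driver returns standard JDBC types, so the default
+ * conversions from {@link AbstractCellConverter} are used unchanged.
+ */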
+public class DruidCellConverter extends AbstractCellConverter {
+ public DruidCellConverter(ZoneId zoneId) {
+ super(zoneId);
+ }
+
+ @SneakyThrows
+ @Override
+ public Object convertCell(int columnIndex, ResultSet rs, Connection connection) throws SQLException {
+ return super.convert(columnIndex, rs);
+ }
+}
+
diff --git a/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/Query.java b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/Query.java
new file mode 100644
index 00000000..e0eec8bd
--- /dev/null
+++ b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/Query.java
@@ -0,0 +1,55 @@
+package io.kestra.plugin.jdbc.druid;
+
+import io.kestra.core.models.annotations.Example;
+import io.kestra.core.models.annotations.Plugin;
+import io.kestra.core.models.tasks.RunnableTask;
+import io.kestra.plugin.jdbc.AbstractCellConverter;
+import io.kestra.plugin.jdbc.AbstractJdbcQuery;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.calcite.avatica.remote.Driver;
+
+import java.sql.*;
+import java.time.ZoneId;
+
+@SuperBuilder
+@ToString
+@EqualsAndHashCode
+@Getter
+@NoArgsConstructor
+@Schema(
+ title = "Query a Apache Druid server"
+)
+@Plugin(
+ examples = {
+ @Example(
+ code = {
+ "url: jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true",
+ "sql: |",
+ " SELECT *",
+ " FROM wikiticker",
+ "fetch: true"
+ }
+ )
+ }
+)
+public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output> {
+ @Override
+ protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
+ return new DruidCellConverter(zoneId);
+ }
+
+ @Override
+ public void registerDriver() throws SQLException {
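+ // Druid exposes SQL over HTTP through Avatica, so the Avatica remote driver is registered.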
+ DriverManager.registerDriver(new Driver());
+ }
+
+ @Override
+ protected Statement createStatement(Connection conn) throws SQLException {
+ return conn.createStatement();
+ }
+}
diff --git a/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/Trigger.java b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/Trigger.java
new file mode 100644
index 00000000..b0c3cc75
--- /dev/null
+++ b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/Trigger.java
@@ -0,0 +1,79 @@
+package io.kestra.plugin.jdbc.druid;
+
+import io.kestra.core.models.annotations.Example;
+import io.kestra.core.models.annotations.Plugin;
+import io.kestra.core.runners.RunContext;
+import io.kestra.plugin.jdbc.AbstractJdbcQuery;
+import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.calcite.avatica.remote.Driver;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+@SuperBuilder
+@ToString
+@EqualsAndHashCode
+@Getter
+@NoArgsConstructor
+@Schema(
+ title = "Wait for query on a Druid database."
+)
+@Plugin(
+ examples = {
+ @Example(
+ title = "Wait for a sql query to return results and iterate through rows",
+ full = true,
+ code = {
+ "id: jdbc-trigger",
+ "namespace: io.kestra.tests",
+ "",
+ "tasks:",
+ " - id: each",
+ " type: io.kestra.core.tasks.flows.EachSequential",
+ " tasks:",
+ " - id: return",
+ " type: io.kestra.core.tasks.debugs.Return",
+ " format: \"{{json(taskrun.value)}}\"",
+ " value: \"{{ trigger.rows }}\"",
+ "",
+ "triggers:",
+ " - id: watch",
+ " type: io.kestra.plugin.jdbc.druid.Trigger",
+ " interval: \"PT5M\"",
+ " sql: \"SELECT * FROM my_table\""
+ }
+ )
+ }
+)
+public class Trigger extends AbstractJdbcTrigger {
+
+ @Override
+ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception {
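+ // Delegate to the Query task so the trigger reuses its connection handling and fetch logic.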
+ var query = Query.builder()
+ .id(this.id)
+ .type(Query.class.getName())
+ .url(this.getUrl())
+ .username(this.getUsername())
+ .password(this.getPassword())
+ .timeZoneId(this.getTimeZoneId())
+ .sql(this.getSql())
+ .fetch(this.isFetch())
+ .store(this.isStore())
+ .fetchOne(this.isFetchOne())
+ .additionalVars(this.additionalVars)
+ .build();
+ return query.run(runContext);
+ }
+
+ @Override
+ public void registerDriver() throws SQLException {
+ DriverManager.registerDriver(new Driver());
+ }
+}
+
diff --git a/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/package-info.java b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/package-info.java
new file mode 100644
index 00000000..f1cd04b4
--- /dev/null
+++ b/plugin-jdbc-druid/src/main/java/io/kestra/plugin/jdbc/druid/package-info.java
@@ -0,0 +1,7 @@
+@PluginSubGroup(
+ description = "This sub-group of plugins contains tasks for accessing the Druid database.",
+ categories = PluginSubGroup.PluginCategory.DATABASE
+)
+package io.kestra.plugin.jdbc.druid;
+
+import io.kestra.core.models.annotations.PluginSubGroup;
\ No newline at end of file
diff --git a/plugin-jdbc-druid/src/main/resources/io.kestra.plugin.jdbc.druid.svg b/plugin-jdbc-druid/src/main/resources/io.kestra.plugin.jdbc.druid.svg
new file mode 100644
index 00000000..f9159dc4
--- /dev/null
+++ b/plugin-jdbc-druid/src/main/resources/io.kestra.plugin.jdbc.druid.svg
@@ -0,0 +1,2 @@
+<!-- Apache Druid plugin icon (SVG) -->
\ No newline at end of file
diff --git a/plugin-jdbc-druid/src/main/resources/plugin-icon.svg b/plugin-jdbc-druid/src/main/resources/plugin-icon.svg
new file mode 100644
index 00000000..aac61a00
--- /dev/null
+++ b/plugin-jdbc-druid/src/main/resources/plugin-icon.svg
@@ -0,0 +1,25 @@
+<!-- Apache Druid plugin icon (SVG) -->
diff --git a/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidDriverTest.java b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidDriverTest.java
new file mode 100644
index 00000000..2f2fe8de
--- /dev/null
+++ b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidDriverTest.java
@@ -0,0 +1,13 @@
+package io.kestra.plugin.jdbc.druid;
+
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import io.kestra.plugin.jdbc.AbstractJdbcDriverTest;
+
+import java.sql.Driver;
+@MicronautTest
+public class DruidDriverTest extends AbstractJdbcDriverTest {
+ @Override
+ protected Class<? extends Driver> getDriverClass() {
+ return org.apache.calcite.avatica.remote.Driver.class;
+ }
+}
diff --git a/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTest.java b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTest.java
new file mode 100644
index 00000000..d614a1f9
--- /dev/null
+++ b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTest.java
@@ -0,0 +1,114 @@
+package io.kestra.plugin.jdbc.druid;
+
+import com.google.common.collect.ImmutableMap;
+import io.kestra.core.runners.RunContext;
+import io.kestra.core.runners.RunContextFactory;
+import io.kestra.plugin.jdbc.AbstractJdbcQuery;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import org.json.JSONObject;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+
+@MicronautTest
+public class DruidTest {
+ @Inject
+ RunContextFactory runContextFactory;
+
+ @BeforeAll
+ public static void startServer() throws Exception {
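+ // Ingest a sample "products" CSV through Druid's async SQL statements API,
+ // then poll until ingestion succeeds and the segments are loaded.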
+ String payload = "{" +
+ " \"context\": " +
+ " {\"waitUntilSegmentsLoad\": true, " +
+ " \"finalizeAggregations\": false, \"groupByEnableMultiValueUnnesting\": false, " +
+ " \"executionMode\":\"async\", \"maxNumTasks\": 2}, " +
+ " \"header\": true, " +
+ " \"query\": " +
+ " \"REPLACE INTO \\\"products\\\" OVERWRITE ALL WITH \\\"ext\\\" AS ( SELECT * FROM TABLE(EXTERN('{\\\"type\\\":\\\"http\\\",\\\"uris\\\":[\\\"https://media.githubusercontent.com/media/datablist/sample-csv-files/main/files/products/products-1000.csv\\\"]}',\\n '{\\\"type\\\":\\\"csv\\\",\\\"findColumnsFromHeader\\\":true}'\\n )\\n ) EXTEND (\\\"index\\\" BIGINT, \\\"name\\\" VARCHAR, \\\"ean\\\" BIGINT)) SELECT TIMESTAMP '2000-01-01 00:00:00' AS \\\"__time\\\", \\\"index\\\", \\\"name\\\", \\\"ean\\\"FROM \\\"ext\\\"PARTITIONED BY ALL\"," +
+ " \"resultFormat\": \"array\", " +
+ " \"sqlTypesHeader\": true, " +
+ " \"typesHeader\": true}";
+ URL obj = new URL("http://localhost:8888/druid/v2/sql/statements");
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("POST");
+ con.setRequestProperty("Content-Type","application/json");
+ con.setDoOutput(true);
+ DataOutputStream wr = new DataOutputStream(con.getOutputStream());
+ wr.writeBytes(payload);
+ wr.flush();
+ wr.close();
+
+ String response = getResponseFromConnection(con);
+ JSONObject jsonObjectResponse = new JSONObject(response);
+ String queryId = jsonObjectResponse.getString("queryId");
+ String state = jsonObjectResponse.getString("state");
+ while (!state.equals("SUCCESS")) {
+ TimeUnit.SECONDS.sleep(30);
+ HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:8888/druid/v2/sql/statements/"+ queryId).openConnection();
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("Content-Type","application/json");
+ connection.setDoOutput(true);
+ String getCallResonse = getResponseFromConnection(connection);
+ System.out.println(getCallResonse);
+ jsonObjectResponse = new JSONObject(getCallResonse);
+ queryId = jsonObjectResponse.getString("queryId");
+ state = jsonObjectResponse.getString("state");
+ }
+ }
+
+ @Test
+ void insertAndQuery() throws Exception {
+ RunContext runContext = runContextFactory.of(ImmutableMap.of());
+
+ Query task = Query.builder()
+ .url("jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true")
+ .fetchOne(true)
+ .timeZoneId("Europe/Paris")
+ .sql("select \n" +
+ " -- NULL as t_null,\n" +
+ " 'string' AS t_string,\n" +
+ " CAST(2147483647 AS INT) as t_integer,\n" +
+ " CAST(12345.124 AS FLOAT) as t_float,\n" +
+ " CAST(12345.124 AS DOUBLE) as t_double\n" +
+ " \n" +
+ "from restaurant_user_transactions \n" +
+ "limit 1")
+ .build();
+
+ AbstractJdbcQuery.Output runOutput = task.run(runContext);
+ assertThat(runOutput.getRow(), notNullValue());
+
+ assertThat(runOutput.getRow().get("t_null"), is(nullValue()));
+ assertThat(runOutput.getRow().get("t_string"), is("string"));
+ assertThat(runOutput.getRow().get("t_integer"), is(2147483647));
+ assertThat(runOutput.getRow().get("t_float"), is(12345.124));
+ assertThat(runOutput.getRow().get("t_double"), is(12345.124D));
+ }
+
+ public static String getResponseFromConnection(HttpURLConnection connection) throws Exception {
+ int responseCode = connection.getResponseCode();
+ System.out.println("Response Code : " + responseCode);
+
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(connection.getInputStream()));
+ String line;
+ StringBuilder response = new StringBuilder();
+
+ while ((line = reader.readLine()) != null) {
+ response.append(line);
+ }
+ reader.close();
+ return response.toString();
+ }
+}
+
diff --git a/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTriggerTest.java b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTriggerTest.java
new file mode 100644
index 00000000..5d97971d
--- /dev/null
+++ b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTriggerTest.java
@@ -0,0 +1,101 @@
+package io.kestra.plugin.jdbc.druid;
+
+import io.kestra.plugin.jdbc.AbstractJdbcTriggerTest;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import org.json.JSONObject;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@MicronautTest
+class DruidTriggerTest extends AbstractJdbcTriggerTest {
+
+ @BeforeAll
+ public static void startServer() throws Exception {
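+ // Same setup as DruidTest: ingest the sample "products" CSV and wait until its segments are loaded.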
+ String payload = "{" +
+ " \"context\": " +
+ " {\"waitUntilSegmentsLoad\": true, " +
+ " \"finalizeAggregations\": false, \"groupByEnableMultiValueUnnesting\": false, " +
+ " \"executionMode\":\"async\", \"maxNumTasks\": 2}, " +
+ " \"header\": true, " +
+ " \"query\": " +
+ " \"REPLACE INTO \\\"products\\\" OVERWRITE ALL WITH \\\"ext\\\" AS ( SELECT * FROM TABLE(EXTERN('{\\\"type\\\":\\\"http\\\",\\\"uris\\\":[\\\"https://media.githubusercontent.com/media/datablist/sample-csv-files/main/files/products/products-1000.csv\\\"]}',\\n '{\\\"type\\\":\\\"csv\\\",\\\"findColumnsFromHeader\\\":true}'\\n )\\n ) EXTEND (\\\"index\\\" BIGINT, \\\"name\\\" VARCHAR, \\\"ean\\\" BIGINT)) SELECT TIMESTAMP '2000-01-01 00:00:00' AS \\\"__time\\\", \\\"index\\\", \\\"name\\\", \\\"ean\\\"FROM \\\"ext\\\"PARTITIONED BY ALL\"," +
+ " \"resultFormat\": \"array\", " +
+ " \"sqlTypesHeader\": true, " +
+ " \"typesHeader\": true}";
+ URL obj = new URL("http://localhost:8888/druid/v2/sql/statements");
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("POST");
+ con.setRequestProperty("Content-Type","application/json");
+ con.setDoOutput(true);
+ DataOutputStream wr = new DataOutputStream(con.getOutputStream());
+ wr.writeBytes(payload);
+ wr.flush();
+ wr.close();
+
+ String response = getResponseFromConnection(con);
+ JSONObject jsonObjectResponse = new JSONObject(response);
+ String queryId = jsonObjectResponse.getString("queryId");
+ String state = jsonObjectResponse.getString("state");
+ while (!state.equals("SUCCESS")) {
+ TimeUnit.SECONDS.sleep(30);
+ HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:8888/druid/v2/sql/statements/"+ queryId).openConnection();
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("Content-Type","application/json");
+ connection.setDoOutput(true);
+ String getCallResonse = getResponseFromConnection(connection);
+ System.out.println(getCallResonse);
+ jsonObjectResponse = new JSONObject(getCallResonse);
+ queryId = jsonObjectResponse.getString("queryId");
+ state = jsonObjectResponse.getString("state");
+ }
+ }
+
+ @Test
+ void run() throws Exception {
+ var execution = triggerFlow(this.getClass().getClassLoader(), "flows","druid-listen");
+
+ var rows = (List