feat(plugin): Add plugin for Apache Druid
smantri-moveworks committed Dec 4, 2023
1 parent 3fd824b commit 806b75e
Showing 14 changed files with 629 additions and 1 deletion.
114 changes: 113 additions & 1 deletion docker-compose-ci.yml
@@ -1,5 +1,14 @@
version: "3.6"

volumes:
metadata_data: {}
middle_var: {}
historical_var: {}
broker_var: {}
coordinator_var: {}
router_var: {}
druid_shared: {}

services:
mysql:
image: mysql:8
@@ -73,4 +82,107 @@ services:
    ports:
      - "9047:9047"
      - "31010:31010"
      - "45678:45678"
- "45678:45678"

  druid_postgres:
    image: postgres:latest
    ports:
      - "5432:5432"
    volumes:
      - metadata_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_PASSWORD=FoolishPassword
      - POSTGRES_USER=druid
      - POSTGRES_DB=druid

  # Need 3.5 or later for druid container nodes
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.5.10
    ports:
      - "2181:2181"
    environment:
      - ZOO_MY_ID=1

  druid_coordinator:
    image: apache/druid:28.0.0
    container_name: druid_coordinator
    volumes:
      - druid_shared:/opt/shared
      - coordinator_var:/opt/druid/var
    depends_on:
      - zookeeper
      - druid_postgres
    ports:
      - "11081:8081"
    command:
      - coordinator
    env_file:
      - environment_druid

  druid_broker:
    image: apache/druid:28.0.0
    container_name: druid_broker
    volumes:
      - broker_var:/opt/druid/var
    depends_on:
      - zookeeper
      - druid_postgres
      - druid_coordinator
    ports:
      - "11082:8082"
    command:
      - broker
    env_file:
      - environment_druid

  druid_historical:
    image: apache/druid:28.0.0
    container_name: druid_historical
    volumes:
      - druid_shared:/opt/shared
      - historical_var:/opt/druid/var
    depends_on:
      - zookeeper
      - druid_postgres
      - druid_coordinator
    ports:
      - "11083:8083"
    command:
      - historical
    env_file:
      - environment_druid

  druid_middlemanager:
    image: apache/druid:28.0.0
    container_name: druid_middlemanager
    volumes:
      - druid_shared:/opt/shared
      - middle_var:/opt/druid/var
    depends_on:
      - zookeeper
      - druid_postgres
      - druid_coordinator
    ports:
      - "11091:8091"
      - "11100-11105:8100-8105"
    command:
      - middleManager
    env_file:
      - environment_druid

  druid_router:
    image: apache/druid:28.0.0
    container_name: druid_router
    volumes:
      - router_var:/opt/druid/var
    depends_on:
      - zookeeper
      - druid_postgres
      - druid_coordinator
    ports:
      - "8888:8888"
    command:
      - router
    env_file:
      - environment_druid
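
Before CI runs queries against this stack, waiting for the router to report healthy avoids flaky startup failures. The snippet below is only an illustrative sketch: it assumes the standard Druid /status/health endpoint and the 8888:8888 router mapping declared above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DruidRouterHealthCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8888/status/health"))
                .GET()
                .build();
        // Druid services answer "true" on /status/health once they are up.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Router healthy: " + response.body());
    }
}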
53 changes: 53 additions & 0 deletions environment_druid
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Java tuning
#DRUID_XMX=1g
#DRUID_XMS=1g
#DRUID_MAXNEWSIZE=250m
#DRUID_NEWSIZE=250m
#DRUID_MAXDIRECTMEMORYSIZE=6172m
DRUID_SINGLE_NODE_CONF=micro-quickstart

druid_emitter_logging_logLevel=debug

druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-multi-stage-query"]

druid_zk_service_host=zookeeper

druid_metadata_storage_host=
druid_metadata_storage_type=postgresql
druid_metadata_storage_connector_connectURI=jdbc:postgresql://druid_postgres:5432/druid
druid_metadata_storage_connector_user=druid
druid_metadata_storage_connector_password=FoolishPassword

druid_coordinator_balancer_strategy=cachingCost

druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB

druid_storage_type=local
druid_storage_storageDirectory=/opt/shared/segments
druid_indexer_logs_type=file
druid_indexer_logs_directory=/opt/shared/indexing-logs

druid_processing_numThreads=2
druid_processing_numMergeBuffers=2

DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>
21 changes: 21 additions & 0 deletions plugin-jdbc-druid/build.gradle
@@ -0,0 +1,21 @@
project.description = 'Connect and query Apache Druid databases using Kestra\'s JDBC plugin.'

jar {
    manifest {
        attributes(
            "X-Kestra-Name": project.name,
            "X-Kestra-Title": "Apache Druid",
            "X-Kestra-Group": project.group + ".jdbc.druid",
            "X-Kestra-Description": project.description,
            "X-Kestra-Version": project.version
        )
    }
}

dependencies {
    implementation("org.apache.calcite.avatica:avatica-core:1.23.0")
    implementation project(':plugin-jdbc')

    testImplementation project(':plugin-jdbc').sourceSets.test.output
    testImplementation("org.json:json:20210307")
}
@@ -0,0 +1,22 @@
package io.kestra.plugin.jdbc.druid;

import io.kestra.plugin.jdbc.AbstractCellConverter;
import lombok.SneakyThrows;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;

public class DruidCellConverter extends AbstractCellConverter {
    public DruidCellConverter(ZoneId zoneId) {
        super(zoneId);
    }

    @SneakyThrows
    @Override
    public Object convertCell(int columnIndex, ResultSet rs, Connection connection) throws SQLException {
        return super.convert(columnIndex, rs);
    }
}
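
The converter above delegates every JDBC cell to the base class. The hypothetical reader below illustrates how a converter with this shape can be applied while materializing rows; the DruidRowReader name and the UTC zone are assumptions for the sketch, and only the convertCell signature comes from the class above.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DruidRowReader {
    static List<Map<String, Object>> read(Connection conn, String sql) throws SQLException {
        DruidCellConverter converter = new DruidCellConverter(ZoneId.of("UTC"));
        List<Map<String, Object>> rows = new ArrayList<>();
        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            ResultSetMetaData meta = rs.getMetaData();
            while (rs.next()) {
                Map<String, Object> row = new LinkedHashMap<>();
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    // Route each cell through the converter so JDBC types become plain Java values.
                    row.put(meta.getColumnLabel(i), converter.convertCell(i, rs, conn));
                }
                rows.add(row);
            }
        }
        return rows;
    }
}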

@@ -0,0 +1,55 @@
package io.kestra.plugin.jdbc.druid;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.calcite.avatica.remote.Driver;

import java.sql.*;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Query an Apache Druid server."
)
@Plugin(
    examples = {
        @Example(
            code = {
                "url: jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true",
                "sql: |",
                "  SELECT *",
                "  FROM wikiticker",
                "fetch: true"
            }
        )
    }
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output> {
    @Override
    protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
        return new DruidCellConverter(zoneId);
    }

    @Override
    public void registerDriver() throws SQLException {
        DriverManager.registerDriver(new Driver());
    }

    @Override
    protected Statement createStatement(Connection conn) throws SQLException {
        return conn.createStatement();
    }
}
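
The task registers the Avatica remote driver and issues plain JDBC statements against the router's /druid/v2/sql/avatica/ endpoint. The standalone sketch below shows the same round trip outside Kestra, assuming the connection URL from the example above and the wikiticker datasource.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DruidJdbcExample {
    public static void main(String[] args) throws SQLException {
        // Same registration the task performs in registerDriver().
        DriverManager.registerDriver(new org.apache.calcite.avatica.remote.Driver());

        String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             // wikiticker is the datasource used in the example above; swap in your own.
             ResultSet rs = stmt.executeQuery("SELECT * FROM wikiticker LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getObject(1));
            }
        }
    }
}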
@@ -0,0 +1,79 @@
package io.kestra.plugin.jdbc.druid;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.calcite.avatica.remote.Driver;

import java.sql.DriverManager;
import java.sql.SQLException;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Wait for a query on a Druid database."
)
@Plugin(
    examples = {
        @Example(
            title = "Wait for a SQL query to return results and iterate through the rows.",
            full = true,
            code = {
                "id: jdbc-trigger",
                "namespace: io.kestra.tests",
                "",
                "tasks:",
                "  - id: each",
                "    type: io.kestra.core.tasks.flows.EachSequential",
                "    tasks:",
                "      - id: return",
                "        type: io.kestra.core.tasks.debugs.Return",
                "        format: \"{{json(taskrun.value)}}\"",
                "    value: \"{{ trigger.rows }}\"",
                "",
                "triggers:",
                "  - id: watch",
                "    type: io.kestra.plugin.jdbc.druid.Trigger",
                "    interval: \"PT5M\"",
                "    sql: \"SELECT * FROM my_table\""
            }
        )
    }
)
public class Trigger extends AbstractJdbcTrigger {

    @Override
    protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception {
        var query = Query.builder()
            .id(this.id)
            .type(Query.class.getName())
            .url(this.getUrl())
            .username(this.getUsername())
            .password(this.getPassword())
            .timeZoneId(this.getTimeZoneId())
            .sql(this.getSql())
            .fetch(this.isFetch())
            .store(this.isStore())
            .fetchOne(this.isFetchOne())
            .additionalVars(this.additionalVars)
            .build();
        return query.run(runContext);
    }

    @Override
    public void registerDriver() throws SQLException {
        DriverManager.registerDriver(new Driver());
    }
}

@@ -0,0 +1,7 @@
@PluginSubGroup(
    description = "This sub-group of plugins contains tasks for accessing the Druid database.",
    categories = PluginSubGroup.PluginCategory.DATABASE
)
package io.kestra.plugin.jdbc.druid;

import io.kestra.core.models.annotations.PluginSubGroup;
