Skip to content

Commit

Permalink
feat(clickhouse): add clickHouse bulk insert (#172)
Browse files Browse the repository at this point in the history
close #167
  • Loading branch information
iNikitaGricenko authored Nov 4, 2023
1 parent 70aa012 commit 770f859
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.kestra.plugin.jdbc.clickhouse;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcBatch;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Bulk Insert new rows into a ClickHouse database."
)
@Plugin(
examples = {
@Example(
title = "Insert rows from another table to a Clickhouse database using asynchronous inserts",
code = {
"from: \"{{ outputs.query.uri }}\"",
"url: jdbc:clickhouse://127.0.0.1:56982/",
"username: clickhouse",
"password: ch_passwd",
"sql: INSERT INTO YourTable SETTINGS async_insert=1, wait_for_async_insert=1 values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
}
),
@Example(
title = "Insert data into specific columns via a SQL query to a ClickHouse database using asynchronous inserts",
code = {
"from: \"{{ outputs.query.uri }}\"",
"url: jdbc:clickhouse://127.0.0.1:56982/",
"username: clickhouse",
"password: ch_passwd",
"sql: INSERT INTO YourTable ( field1, field2, field3 ) SETTINGS async_insert=1, wait_for_async_insert=1 values( ?, ?, ? )"
}
)
}
)
public class BulkInsert extends AbstractJdbcBatch implements RunnableTask<AbstractJdbcBatch.Output> {

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new ClickHouseCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.clickhouse.jdbc.ClickHouseDriver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import com.google.common.collect.ImmutableMap;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.io.*;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.time.LocalDate;
Expand Down Expand Up @@ -92,6 +95,51 @@ void update() throws Exception {
assertThat(runOutput.getRow().get("String"), is("D"));
}

@Test
void updateBatch() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

for (int i = 0; i < 1000; i++) {
FileSerde.write(output, ImmutableMap.builder()
.put("String", "kestra")
.build()
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

BulkInsert taskUpdate = BulkInsert.builder()
.from(uri.toString())
.url(getUrl())
.username(getUsername())
.password(getPassword())
.timeZoneId("Europe/Paris")
.sql("INSERT INTO clickhouse_types (String) SETTINGS async_insert=1, wait_for_async_insert=1 values( ? )")
.build();

taskUpdate.run(runContext);

// clickhouse need some to refresh
Thread.sleep(500);

Query taskGet = Query.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetch(true)
.timeZoneId("Europe/Paris")
.sql("select String from clickhouse_types")
.build();

AbstractJdbcQuery.Output runOutput = taskGet.run(runContext);
assertThat(runOutput.getRows(), notNullValue());
assertThat(runOutput.getSize(), is(1001L));
assertThat(runOutput.getRows().stream().anyMatch(map -> map.get("String").equals("kestra")), is(true));
}

@Override
protected String getUrl() {
return "jdbc:clickhouse://127.0.0.1:28123/default";
Expand Down

0 comments on commit 770f859

Please sign in to comment.