From 314ac78c739279c0a9d69c49d044cf0cb156206d Mon Sep 17 00:00:00 2001
From: daidai <2017501503@qq.com>
Date: Tue, 23 Jul 2024 10:14:01 +0800
Subject: [PATCH] [kudu] pick kudu connector to 452 version. (#232)
Migrate the changes of trino Kudu connector from version 435 to version 452.
Changed some code to adapt to the changes of Trino Spi.
Performed tpch tests on both Trino and Doris.
It is recommended to compile with mvn package -Dmaven.test.skip=true.
---
plugin/trino-kudu/pom.xml | 34 +-
.../trino/plugin/kudu/KuduClientConfig.java | 17 +-
.../trino/plugin/kudu/KuduClientSession.java | 81 ++--
.../trino/plugin/kudu/KuduColumnHandle.java | 76 +---
.../io/trino/plugin/kudu/KuduMetadata.java | 74 ++--
.../io/trino/plugin/kudu/KuduPageSink.java | 28 +-
.../io/trino/plugin/kudu/KuduRecordSet.java | 4 +-
.../trino/plugin/kudu/KuduSplitManager.java | 101 +++--
.../io/trino/plugin/kudu/KuduTableHandle.java | 2 +-
.../java/io/trino/plugin/kudu/TypeHelper.java | 171 ++++-----
.../properties/HashPartitionDefinition.java | 26 +-
.../kudu/properties/KuduTableProperties.java | 142 +++----
.../kudu/properties/PartitionDesign.java | 4 +-
.../properties/RangeBoundValueSerializer.java | 2 +-
.../kudu/properties/RangePartition.java | 28 +-
.../properties/RangePartitionDefinition.java | 17 +-
.../plugin/kudu/schema/NoSchemaEmulation.java | 4 +-
.../schema/SchemaAlreadyExistsException.java | 41 ---
.../SchemaEmulationByTableNameConvention.java | 4 +-
.../kudu/client/KeyEncoderAccessor.java | 5 -
.../kudu/BaseKuduConnectorSmokeTest.java | 28 +-
.../kudu/KuduCreateAndInsertDataSetup.java | 48 +++
.../plugin/kudu/KuduQueryRunnerFactory.java | 194 ++++------
.../plugin/kudu/KuduTabletWaitStrategy.java | 73 ++++
.../plugin/kudu/TestKuduClientConfig.java | 7 +-
.../plugin/kudu/TestKuduConnectorTest.java | 93 +++--
.../TestKuduIntegrationDecimalColumns.java | 3 +-
.../TestKuduIntegrationDynamicFilter.java | 61 +--
.../TestKuduIntegrationHashPartitioning.java | 3 +-
.../TestKuduIntegrationIntegerColumns.java | 3 +-
.../TestKuduIntegrationRangePartitioning.java | 3 +-
.../plugin/kudu/TestKuduTypeMapping.java | 347 ++++++++++++++++++
.../trino/plugin/kudu/TestingKuduServer.java | 11 +-
.../io/trino/testing/BaseConnectorTest.java | 6 +-
34 files changed, 1007 insertions(+), 734 deletions(-)
delete mode 100644 plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java
create mode 100644 plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java
create mode 100644 plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java
create mode 100644 plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java
diff --git a/plugin/trino-kudu/pom.xml b/plugin/trino-kudu/pom.xml
index a27bbd0bc26..9878e885ff6 100644
--- a/plugin/trino-kudu/pom.xml
+++ b/plugin/trino-kudu/pom.xml
@@ -10,11 +10,12 @@
trino-kudu
trino-plugin
- Trino - Kudu Connector
+ Trino - Kudu connector
${project.parent.basedir}
- 1.15.0
+ true
+ 1.17.0
@@ -131,12 +132,30 @@
provided
+
+ com.google.errorprone
+ error_prone_annotations
+ runtime
+
+
io.airlift
log-manager
runtime
+
+ com.github.docker-java
+ docker-java-api
+ test
+
+
+
+ dev.failsafe
+ failsafe
+ test
+
+
io.airlift
junit-extensions
@@ -218,20 +237,21 @@
- org.testcontainers
- testcontainers
+ org.rnorth.duct-tape
+ duct-tape
+ 1.0.8
test
org.testcontainers
- toxiproxy
+ testcontainers
test
- org.testng
- testng
+ org.testcontainers
+ toxiproxy
test
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java
index 6d02132f0e7..6a0a281ec8c 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java
@@ -13,7 +13,6 @@
*/
package io.trino.plugin.kudu;
-import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
@@ -35,11 +34,11 @@
@DefunctConfig("kudu.client.default-socket-read-timeout")
public class KuduClientConfig
{
- private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
+ private static final Duration DEFAULT_OPERATION_TIMEOUT = new Duration(30, TimeUnit.SECONDS);
private List masterAddresses = ImmutableList.of();
- private Duration defaultAdminOperationTimeout = new Duration(30, TimeUnit.SECONDS);
- private Duration defaultOperationTimeout = new Duration(30, TimeUnit.SECONDS);
+ private Duration defaultAdminOperationTimeout = DEFAULT_OPERATION_TIMEOUT;
+ private Duration defaultOperationTimeout = DEFAULT_OPERATION_TIMEOUT;
private boolean disableStatistics;
private boolean schemaEmulationEnabled;
private String schemaEmulationPrefix = "presto::";
@@ -54,15 +53,9 @@ public List getMasterAddresses()
}
@Config("kudu.client.master-addresses")
- public KuduClientConfig setMasterAddresses(String commaSeparatedList)
+ public KuduClientConfig setMasterAddresses(List commaSeparatedList)
{
- this.masterAddresses = SPLITTER.splitToList(commaSeparatedList);
- return this;
- }
-
- public KuduClientConfig setMasterAddresses(String... contactPoints)
- {
- this.masterAddresses = ImmutableList.copyOf(contactPoints);
+ this.masterAddresses = commaSeparatedList;
return this;
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java
index f3b2ae96c8e..eec1ac45dd8 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java
@@ -15,6 +15,8 @@
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
import io.trino.plugin.kudu.properties.ColumnDesign;
import io.trino.plugin.kudu.properties.HashPartitionDefinition;
import io.trino.plugin.kudu.properties.KuduTableProperties;
@@ -58,6 +60,7 @@
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -178,7 +181,7 @@ public List buildKuduSplits(KuduTableHandle tableHandle, DynamicFilte
.boxed().collect(toList());
for (ColumnHandle column : desiredColumns.get()) {
KuduColumnHandle k = (KuduColumnHandle) column;
- int index = k.getOrdinalPosition();
+ int index = k.ordinalPosition();
if (index >= primaryKeyColumnCount) {
columnIndexes.add(index);
}
@@ -194,7 +197,7 @@ public List buildKuduSplits(KuduTableHandle tableHandle, DynamicFilte
else {
if (desiredColumns.isPresent()) {
columnIndexes = desiredColumns.get().stream()
- .map(handle -> ((KuduColumnHandle) handle).getOrdinalPosition())
+ .map(handle -> ((KuduColumnHandle) handle).ordinalPosition())
.collect(toImmutableList());
}
else {
@@ -323,12 +326,12 @@ public void addColumn(SchemaTableName schemaTableName, ColumnMetadata column)
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
Type type = TypeHelper.toKuduClientType(column.getType());
- alterOptions.addColumn(
- new ColumnSchemaBuilder(column.getName(), type)
- .nullable(true)
- .defaultValue(null)
- .comment(nullToEmpty(column.getComment())) // Kudu doesn't allow null comment
- .build());
+ ColumnSchemaBuilder builder = new ColumnSchemaBuilder(column.getName(), type)
+ .nullable(true)
+ .defaultValue(null)
+ .comment(nullToEmpty(column.getComment())); // Kudu doesn't allow null comment
+ setTypeAttributes(column, builder);
+ alterOptions.addColumn(builder.build());
client.alterTable(rawName, alterOptions);
}
catch (KuduException e) {
@@ -384,8 +387,8 @@ private void changeRangePartition(SchemaTableName schemaTableName, RangePartitio
if (definition == null) {
throw new TrinoException(QUERY_REJECTED, "Table " + schemaTableName + " has no range partition");
}
- PartialRow lowerBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.getLower());
- PartialRow upperBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.getUpper());
+ PartialRow lowerBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.lower());
+ PartialRow upperBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.upper());
AlterTableOptions alterOptions = new AlterTableOptions();
switch (change) {
case ADD:
@@ -467,19 +470,19 @@ private CreateTableOptions buildCreateTableOptions(Schema schema, Map rangePartitions = KuduTableProperties.getRangePartitions(properties);
if (rangePartitionDefinition != null && !rangePartitions.isEmpty()) {
for (RangePartition rangePartition : rangePartitions) {
- PartialRow lower = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.getLower());
- PartialRow upper = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.getUpper());
+ PartialRow lower = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.lower());
+ PartialRow upper = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.upper());
options.addRangePartition(lower, upper);
}
}
@@ -503,7 +506,7 @@ private void addConstraintPredicates(KuduTable table, KuduScanToken.KuduScanToke
Schema schema = table.getSchema();
constraintSummary.getDomains().orElseThrow().forEach((columnHandle, domain) -> {
- int position = ((KuduColumnHandle) columnHandle).getOrdinalPosition();
+ int position = ((KuduColumnHandle) columnHandle).ordinalPosition();
ColumnSchema columnSchema = schema.getColumnByIndex(position);
verify(!domain.isNone(), "Domain is none");
if (domain.isAll()) {
@@ -529,8 +532,8 @@ else if (domain.isSingleValue()) {
KuduPredicate predicate = createInListPredicate(columnSchema, discreteValues);
builder.addPredicate(predicate);
}
- else if (valueSet instanceof SortedRangeSet) {
- Ranges ranges = ((SortedRangeSet) valueSet).getRanges();
+ else if (valueSet instanceof SortedRangeSet sortedRangeSet) {
+ Ranges ranges = sortedRangeSet.getRanges();
List rangeList = ranges.getOrderedRanges();
if (rangeList.stream().allMatch(Range::isSingleValue)) {
io.trino.spi.type.Type type = TypeHelper.fromKuduColumn(columnSchema);
@@ -577,35 +580,39 @@ private KuduPredicate createComparisonPredicate(ColumnSchema columnSchema, KuduP
{
io.trino.spi.type.Type type = TypeHelper.fromKuduColumn(columnSchema);
Object javaValue = TypeHelper.getJavaValue(type, value);
- if (javaValue instanceof Long) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (Long) javaValue);
+ if (javaValue instanceof Long longValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, longValue);
}
- if (javaValue instanceof BigDecimal) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (BigDecimal) javaValue);
+ if (javaValue instanceof BigDecimal bigDecimal) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, bigDecimal);
}
- if (javaValue instanceof Integer) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (Integer) javaValue);
+ if (javaValue instanceof Integer integerValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, integerValue);
}
- if (javaValue instanceof Short) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (Short) javaValue);
+ if (javaValue instanceof Short shortValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, shortValue);
}
- if (javaValue instanceof Byte) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (Byte) javaValue);
+ if (javaValue instanceof Byte byteValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, byteValue);
}
- if (javaValue instanceof String) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (String) javaValue);
+ if (javaValue instanceof String stringValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, stringValue);
}
- if (javaValue instanceof Double) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (Double) javaValue);
+ if (javaValue instanceof Double doubleValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, doubleValue);
}
- if (javaValue instanceof Float) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (Float) javaValue);
+ if (javaValue instanceof Float floatValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, floatValue);
}
- if (javaValue instanceof Boolean) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (Boolean) javaValue);
+ if (javaValue instanceof Boolean booleanValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, booleanValue);
}
- if (javaValue instanceof byte[]) {
- return KuduPredicate.newComparisonPredicate(columnSchema, op, (byte[]) javaValue);
+ if (javaValue instanceof byte[] byteArrayValue) {
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, byteArrayValue);
+ }
+ if (javaValue instanceof ByteBuffer byteBuffer) {
+ Slice slice = Slices.wrappedHeapBuffer(byteBuffer);
+ return KuduPredicate.newComparisonPredicate(columnSchema, op, slice.getBytes(0, slice.length()));
}
if (javaValue == null) {
throw new IllegalStateException("Unexpected null java value for column " + columnSchema.getName());
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java
index 35c1f639c93..64d9cfc06c6 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java
@@ -13,19 +13,14 @@
*/
package io.trino.plugin.kudu;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
-public class KuduColumnHandle
+public record KuduColumnHandle(String name, int ordinalPosition, Type type)
implements ColumnHandle
{
public static final String ROW_ID = "row_uuid";
@@ -33,40 +28,13 @@ public class KuduColumnHandle
public static final KuduColumnHandle ROW_ID_HANDLE = new KuduColumnHandle(ROW_ID, ROW_ID_POSITION, VarbinaryType.VARBINARY);
- private final String name;
- private final int ordinalPosition;
- private final Type type;
-
- @JsonCreator
- public KuduColumnHandle(
- @JsonProperty("name") String name,
- @JsonProperty("ordinalPosition") int ordinalPosition,
- @JsonProperty("type") Type type)
- {
- this.name = requireNonNull(name, "name is null");
- this.ordinalPosition = ordinalPosition;
- this.type = requireNonNull(type, "type is null");
- }
-
- @JsonProperty
- public String getName()
- {
- return name;
- }
-
- @JsonProperty
- public int getOrdinalPosition()
- {
- return ordinalPosition;
- }
-
- @JsonProperty
- public Type getType()
+ public KuduColumnHandle
{
- return type;
+ requireNonNull(name, "name is null");
+ requireNonNull(type, "type is null");
}
- public ColumnMetadata getColumnMetadata()
+ public ColumnMetadata columnMetadata()
{
return new ColumnMetadata(name, type);
}
@@ -75,38 +43,4 @@ public boolean isVirtualRowId()
{
return name.equals(ROW_ID);
}
-
- @Override
- public int hashCode()
- {
- return Objects.hash(
- name,
- ordinalPosition,
- type);
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- KuduColumnHandle other = (KuduColumnHandle) obj;
- return Objects.equals(this.name, other.name) &&
- Objects.equals(this.ordinalPosition, other.ordinalPosition) &&
- Objects.equals(this.type, other.type);
- }
-
- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("name", name)
- .add("ordinalPosition", ordinalPosition)
- .add("type", type)
- .toString();
- }
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java
index 4e16c1441e3..cb5993bfe78 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java
@@ -32,16 +32,17 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
-import io.trino.spi.connector.ConnectorTablePartitioning;
import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.LimitApplicationResult;
-import io.trino.spi.connector.LocalProperty;
import io.trino.spi.connector.NotFoundException;
import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
+import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.expression.ConnectorExpression;
@@ -59,13 +60,16 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.function.Consumer;
+import java.util.function.UnaryOperator;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -73,6 +77,8 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS;
+import static io.trino.spi.connector.SaveMode.IGNORE;
+import static io.trino.spi.connector.SaveMode.REPLACE;
import static java.util.Objects.requireNonNull;
public class KuduMetadata
@@ -99,9 +105,13 @@ public List listTables(ConnectorSession session, Optional> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+ public Iterator streamRelationColumns(
+ ConnectorSession session,
+ Optional schemaName,
+ UnaryOperator> relationFilter)
{
- requireNonNull(prefix, "prefix is null");
+ SchemaTablePrefix prefix = schemaName.map(SchemaTablePrefix::new)
+ .orElseGet(SchemaTablePrefix::new);
List tables;
if (prefix.getTable().isEmpty()) {
@@ -111,15 +121,17 @@ public Map> listTableColumns(ConnectorSess
tables = ImmutableList.of(prefix.toSchemaTableName());
}
- ImmutableMap.Builder> columns = ImmutableMap.builder();
+ Map relationColumns = new HashMap<>();
for (SchemaTableName tableName : tables) {
- KuduTableHandle tableHandle = getTableHandle(session, tableName);
+ KuduTableHandle tableHandle = getTableHandle(session, tableName, Optional.empty(), Optional.empty());
if (tableHandle != null) {
- ConnectorTableMetadata tableMetadata = getTableMetadata(tableHandle);
- columns.put(tableName, tableMetadata.getColumns());
+ KuduTable table = tableHandle.getTable(clientSession);
+ relationColumns.put(tableName, RelationColumnsMetadata.forTable(tableName, getColumnsMetadata(table.getSchema())));
}
}
- return columns.buildOrThrow();
+ return relationFilter.apply(relationColumns.keySet()).stream()
+ .map(relationColumns::get)
+ .iterator();
}
private ColumnMetadata getColumnMetadata(ColumnSchema column)
@@ -165,13 +177,18 @@ private ConnectorTableMetadata getTableMetadata(KuduTableHandle tableHandle)
// Kudu returns empty string as a table comment by default
Optional tableComment = Optional.ofNullable(emptyToNull(table.getComment()));
- List columnsMetaList = schema.getColumns().stream()
+ List columns = getColumnsMetadata(schema);
+
+ Map properties = clientSession.getTableProperties(tableHandle);
+ return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columns, properties, tableComment);
+ }
+
+ private List getColumnsMetadata(Schema schema)
+ {
+ return schema.getColumns().stream()
.filter(column -> !column.isKey() || !column.getName().equals(ROW_ID))
.map(this::getColumnMetadata)
.collect(toImmutableList());
-
- Map properties = clientSession.getTableProperties(tableHandle);
- return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columnsMetaList, properties, tableComment);
}
@Override
@@ -180,7 +197,7 @@ public Map getColumnHandles(ConnectorSession session, Conn
KuduTableHandle tableHandle = (KuduTableHandle) connectorTableHandle;
ImmutableMap.Builder columnHandles = ImmutableMap.builder();
Schema schema = clientSession.getTableSchema(tableHandle);
- forAllColumnHandles(schema, column -> columnHandles.put(column.getName(), column));
+ forAllColumnHandles(schema, column -> columnHandles.put(column.name(), column));
return columnHandles.buildOrThrow();
}
@@ -206,12 +223,16 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
.setHidden(true)
.build();
}
- return kuduColumnHandle.getColumnMetadata();
+ return kuduColumnHandle.columnMetadata();
}
@Override
- public KuduTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
+ public KuduTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional startVersion, Optional endVersion)
{
+ if (startVersion.isPresent() || endVersion.isPresent()) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
+ }
+
try {
KuduTable table = clientSession.openTable(schemaTableName);
OptionalInt bucketCount = OptionalInt.empty();
@@ -248,12 +269,15 @@ public void dropSchema(ConnectorSession session, String schemaName, boolean casc
}
@Override
- public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
+ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode)
{
+ if (saveMode == REPLACE) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
+ }
if (tableMetadata.getColumns().stream().anyMatch(column -> column.getComment() != null)) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with column comment");
}
- clientSession.createTable(tableMetadata, ignoreExisting);
+ clientSession.createTable(tableMetadata, saveMode == IGNORE);
}
@Override
@@ -282,7 +306,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl
{
KuduTableHandle kuduTableHandle = (KuduTableHandle) tableHandle;
KuduColumnHandle kuduColumnHandle = (KuduColumnHandle) column;
- clientSession.dropColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.getName());
+ clientSession.dropColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.name());
}
@Override
@@ -290,7 +314,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
{
KuduTableHandle kuduTableHandle = (KuduTableHandle) tableHandle;
KuduColumnHandle kuduColumnHandle = (KuduColumnHandle) source;
- clientSession.renameColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.getName(), target);
+ clientSession.renameColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.name(), target);
}
@Override
@@ -328,6 +352,7 @@ public Optional finishInsert(
}
@Override
+ @SuppressWarnings("deprecation")
public ConnectorOutputTableHandle beginCreateTable(
ConnectorSession session,
ConnectorTableMetadata tableMetadata,
@@ -432,14 +457,11 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
{
KuduTableHandle handle = (KuduTableHandle) table;
- Optional tablePartitioning = Optional.empty();
- List> localProperties = ImmutableList.of();
-
return new ConnectorTableProperties(
handle.getConstraint(),
- tablePartitioning,
Optional.empty(),
- localProperties);
+ Optional.empty(),
+ ImmutableList.of());
}
@Override
@@ -508,7 +530,7 @@ public Optional> applyProjecti
ImmutableList.Builder assignmentList = ImmutableList.builder();
assignments.forEach((name, column) -> {
desiredColumns.add(column);
- assignmentList.add(new Assignment(name, column, ((KuduColumnHandle) column).getType()));
+ assignmentList.add(new Assignment(name, column, ((KuduColumnHandle) column).type()));
});
handle = new KuduTableHandle(
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java
index 29dc98d338a..8ce926a56a0 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java
@@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.spi.connector.ConnectorPageSink;
@@ -47,6 +48,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateType.DATE;
@@ -61,6 +63,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.kudu.util.DateUtil.epochDaysToSqlDate;
public class KuduPageSink
implements ConnectorPageSink, ConnectorMergeSink
@@ -138,8 +141,8 @@ public CompletableFuture> appendPage(Page page)
}
return NOT_BLOCKED;
}
- catch (KuduException e) {
- throw new RuntimeException(e);
+ catch (KuduException | RuntimeException e) {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
}
@@ -150,6 +153,9 @@ private void appendColumn(PartialRow row, Page page, int position, int channel,
if (block.isNull(position)) {
row.setNull(destChannel);
}
+ else if (DATE.equals(type)) {
+ row.addDate(destChannel, epochDaysToSqlDate(INTEGER.getInt(block, position)));
+ }
else if (TIMESTAMP_MILLIS.equals(type)) {
row.addLong(destChannel, truncateEpochMicrosToMillis(TIMESTAMP_MILLIS.getLong(block, position)));
}
@@ -174,7 +180,8 @@ else if (BOOLEAN.equals(type)) {
else if (DOUBLE.equals(type)) {
row.addDouble(destChannel, DOUBLE.getDouble(block, position));
}
- else if (type instanceof VarcharType varcharType) {
+ else if (type instanceof VarcharType) {
+ VarcharType varcharType = (VarcharType) type;
Type originalType = originalColumnTypes.get(destChannel);
if (DATE.equals(originalType)) {
SqlDate date = (SqlDate) originalType.getObjectValue(connectorSession, block, position);
@@ -189,7 +196,8 @@ else if (type instanceof VarcharType varcharType) {
else if (VARBINARY.equals(type)) {
row.addBinary(destChannel, VARBINARY.getSlice(block, position).toByteBuffer());
}
- else if (type instanceof DecimalType decimalType) {
+ else if (type instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) type;
SqlDecimal sqlDecimal = (SqlDecimal) decimalType.getObjectValue(connectorSession, block, position);
row.addDecimal(destChannel, sqlDecimal.toBigDecimal());
}
@@ -228,8 +236,8 @@ public void storeMergedRows(Page page)
try {
operationApplier.applyOperationAsync(delete);
}
- catch (KuduException e) {
- throw new RuntimeException(e);
+ catch (KuduException | RuntimeException e) {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
}
@@ -248,14 +256,14 @@ public void storeMergedRows(Page page)
try {
operationApplier.applyOperationAsync(insert);
}
- catch (KuduException e) {
- throw new RuntimeException(e);
+ catch (KuduException | RuntimeException e) {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
}
}
}
- catch (KuduException e) {
- throw new RuntimeException(e);
+ catch (KuduException | RuntimeException e) {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java
index cc686497c8c..da0949aa28e 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java
@@ -47,7 +47,7 @@ public KuduRecordSet(KuduClientSession clientSession, KuduSplit kuduSplit, List<
public List getColumnTypes()
{
return columns.stream()
- .map(column -> ((KuduColumnHandle) column).getType())
+ .map(column -> ((KuduColumnHandle) column).type())
.collect(toImmutableList());
}
@@ -63,7 +63,7 @@ public RecordCursor cursor()
builder.put(i, ROW_ID_POSITION);
}
else {
- builder.put(i, projectedSchema.getColumnIndex(handle.getName()));
+ builder.put(i, projectedSchema.getColumnIndex(handle.name()));
}
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java
index 52be6748547..4246ac1f52c 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.kudu;
+import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
@@ -25,16 +26,15 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import static io.trino.plugin.kudu.KuduSessionProperties.getDynamicFilteringWaitTimeout;
-import static io.trino.spi.connector.DynamicFilter.NOT_BLOCKED;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class KuduSplitManager
implements ConnectorSplitManager
{
+ private static final ConnectorSplitSource.ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitSource.ConnectorSplitBatch(ImmutableList.of(), false);
private final KuduClientSession clientSession;
@Inject
@@ -51,82 +51,79 @@ public ConnectorSplitSource getSplits(
DynamicFilter dynamicFilter,
Constraint constraint)
{
- long timeoutMillis = getDynamicFilteringWaitTimeout(session).toMillis();
- if (timeoutMillis == 0 || !dynamicFilter.isAwaitable()) {
- return getSplitSource(table, dynamicFilter);
- }
- CompletableFuture> dynamicFilterFuture = whenCompleted(dynamicFilter)
- .completeOnTimeout(null, timeoutMillis, MILLISECONDS);
- CompletableFuture splitSourceFuture = dynamicFilterFuture.thenApply(
- ignored -> getSplitSource(table, dynamicFilter));
- return new KuduDynamicFilteringSplitSource(dynamicFilterFuture, splitSourceFuture);
- }
-
- private ConnectorSplitSource getSplitSource(
- ConnectorTableHandle table,
- DynamicFilter dynamicFilter)
- {
- KuduTableHandle handle = (KuduTableHandle) table;
-
- List splits = clientSession.buildKuduSplits(handle, dynamicFilter);
-
- return new FixedSplitSource(splits);
- }
-
- private static CompletableFuture> whenCompleted(DynamicFilter dynamicFilter)
- {
- if (dynamicFilter.isAwaitable()) {
- return dynamicFilter.isBlocked().thenCompose(ignored -> whenCompleted(dynamicFilter));
- }
- return NOT_BLOCKED;
+ return new KuduDynamicFilteringSplitSource(session, clientSession, dynamicFilter, table);
}
private static class KuduDynamicFilteringSplitSource
implements ConnectorSplitSource
{
- private final CompletableFuture> dynamicFilterFuture;
- private final CompletableFuture splitSourceFuture;
+ private final KuduClientSession clientSession;
+ private final DynamicFilter dynamicFilter;
+ private final ConnectorTableHandle tableHandle;
+ private final long dynamicFilteringTimeoutNanos;
+ private ConnectorSplitSource delegateSplitSource;
+ private final long startNanos;
private KuduDynamicFilteringSplitSource(
- CompletableFuture> dynamicFilterFuture,
- CompletableFuture splitSourceFuture)
+ ConnectorSession connectorSession,
+ KuduClientSession clientSession,
+ DynamicFilter dynamicFilter,
+ ConnectorTableHandle tableHandle)
{
- this.dynamicFilterFuture = requireNonNull(dynamicFilterFuture, "dynamicFilterFuture is null");
- this.splitSourceFuture = requireNonNull(splitSourceFuture, "splitSourceFuture is null");
+ this.clientSession = requireNonNull(clientSession, "clientSession is null");
+ this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilterFuture is null");
+ this.tableHandle = requireNonNull(tableHandle, "splitSourceFuture is null");
+ this.dynamicFilteringTimeoutNanos = (long) getDynamicFilteringWaitTimeout(connectorSession).getValue(NANOSECONDS);
+ this.startNanos = System.nanoTime();
}
@Override
public CompletableFuture getNextBatch(int maxSize)
{
- return splitSourceFuture.thenCompose(splitSource -> splitSource.getNextBatch(maxSize));
+ CompletableFuture> blocked = dynamicFilter.isBlocked();
+ long remainingTimeoutNanos = getRemainingTimeoutNanos();
+ if (remainingTimeoutNanos > 0 && dynamicFilter.isAwaitable()) {
+ // wait for dynamic filter and yield
+ return blocked
+ .thenApply(x -> EMPTY_BATCH)
+ .completeOnTimeout(EMPTY_BATCH, remainingTimeoutNanos, NANOSECONDS);
+ }
+
+ if (delegateSplitSource == null) {
+ KuduTableHandle handle = (KuduTableHandle) tableHandle;
+
+ List splits = clientSession.buildKuduSplits(handle, dynamicFilter);
+ delegateSplitSource = new FixedSplitSource(splits);
+ }
+
+ return delegateSplitSource.getNextBatch(maxSize);
}
@Override
public void close()
{
- if (!dynamicFilterFuture.cancel(true)) {
- splitSourceFuture.thenAccept(ConnectorSplitSource::close);
+ if (delegateSplitSource != null) {
+ delegateSplitSource.close();
}
}
@Override
public boolean isFinished()
{
- if (!splitSourceFuture.isDone()) {
- return false;
- }
- if (splitSourceFuture.isCompletedExceptionally()) {
+ if (getRemainingTimeoutNanos() > 0 && dynamicFilter.isAwaitable()) {
return false;
}
- try {
- return splitSourceFuture.get().isFinished();
- }
- catch (InterruptedException | ExecutionException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException(e);
+
+ if (delegateSplitSource != null) {
+ return delegateSplitSource.isFinished();
}
+
+ return false;
+ }
+
+ private long getRemainingTimeoutNanos()
+ {
+ return dynamicFilteringTimeoutNanos - (System.nanoTime() - startNanos);
}
}
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java
index cc63f844849..aaaa738c94b 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java
@@ -146,7 +146,7 @@ public boolean equals(Object obj)
return Objects.equals(this.schemaTableName, other.schemaTableName) &&
Objects.equals(this.constraint, other.constraint) &&
Objects.equals(this.desiredColumns, other.desiredColumns) &&
- Objects.equals(this.requiresRowId, other.requiresRowId) &&
+ this.requiresRowId == other.requiresRowId &&
Objects.equals(this.bucketCount, other.bucketCount) &&
Objects.equals(this.limit, other.limit);
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java
index 5fdd31d30ab..3c294029659 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java
@@ -50,23 +50,20 @@ private TypeHelper() {}
public static org.apache.kudu.Type toKuduClientType(Type type)
{
- if (type instanceof VarcharType) {
- return org.apache.kudu.Type.STRING;
+ if (type == BooleanType.BOOLEAN) {
+ return org.apache.kudu.Type.BOOL;
}
- if (type.equals(TIMESTAMP_MILLIS)) {
- return org.apache.kudu.Type.UNIXTIME_MICROS;
+ if (type == TinyintType.TINYINT) {
+ return org.apache.kudu.Type.INT8;
}
- if (type == BigintType.BIGINT) {
- return org.apache.kudu.Type.INT64;
+ if (type == SmallintType.SMALLINT) {
+ return org.apache.kudu.Type.INT16;
}
if (type == IntegerType.INTEGER) {
return org.apache.kudu.Type.INT32;
}
- if (type == SmallintType.SMALLINT) {
- return org.apache.kudu.Type.INT16;
- }
- if (type == TinyintType.TINYINT) {
- return org.apache.kudu.Type.INT8;
+ if (type == BigintType.BIGINT) {
+ return org.apache.kudu.Type.INT64;
}
if (type == RealType.REAL) {
return org.apache.kudu.Type.FLOAT;
@@ -74,21 +71,24 @@ public static org.apache.kudu.Type toKuduClientType(Type type)
if (type == DoubleType.DOUBLE) {
return org.apache.kudu.Type.DOUBLE;
}
- if (type == BooleanType.BOOLEAN) {
- return org.apache.kudu.Type.BOOL;
- }
- if (type instanceof VarbinaryType) {
- return org.apache.kudu.Type.BINARY;
- }
if (type instanceof DecimalType) {
return org.apache.kudu.Type.DECIMAL;
}
- if (type == DateType.DATE) {
+ if (type instanceof CharType) {
return org.apache.kudu.Type.STRING;
}
- if (type instanceof CharType) {
+ if (type instanceof VarcharType) {
return org.apache.kudu.Type.STRING;
}
+ if (type instanceof VarbinaryType) {
+ return org.apache.kudu.Type.BINARY;
+ }
+ if (type == DateType.DATE) {
+ return org.apache.kudu.Type.DATE;
+ }
+ if (type.equals(TIMESTAMP_MILLIS)) {
+ return org.apache.kudu.Type.UNIXTIME_MICROS;
+ }
throw new TrinoException(NOT_SUPPORTED, "Unsupported type: " + type);
}
@@ -100,31 +100,32 @@ public static Type fromKuduColumn(ColumnSchema column)
private static Type fromKuduClientType(org.apache.kudu.Type ktype, ColumnTypeAttributes attributes)
{
switch (ktype) {
- case STRING:
- return VarcharType.VARCHAR;
- case UNIXTIME_MICROS:
- return TIMESTAMP_MILLIS;
- case INT64:
- return BigintType.BIGINT;
- case INT32:
- return IntegerType.INTEGER;
- case INT16:
- return SmallintType.SMALLINT;
+ case BOOL:
+ return BooleanType.BOOLEAN;
case INT8:
return TinyintType.TINYINT;
+ case INT16:
+ return SmallintType.SMALLINT;
+ case INT32:
+ return IntegerType.INTEGER;
+ case INT64:
+ return BigintType.BIGINT;
case FLOAT:
return RealType.REAL;
case DOUBLE:
return DoubleType.DOUBLE;
- case BOOL:
- return BooleanType.BOOLEAN;
- case BINARY:
- return VarbinaryType.VARBINARY;
case DECIMAL:
return DecimalType.createDecimalType(attributes.getPrecision(), attributes.getScale());
- // TODO: add support for varchar and date types: https://github.com/trinodb/trino/issues/11009
- case VARCHAR:
+ case STRING:
+ return VarcharType.VARCHAR;
+ case BINARY:
+ return VarbinaryType.VARBINARY;
case DATE:
+ return DateType.DATE;
+ case UNIXTIME_MICROS:
+ return TIMESTAMP_MILLIS;
+ // TODO: add support for varchar types: https://github.com/trinodb/trino/issues/11009
+ case VARCHAR:
break;
}
throw new IllegalStateException("Kudu type not implemented for " + ktype);
@@ -132,114 +133,90 @@ private static Type fromKuduClientType(org.apache.kudu.Type ktype, ColumnTypeAtt
public static Object getJavaValue(Type type, Object nativeValue)
{
- if (type instanceof VarcharType) {
- return ((Slice) nativeValue).toStringUtf8();
- }
- if (type.equals(TIMESTAMP_MILLIS)) {
- // Kudu's native format is in microseconds
- return nativeValue;
- }
- if (type == BigintType.BIGINT) {
+ if (type == BooleanType.BOOLEAN) {
return nativeValue;
}
- if (type == IntegerType.INTEGER) {
- return ((Long) nativeValue).intValue();
+ if (type == TinyintType.TINYINT) {
+ return ((Long) nativeValue).byteValue();
}
if (type == SmallintType.SMALLINT) {
return ((Long) nativeValue).shortValue();
}
- if (type == TinyintType.TINYINT) {
- return ((Long) nativeValue).byteValue();
+ if (type == IntegerType.INTEGER) {
+ return ((Long) nativeValue).intValue();
}
- if (type == DoubleType.DOUBLE) {
+ if (type == BigintType.BIGINT) {
return nativeValue;
}
if (type == RealType.REAL) {
// conversion can result in precision lost
return intBitsToFloat(((Long) nativeValue).intValue());
}
- if (type == BooleanType.BOOLEAN) {
+ if (type == DoubleType.DOUBLE) {
return nativeValue;
}
- if (type instanceof VarbinaryType) {
- return ((Slice) nativeValue).toByteBuffer();
- }
- if (type instanceof DecimalType decimalType) {
+ if (type instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) type;
if (decimalType.isShort()) {
return new BigDecimal(BigInteger.valueOf((long) nativeValue), decimalType.getScale());
}
return new BigDecimal(((Int128) nativeValue).toBigInteger(), decimalType.getScale());
}
- throw new IllegalStateException("Back conversion not implemented for " + type);
- }
-
- public static Object getObject(Type type, RowResult row, int field)
- {
- if (row.isNull(field)) {
- return null;
- }
if (type instanceof VarcharType) {
- return row.getString(field);
- }
- if (type.equals(TIMESTAMP_MILLIS)) {
- return truncateEpochMicrosToMillis(row.getLong(field));
- }
- if (type == BigintType.BIGINT) {
- return row.getLong(field);
- }
- if (type == IntegerType.INTEGER) {
- return row.getInt(field);
- }
- if (type == SmallintType.SMALLINT) {
- return row.getShort(field);
- }
- if (type == TinyintType.TINYINT) {
- return row.getByte(field);
- }
- if (type == DoubleType.DOUBLE) {
- return row.getDouble(field);
+ return ((Slice) nativeValue).toStringUtf8();
}
- if (type == RealType.REAL) {
- return row.getFloat(field);
+ if (type instanceof VarbinaryType) {
+ return ((Slice) nativeValue).toByteBuffer();
}
- if (type == BooleanType.BOOLEAN) {
- return row.getBoolean(field);
+ if (type.equals(DateType.DATE)) {
+ return nativeValue;
}
- if (type instanceof VarbinaryType) {
- return Slices.wrappedHeapBuffer(row.getBinary(field));
+ if (type.equals(TIMESTAMP_MILLIS)) {
+ // Kudu's native format is in microseconds
+ return nativeValue;
}
+ throw new IllegalStateException("Back conversion not implemented for " + type);
+ }
+
+ public static Object getObject(Type type, RowResult row, int field)
+ {
if (type instanceof DecimalType) {
- return Decimals.encodeScaledValue(row.getDecimal(field), ((DecimalType) type).getScale());
+ DecimalType decimalType = (DecimalType) type;
+ return Decimals.encodeScaledValue(row.getDecimal(field), decimalType.getScale());
}
throw new IllegalStateException("getObject not implemented for " + type);
}
public static long getLong(Type type, RowResult row, int field)
{
- if (type.equals(TIMESTAMP_MILLIS)) {
- return truncateEpochMicrosToMillis(row.getLong(field));
+ if (type == TinyintType.TINYINT) {
+ return row.getByte(field);
}
- if (type == BigintType.BIGINT) {
- return row.getLong(field);
+ if (type == SmallintType.SMALLINT) {
+ return row.getShort(field);
}
if (type == IntegerType.INTEGER) {
return row.getInt(field);
}
- if (type == SmallintType.SMALLINT) {
- return row.getShort(field);
- }
- if (type == TinyintType.TINYINT) {
- return row.getByte(field);
+ if (type == BigintType.BIGINT) {
+ return row.getLong(field);
}
if (type == RealType.REAL) {
return floatToRawIntBits(row.getFloat(field));
}
- if (type instanceof DecimalType decimalType) {
+ if (type instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) type;
if (decimalType.isShort()) {
return row.getDecimal(field).unscaledValue().longValue();
}
throw new IllegalStateException("getLong not supported for long decimal: " + type);
}
+ if (type.equals(DateType.DATE)) {
+ return row.getInt(field);
+ }
+ if (type.equals(TIMESTAMP_MILLIS)) {
+ return truncateEpochMicrosToMillis(row.getLong(field));
+ }
throw new IllegalStateException("getLong not implemented for " + type);
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java
index cdbc1bfc8ae..3eac4e128b8 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java
@@ -15,28 +15,4 @@
import java.util.List;
-public class HashPartitionDefinition
-{
- private List columns;
- private int buckets;
-
- public List getColumns()
- {
- return columns;
- }
-
- public void setColumns(List columns)
- {
- this.columns = columns;
- }
-
- public int getBuckets()
- {
- return buckets;
- }
-
- public void setBuckets(int buckets)
- {
- this.buckets = buckets;
- }
-}
+public record HashPartitionDefinition(List columns, int buckets) {}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java
index 48c3bead663..909a3f7cc86 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java
@@ -190,9 +190,7 @@ else if (!hashColumns2.isEmpty()) {
@SuppressWarnings("unchecked")
List rangeColumns = (List) tableProperties.get(PARTITION_BY_RANGE_COLUMNS);
if (!rangeColumns.isEmpty()) {
- RangePartitionDefinition range = new RangePartitionDefinition();
- range.setColumns(rangeColumns);
- design.setRange(range);
+ design.setRange(new RangePartitionDefinition(rangeColumns));
}
return design;
@@ -234,10 +232,7 @@ private static HashPartitionDefinition getHashPartitionDefinition(Map getRangePartitions(Map tableProperties)
@@ -285,17 +280,17 @@ public static Map toMap(KuduTable table)
if (partitionDesign.getHash() != null) {
List list = partitionDesign.getHash();
if (!list.isEmpty()) {
- properties.put(PARTITION_BY_HASH_COLUMNS, list.get(0).getColumns());
- properties.put(PARTITION_BY_HASH_BUCKETS, list.get(0).getBuckets());
+ properties.put(PARTITION_BY_HASH_COLUMNS, list.get(0).columns());
+ properties.put(PARTITION_BY_HASH_BUCKETS, list.get(0).buckets());
}
if (list.size() >= 2) {
- properties.put(PARTITION_BY_HASH_COLUMNS_2, list.get(1).getColumns());
- properties.put(PARTITION_BY_HASH_BUCKETS_2, list.get(1).getBuckets());
+ properties.put(PARTITION_BY_HASH_COLUMNS_2, list.get(1).columns());
+ properties.put(PARTITION_BY_HASH_BUCKETS_2, list.get(1).buckets());
}
}
if (partitionDesign.getRange() != null) {
- properties.put(PARTITION_BY_RANGE_COLUMNS, partitionDesign.getRange().getColumns());
+ properties.put(PARTITION_BY_RANGE_COLUMNS, partitionDesign.getRange().columns());
}
String partitionRangesValue = mapper.writeValueAsString(rangePartitionList);
@@ -400,22 +395,17 @@ public static PartitionDesign getPartitionDesign(KuduTable table)
List hashPartitions = partitionSchema.getHashBucketSchemas().stream()
.map(hashBucketSchema -> {
- HashPartitionDefinition hash = new HashPartitionDefinition();
List cols = hashBucketSchema.getColumnIds().stream()
.map(idx -> schema.getColumnByIndex(idx).getName()).collect(toImmutableList());
- hash.setColumns(cols);
- hash.setBuckets(hashBucketSchema.getNumBuckets());
- return hash;
+ return new HashPartitionDefinition(cols, hashBucketSchema.getNumBuckets());
}).collect(toImmutableList());
partitionDesign.setHash(hashPartitions);
List rangeColumns = partitionSchema.getRangeSchema().getColumnIds();
if (!rangeColumns.isEmpty()) {
- RangePartitionDefinition definition = new RangePartitionDefinition();
- definition.setColumns(rangeColumns.stream()
+ partitionDesign.setRange(new RangePartitionDefinition(rangeColumns.stream()
.map(i -> schema.getColumns().get(i).getName())
- .collect(toImmutableList()));
- partitionDesign.setRange(definition);
+ .collect(toImmutableList())));
}
return partitionDesign;
@@ -426,7 +416,7 @@ public static PartialRow toRangeBoundToPartialRow(Schema schema, RangePartitionD
{
PartialRow partialRow = new PartialRow(schema);
if (boundValue != null) {
- List rangeColumns = definition.getColumns().stream()
+ List rangeColumns = definition.columns().stream()
.map(schema::getColumnIndex).collect(toImmutableList());
if (rangeColumns.size() != boundValue.getValues().size()) {
@@ -555,91 +545,53 @@ private static void handleInvalidValue(String name, Type type, Object obj)
public static ColumnSchema.CompressionAlgorithm lookupCompression(String compression)
{
- switch (compression.toLowerCase(Locale.ENGLISH)) {
- case "default":
- case "default_compression":
- return ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION;
- case "no":
- case "no_compression":
- return ColumnSchema.CompressionAlgorithm.NO_COMPRESSION;
- case "lz4":
- return ColumnSchema.CompressionAlgorithm.LZ4;
- case "snappy":
- return ColumnSchema.CompressionAlgorithm.SNAPPY;
- case "zlib":
- return ColumnSchema.CompressionAlgorithm.ZLIB;
- default:
- throw new IllegalArgumentException();
- }
+ return switch (compression.toLowerCase(Locale.ENGLISH)) {
+ case "default", "default_compression" -> ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION;
+ case "no", "no_compression" -> ColumnSchema.CompressionAlgorithm.NO_COMPRESSION;
+ case "lz4" -> ColumnSchema.CompressionAlgorithm.LZ4;
+ case "snappy" -> ColumnSchema.CompressionAlgorithm.SNAPPY;
+ case "zlib" -> ColumnSchema.CompressionAlgorithm.ZLIB;
+ default -> throw new IllegalArgumentException();
+ };
}
public static String lookupCompressionString(ColumnSchema.CompressionAlgorithm algorithm)
{
- switch (algorithm) {
- case DEFAULT_COMPRESSION:
- return "default";
- case NO_COMPRESSION:
- return "no";
- case LZ4:
- return "lz4";
- case SNAPPY:
- return "snappy";
- case ZLIB:
- return "zlib";
- default:
- return "unknown";
- }
+ return switch (algorithm) {
+ case DEFAULT_COMPRESSION -> "default";
+ case NO_COMPRESSION -> "no";
+ case LZ4 -> "lz4";
+ case SNAPPY -> "snappy";
+ case ZLIB -> "zlib";
+ default -> "unknown";
+ };
}
public static ColumnSchema.Encoding lookupEncoding(String encoding)
{
- switch (encoding.toLowerCase(Locale.ENGLISH)) {
- case "auto":
- case "auto_encoding":
- return ColumnSchema.Encoding.AUTO_ENCODING;
- case "bitshuffle":
- case "bit_shuffle":
- return ColumnSchema.Encoding.BIT_SHUFFLE;
- case "dictionary":
- case "dict_encoding":
- return ColumnSchema.Encoding.DICT_ENCODING;
- case "plain":
- case "plain_encoding":
- return ColumnSchema.Encoding.PLAIN_ENCODING;
- case "prefix":
- case "prefix_encoding":
- return ColumnSchema.Encoding.PREFIX_ENCODING;
- case "runlength":
- case "run_length":
- case "run length":
- case "rle":
- return ColumnSchema.Encoding.RLE;
- case "group_varint":
- return ColumnSchema.Encoding.GROUP_VARINT;
- default:
- throw new IllegalArgumentException();
- }
+ return switch (encoding.toLowerCase(Locale.ENGLISH)) {
+ case "auto", "auto_encoding" -> ColumnSchema.Encoding.AUTO_ENCODING;
+ case "bitshuffle", "bit_shuffle" -> ColumnSchema.Encoding.BIT_SHUFFLE;
+ case "dictionary", "dict_encoding" -> ColumnSchema.Encoding.DICT_ENCODING;
+ case "plain", "plain_encoding" -> ColumnSchema.Encoding.PLAIN_ENCODING;
+ case "prefix", "prefix_encoding" -> ColumnSchema.Encoding.PREFIX_ENCODING;
+ case "runlength", "run_length", "run length", "rle" -> ColumnSchema.Encoding.RLE;
+ case "group_varint" -> ColumnSchema.Encoding.GROUP_VARINT;
+ default -> throw new IllegalArgumentException();
+ };
}
public static String lookupEncodingString(ColumnSchema.Encoding encoding)
{
- switch (encoding) {
- case AUTO_ENCODING:
- return "auto";
- case BIT_SHUFFLE:
- return "bitshuffle";
- case DICT_ENCODING:
- return "dictionary";
- case PLAIN_ENCODING:
- return "plain";
- case PREFIX_ENCODING:
- return "prefix";
- case RLE:
- return "runlength";
- case GROUP_VARINT:
- return "group_varint";
- default:
- return "unknown";
- }
+ return switch (encoding) {
+ case AUTO_ENCODING -> "auto";
+ case BIT_SHUFFLE -> "bitshuffle";
+ case DICT_ENCODING -> "dictionary";
+ case PLAIN_ENCODING -> "plain";
+ case PREFIX_ENCODING -> "prefix";
+ case RLE -> "runlength";
+ case GROUP_VARINT -> "group_varint";
+ default -> "unknown";
+ };
}
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java
index eb06436a144..90d556dd2e4 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java
@@ -42,7 +42,7 @@ public void setRange(RangePartitionDefinition range)
public boolean hasPartitions()
{
- return hash != null && !hash.isEmpty() && !hash.get(0).getColumns().isEmpty()
- || range != null && !range.getColumns().isEmpty();
+ return hash != null && !hash.isEmpty() && !hash.get(0).columns().isEmpty()
+ || range != null && !range.columns().isEmpty();
}
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java
index b1c484b3012..f9dcad16dec 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java
@@ -36,7 +36,7 @@ public void serialize(RangeBoundValue value, JsonGenerator gen, SerializerProvid
writeValue(value.getValues().get(0), gen);
}
else {
- gen.writeStartArray(value.getValues().size());
+ gen.writeStartArray();
for (Object obj : value.getValues()) {
writeValue(obj, gen);
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java
index d8f5919c05d..f1188e09ca3 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java
@@ -13,30 +13,6 @@
*/
package io.trino.plugin.kudu.properties;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.annotation.Nullable;
-public class RangePartition
-{
- private final RangeBoundValue lower;
- private final RangeBoundValue upper;
-
- @JsonCreator
- public RangePartition(
- @JsonProperty("lower") RangeBoundValue lower,
- @JsonProperty("upper") RangeBoundValue upper)
- {
- this.lower = lower;
- this.upper = upper;
- }
-
- public RangeBoundValue getLower()
- {
- return lower;
- }
-
- public RangeBoundValue getUpper()
- {
- return upper;
- }
-}
+public record RangePartition(@Nullable RangeBoundValue lower, @Nullable RangeBoundValue upper) {}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java
index 1fdf49aabf7..1fed3bc203b 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java
@@ -13,19 +13,16 @@
*/
package io.trino.plugin.kudu.properties;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
-public class RangePartitionDefinition
-{
- private List columns;
+import java.util.List;
- public List getColumns()
- {
- return columns;
- }
+import static java.util.Objects.requireNonNull;
- public void setColumns(List columns)
+public record RangePartitionDefinition(List columns)
+{
+ public RangePartitionDefinition
{
- this.columns = columns;
+ columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
}
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java
index ad3c828445d..e1647995846 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java
@@ -23,6 +23,8 @@
import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
+import static io.trino.spi.StandardErrorCode.SCHEMA_ALREADY_EXISTS;
+import static java.lang.String.format;
public class NoSchemaEmulation
implements SchemaEmulation
@@ -31,7 +33,7 @@ public class NoSchemaEmulation
public void createSchema(KuduClientWrapper client, String schemaName)
{
if (DEFAULT_SCHEMA.equals(schemaName)) {
- throw new SchemaAlreadyExistsException(schemaName);
+ throw new TrinoException(SCHEMA_ALREADY_EXISTS, format("Schema already exists: '%s'", schemaName));
}
throw new TrinoException(GENERIC_USER_ERROR, "Creating schema in Kudu connector not allowed if schema emulation is disabled.");
}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java
deleted file mode 100644
index 64c9ff9ac45..00000000000
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kudu.schema;
-
-import io.trino.spi.TrinoException;
-
-import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
-import static java.lang.String.format;
-
-public class SchemaAlreadyExistsException
- extends TrinoException
-{
- private final String schemaName;
-
- public SchemaAlreadyExistsException(String schemaName)
- {
- this(schemaName, format("Schema already exists: '%s'", schemaName));
- }
-
- public SchemaAlreadyExistsException(String schemaName, String message)
- {
- super(ALREADY_EXISTS, message);
- this.schemaName = schemaName;
- }
-
- public String getSchemaName()
- {
- return schemaName;
- }
-}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
index 076001d11ee..c66f08b3a4f 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
@@ -39,7 +39,9 @@
import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
+import static io.trino.spi.StandardErrorCode.SCHEMA_ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
+import static java.lang.String.format;
public class SchemaEmulationByTableNameConvention
implements SchemaEmulation
@@ -58,7 +60,7 @@ public SchemaEmulationByTableNameConvention(String commonPrefix)
public void createSchema(KuduClientWrapper client, String schemaName)
{
if (DEFAULT_SCHEMA.equals(schemaName)) {
- throw new SchemaAlreadyExistsException(schemaName);
+ throw new TrinoException(SCHEMA_ALREADY_EXISTS, format("Schema already exists: '%s'", schemaName));
}
try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) {
KuduTable schemasTable = getSchemasTable(client);
diff --git a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java
index 2573db30616..9a111f38334 100644
--- a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java
+++ b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java
@@ -33,11 +33,6 @@ public static PartialRow decodePrimaryKey(Schema schema, byte[] key)
return KeyEncoder.decodePrimaryKey(schema, key);
}
- public static byte[] encodeRangePartitionKey(PartialRow row, PartitionSchema.RangeSchema rangeSchema)
- {
- return KeyEncoder.encodeRangePartitionKey(row, rangeSchema);
- }
-
public static PartialRow decodeRangePartitionKey(Schema schema, PartitionSchema partitionSchema, byte[] key)
{
return KeyEncoder.decodeRangePartitionKey(schema, partitionSchema, key);
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java
index 847a818e3ca..999b4d50642 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java
@@ -20,7 +20,6 @@
import java.util.Optional;
-import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch;
import static io.trino.plugin.kudu.TestKuduConnectorTest.REGION_COLUMNS;
import static io.trino.plugin.kudu.TestKuduConnectorTest.createKuduTableForWrites;
import static io.trino.plugin.kudu.TestingKuduServer.EARLIEST_TAG;
@@ -40,9 +39,10 @@ public abstract class BaseKuduConnectorSmokeTest
protected QueryRunner createQueryRunner()
throws Exception
{
- return createKuduQueryRunnerTpch(
- closeAfterClass(new TestingKuduServer(getKuduServerVersion())),
- getKuduSchemaEmulationPrefix(), REQUIRED_TPCH_TABLES);
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer(getKuduServerVersion())))
+ .setKuduSchemaEmulationPrefix(getKuduSchemaEmulationPrefix())
+ .setInitialTables(REQUIRED_TPCH_TABLES)
+ .build();
}
@Override
@@ -50,16 +50,16 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ARRAY,
- SUPPORTS_COMMENT_ON_COLUMN,
- SUPPORTS_COMMENT_ON_TABLE,
- SUPPORTS_CREATE_MATERIALIZED_VIEW,
- SUPPORTS_CREATE_VIEW,
- SUPPORTS_NEGATIVE_DATE,
- SUPPORTS_NOT_NULL_CONSTRAINT,
- SUPPORTS_RENAME_SCHEMA,
- SUPPORTS_ROW_TYPE,
- SUPPORTS_TOPN_PUSHDOWN,
- SUPPORTS_TRUNCATE -> false;
+ SUPPORTS_COMMENT_ON_COLUMN,
+ SUPPORTS_COMMENT_ON_TABLE,
+ SUPPORTS_CREATE_MATERIALIZED_VIEW,
+ SUPPORTS_CREATE_VIEW,
+ SUPPORTS_NEGATIVE_DATE,
+ SUPPORTS_NOT_NULL_CONSTRAINT,
+ SUPPORTS_RENAME_SCHEMA,
+ SUPPORTS_ROW_TYPE,
+ SUPPORTS_TOPN_PUSHDOWN,
+ SUPPORTS_TRUNCATE -> false;
default -> super.hasBehavior(connectorBehavior);
};
}
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java
new file mode 100644
index 00000000000..5645162cd9c
--- /dev/null
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kudu;
+
+import io.trino.testing.datatype.ColumnSetup;
+import io.trino.testing.datatype.CreateAndInsertDataSetup;
+import io.trino.testing.sql.SqlExecutor;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.joining;
+
+public class KuduCreateAndInsertDataSetup
+ extends CreateAndInsertDataSetup
+{
+ public KuduCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String tableNamePrefix)
+ {
+ super(sqlExecutor, tableNamePrefix);
+ }
+
+ @Override
+ protected String tableDefinition(List inputs)
+ {
+ return IntStream.range(0, inputs.size())
+ .mapToObj(column -> {
+ ColumnSetup input = inputs.get(column);
+ if (input.getDeclaredType().isEmpty()) {
+ return format("%s AS col_%d", input.getInputLiteral(), column);
+ }
+
+ return format("CAST(%s AS %s) AS col_%d", input.getInputLiteral(), input.getDeclaredType().get(), column);
+ })
+ .collect(joining(",\n", "AS\nSELECT\n", "\nWHERE 'with no' = 'data'"));
+ }
+}
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java
index 9494b78bca0..d83ec457ec5 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java
@@ -14,157 +14,119 @@
package io.trino.plugin.kudu;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.net.HostAndPort;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
-import io.trino.Session;
+import io.trino.plugin.base.util.Closables;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.Session.SessionBuilder;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.testing.QueryAssertions.copyTpchTables;
import static io.trino.testing.TestingSession.testSessionBuilder;
+import static java.util.Objects.requireNonNull;
public final class KuduQueryRunnerFactory
{
private KuduQueryRunnerFactory() {}
- public static QueryRunner createKuduQueryRunner(TestingKuduServer kuduServer, Session session)
- throws Exception
+ public static Builder builder(TestingKuduServer kuduServer)
{
- QueryRunner runner = null;
- try {
- runner = DistributedQueryRunner.builder(session).build();
-
- installKuduConnector(kuduServer.getMasterAddress(), runner, session.getSchema().orElse("kudu_smoke_test"), Optional.of(""));
-
- return runner;
- }
- catch (Throwable e) {
- closeAllSuppress(e, runner);
- throw e;
- }
+ return new Builder(kuduServer);
}
- public static QueryRunner createKuduQueryRunner(TestingKuduServer kuduServer, String kuduSchema)
- throws Exception
+ public static class Builder
+ extends DistributedQueryRunner.Builder
{
- QueryRunner runner = null;
- try {
- runner = DistributedQueryRunner.builder(createSession(kuduSchema)).build();
-
- installKuduConnector(kuduServer.getMasterAddress(), runner, kuduSchema, Optional.of(""));
-
- return runner;
+ private final TestingKuduServer kuduServer;
+ private final Map connectorProperties = new HashMap<>();
+ private Optional kuduSchemaEmulationPrefix = Optional.empty();
+ private List> initialTables = ImmutableList.of();
+
+ private Builder(TestingKuduServer kuduServer)
+ {
+ super(testSessionBuilder()
+ .setCatalog("kudu")
+ .setSchema("default")
+ .build());
+ this.kuduServer = requireNonNull(kuduServer, "kuduServer is null");
}
- catch (Throwable e) {
- closeAllSuppress(e, runner);
- throw e;
- }
- }
-
- public static QueryRunner createKuduQueryRunnerTpch(TestingKuduServer kuduServer, Optional kuduSchemaEmulationPrefix, TpchTable>... tables)
- throws Exception
- {
- return createKuduQueryRunnerTpch(kuduServer, kuduSchemaEmulationPrefix, ImmutableList.copyOf(tables));
- }
-
- public static QueryRunner createKuduQueryRunnerTpch(TestingKuduServer kuduServer, Optional kuduSchemaEmulationPrefix, Iterable> tables)
- throws Exception
- {
- return createKuduQueryRunnerTpch(kuduServer, kuduSchemaEmulationPrefix, ImmutableMap.of(), ImmutableMap.of(), tables);
- }
-
- public static QueryRunner createKuduQueryRunnerTpch(
- TestingKuduServer kuduServer,
- Optional kuduSchemaEmulationPrefix,
- Map kuduSessionProperties,
- Map extraProperties,
- Iterable> tables)
- throws Exception
- {
- DistributedQueryRunner runner = null;
- try {
- String kuduSchema = kuduSchemaEmulationPrefix.isPresent() ? "tpch" : "default";
- Session session = createSession(kuduSchema, kuduSessionProperties);
- runner = DistributedQueryRunner.builder(session)
- .setExtraProperties(extraProperties)
- .build();
-
- runner.installPlugin(new TpchPlugin());
- runner.createCatalog("tpch", "tpch");
-
- installKuduConnector(kuduServer.getMasterAddress(), runner, kuduSchema, kuduSchemaEmulationPrefix);
-
- copyTpchTables(runner, "tpch", TINY_SCHEMA_NAME, session, tables);
- return runner;
+ @CanIgnoreReturnValue
+ public Builder setKuduSchemaEmulationPrefix(Optional kuduSchemaEmulationPrefix)
+ {
+ this.kuduSchemaEmulationPrefix = requireNonNull(kuduSchemaEmulationPrefix, "kuduSchemaEmulationPrefix is null");
+ return this;
}
- catch (Throwable e) {
- closeAllSuppress(e, runner);
- throw e;
- }
- }
- private static void installKuduConnector(HostAndPort masterAddress, QueryRunner runner, String kuduSchema, Optional kuduSchemaEmulationPrefix)
- {
- Map properties;
- if (kuduSchemaEmulationPrefix.isPresent()) {
- properties = ImmutableMap.of(
- "kudu.schema-emulation.enabled", "true",
- "kudu.schema-emulation.prefix", kuduSchemaEmulationPrefix.get(),
- "kudu.client.master-addresses", masterAddress.toString());
+ @CanIgnoreReturnValue
+ public Builder addConnectorProperty(String key, String value)
+ {
+ this.connectorProperties.put(key, value);
+ return this;
}
- else {
- properties = ImmutableMap.of(
- "kudu.schema-emulation.enabled", "false",
- "kudu.client.master-addresses", masterAddress.toString());
- }
-
- runner.installPlugin(new KuduPlugin());
- runner.createCatalog("kudu", "kudu", properties);
- if (kuduSchemaEmulationPrefix.isPresent()) {
- runner.execute("DROP SCHEMA IF EXISTS " + kuduSchema);
- runner.execute("CREATE SCHEMA " + kuduSchema);
+ @CanIgnoreReturnValue
+ public Builder setInitialTables(List> initialTables)
+ {
+ this.initialTables = ImmutableList.copyOf(initialTables);
+ return this;
}
- }
- public static Session createSession(String schema, Map kuduSessionProperties)
- {
- SessionBuilder builder = testSessionBuilder()
- .setCatalog("kudu")
- .setSchema(schema);
- kuduSessionProperties.forEach((k, v) -> builder.setCatalogSessionProperty("kudu", k, v));
- return builder.build();
- }
-
- public static Session createSession(String schema)
- {
- return testSessionBuilder()
- .setCatalog("kudu")
- .setSchema(schema)
- .build();
+ @Override
+ public DistributedQueryRunner build()
+ throws Exception
+ {
+ String kuduSchema = kuduSchemaEmulationPrefix.isPresent() ? "tpch" : "default";
+ amendSession(sessionBuilder -> sessionBuilder.setSchema(kuduSchema));
+ DistributedQueryRunner queryRunner = super.build();
+ try {
+ queryRunner.installPlugin(new TpchPlugin());
+ queryRunner.createCatalog("tpch", "tpch");
+
+ if (kuduSchemaEmulationPrefix.isPresent()) {
+ addConnectorProperty("kudu.schema-emulation.enabled", "true");
+ addConnectorProperty("kudu.schema-emulation.prefix", kuduSchemaEmulationPrefix.get());
+ addConnectorProperty("kudu.client.master-addresses", kuduServer.getMasterAddress().toString());
+ }
+ else {
+ addConnectorProperty("kudu.schema-emulation.enabled", "false");
+ addConnectorProperty("kudu.client.master-addresses", kuduServer.getMasterAddress().toString());
+ }
+
+ queryRunner.installPlugin(new KuduPlugin());
+ queryRunner.createCatalog("kudu", "kudu", connectorProperties);
+
+ if (kuduSchemaEmulationPrefix.isPresent()) {
+ queryRunner.execute("DROP SCHEMA IF EXISTS " + kuduSchema);
+ queryRunner.execute("CREATE SCHEMA " + kuduSchema);
+ }
+
+ copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, queryRunner.getDefaultSession(), initialTables);
+ return queryRunner;
+ }
+ catch (Throwable e) {
+ Closables.closeAllSuppress(e, queryRunner);
+ throw e;
+ }
+ }
}
public static void main(String[] args)
throws Exception
{
Logging.initialize();
- DistributedQueryRunner queryRunner = (DistributedQueryRunner) createKuduQueryRunnerTpch(
- new TestingKuduServer(),
- Optional.empty(),
- ImmutableMap.of(),
- ImmutableMap.of("http-server.http.port", "8080"),
- TpchTable.getTables());
+ DistributedQueryRunner queryRunner = builder(new TestingKuduServer())
+ .addCoordinatorProperty("http-server.http.port", "8080")
+ .setInitialTables(TpchTable.getTables())
+ .build();
+
Logger log = Logger.get(KuduQueryRunnerFactory.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java
new file mode 100644
index 00000000000..2bcf6aec80e
--- /dev/null
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kudu;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public final class KuduTabletWaitStrategy
+ extends AbstractWaitStrategy
+{
+ private final GenericContainer> master;
+
+ public KuduTabletWaitStrategy(GenericContainer> master)
+ {
+ this.master = requireNonNull(master, "master is null");
+ }
+
+ @Override
+ protected void waitUntilReady()
+ {
+ Failsafe.with(RetryPolicy.builder()
+ .withMaxDuration(startupTimeout)
+ .withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration
+ .abortOn(e -> getExitCode().isPresent())
+ .build())
+ .run(() -> {
+ // Note: This condition requires a dependency on org.rnorth.duct-tape:duct-tape
+ if (!getRateLimiter().getWhenReady(() -> master.getLogs().contains("Registered new tserver with Master"))) {
+ // We say "timed out" immediately. Failsafe will propagate this only when timeout reached.
+ throw new ContainerLaunchException("Timed out waiting for container to register tserver");
+ }
+ });
+ }
+
+ private Optional getExitCode()
+ {
+ if (waitStrategyTarget.getContainerId() == null) {
+ // Not yet started
+ return Optional.empty();
+ }
+
+ InspectContainerResponse currentContainerInfo = waitStrategyTarget.getCurrentContainerInfo();
+ if (currentContainerInfo.getState().getStartedAt() == null) {
+ // not yet running
+ return Optional.empty();
+ }
+ // currentContainerInfo.getState().getExitCode() is present (0) even in "running" state
+ if (Boolean.TRUE.equals(currentContainerInfo.getState().getRunning())) {
+ // running
+ return Optional.empty();
+ }
+ return Optional.ofNullable(currentContainerInfo.getState().getExitCodeLong());
+ }
+}
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java
index 78580ad2e1d..ec75f4a77af 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.kudu;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import org.junit.jupiter.api.Test;
@@ -31,7 +32,7 @@ public class TestKuduClientConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(KuduClientConfig.class)
- .setMasterAddresses("")
+ .setMasterAddresses(ImmutableList.of())
.setDefaultAdminOperationTimeout(new Duration(30, SECONDS))
.setDefaultOperationTimeout(new Duration(30, SECONDS))
.setDisableStatistics(false)
@@ -45,7 +46,7 @@ public void testDefaults()
public void testExplicitPropertyMappingsWithCredentialsKey()
{
Map properties = ImmutableMap.builder()
- .put("kudu.client.master-addresses", "localhost")
+ .put("kudu.client.master-addresses", "localhost,localhost2")
.put("kudu.client.default-admin-operation-timeout", "1m")
.put("kudu.client.default-operation-timeout", "5m")
.put("kudu.client.disable-statistics", "true")
@@ -56,7 +57,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.buildOrThrow();
KuduClientConfig expected = new KuduClientConfig()
- .setMasterAddresses("localhost")
+ .setMasterAddresses(ImmutableList.of("localhost", "localhost2"))
.setDefaultAdminOperationTimeout(new Duration(1, MINUTES))
.setDefaultOperationTimeout(new Duration(5, MINUTES))
.setDisableStatistics(true)
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java
index 48363c5d2f4..ca85a1a2390 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java
@@ -29,7 +29,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.TestingNames.randomNameSuffix;
@@ -51,10 +50,9 @@ public class TestKuduConnectorTest
protected QueryRunner createQueryRunner()
throws Exception
{
- return createKuduQueryRunnerTpch(
- closeAfterClass(new TestingKuduServer()),
- Optional.empty(),
- REQUIRED_TPCH_TABLES);
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer()))
+ .setInitialTables(REQUIRED_TPCH_TABLES)
+ .build();
}
@Override
@@ -62,18 +60,18 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ARRAY,
- SUPPORTS_COMMENT_ON_COLUMN,
- SUPPORTS_COMMENT_ON_TABLE,
- SUPPORTS_CREATE_MATERIALIZED_VIEW,
- SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT,
- SUPPORTS_CREATE_VIEW,
- SUPPORTS_NEGATIVE_DATE,
- SUPPORTS_NOT_NULL_CONSTRAINT,
- SUPPORTS_RENAME_SCHEMA,
- SUPPORTS_ROW_TYPE,
- SUPPORTS_SET_COLUMN_TYPE,
- SUPPORTS_TOPN_PUSHDOWN,
- SUPPORTS_TRUNCATE -> false;
+ SUPPORTS_COMMENT_ON_COLUMN,
+ SUPPORTS_COMMENT_ON_TABLE,
+ SUPPORTS_CREATE_MATERIALIZED_VIEW,
+ SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT,
+ SUPPORTS_CREATE_VIEW,
+ SUPPORTS_NEGATIVE_DATE,
+ SUPPORTS_NOT_NULL_CONSTRAINT,
+ SUPPORTS_RENAME_SCHEMA,
+ SUPPORTS_ROW_TYPE,
+ SUPPORTS_SET_COLUMN_TYPE,
+ SUPPORTS_TOPN_PUSHDOWN,
+ SUPPORTS_TRUNCATE -> false;
default -> super.hasBehavior(connectorBehavior);
};
}
@@ -175,7 +173,7 @@ protected MaterializedResult getDescribeOrdersResult()
.row("custkey", "bigint", extra, "")
.row("orderstatus", "varchar", extra, "")
.row("totalprice", "double", extra, "")
- .row("orderdate", "varchar", extra, "")
+ .row("orderdate", "date", extra, "")
.row("orderpriority", "varchar", extra, "")
.row("clerk", "varchar", extra, "")
.row("shippriority", "integer", extra, "")
@@ -200,7 +198,7 @@ public void testShowCreateTable()
" custkey bigint COMMENT '' WITH ( nullable = true ),\n" +
" orderstatus varchar COMMENT '' WITH ( nullable = true ),\n" +
" totalprice double COMMENT '' WITH ( nullable = true ),\n" +
- " orderdate varchar COMMENT '' WITH ( nullable = true ),\n" +
+ " orderdate date COMMENT '' WITH ( nullable = true ),\n" +
" orderpriority varchar COMMENT '' WITH ( nullable = true ),\n" +
" clerk varchar COMMENT '' WITH ( nullable = true ),\n" +
" shippriority integer COMMENT '' WITH ( nullable = true ),\n" +
@@ -537,6 +535,28 @@ public void testAddColumnWithCommentSpecialCharacter(String comment)
}
}
+ @Test
+ public void testAddColumnWithDecimal()
+ {
+ String tableName = "test_add_column_with_decimal" + randomNameSuffix();
+
+ assertUpdate("CREATE TABLE " + tableName + "(" +
+ "id INT WITH (primary_key=true), " +
+ "a_varchar VARCHAR" +
+ ") WITH (" +
+ " partition_by_hash_columns = ARRAY['id'], " +
+ " partition_by_hash_buckets = 2" +
+ ")");
+
+ assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN b_decimal decimal(14,5)");
+ assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c_decimal decimal(35,5)");
+
+ assertThat(getColumnType(tableName, "b_decimal")).isEqualTo("decimal(14,5)");
+ assertThat(getColumnType(tableName, "c_decimal")).isEqualTo("decimal(35,5)");
+
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
@Test
public void testInsertIntoTableHavingRowUuid()
{
@@ -603,8 +623,6 @@ public void testInsertHighestUnicodeCharacter()
public void testInsertNegativeDate()
{
// TODO Remove this overriding test once kudu connector can create tables with default partitions
- // TODO Update this test once kudu connector supports DATE type: https://github.com/trinodb/trino/issues/11009
- // DATE type is not supported by Kudu connector
try (TestTable table = new TestTable(getQueryRunner()::execute, "insert_date",
"(dt DATE WITH (primary_key=true)) " +
"WITH (partition_by_hash_columns = ARRAY['dt'], partition_by_hash_buckets = 2)")) {
@@ -615,7 +633,7 @@ public void testInsertNegativeDate()
@Override
protected String errorMessageForInsertNegativeDate(String date)
{
- return "Insert query has mismatched column types: Table: \\[varchar\\], Query: \\[date\\]";
+ return "Date value <-719893>} is out of range '0001-01-01':'9999-12-31'";
}
@Test
@@ -738,22 +756,19 @@ public void testCreateTableAsSelectNegativeDate()
// Map date column type to varchar
String tableName = "negative_date_" + randomNameSuffix();
- try {
- assertUpdate(format("CREATE TABLE %s AS SELECT DATE '-0001-01-01' AS dt", tableName), 1);
- assertQuery("SELECT * FROM " + tableName, "VALUES '-0001-01-01'");
- assertQuery(format("SELECT * FROM %s WHERE dt = '-0001-01-01'", tableName), "VALUES '-0001-01-01'");
- }
- finally {
- assertUpdate("DROP TABLE IF EXISTS " + tableName);
- }
+ abort("TODO: implement the test for Kudu");
}
@Test
@Override
+ @SuppressWarnings("deprecation")
public void testDateYearOfEraPredicate()
{
- assertThatThrownBy(super::testDateYearOfEraPredicate)
- .hasStackTraceContaining("Cannot apply operator: varchar = date");
+ // Override because the connector throws an exception instead of an empty result when the value is out of supported range
+ assertQuery("SELECT orderdate FROM orders WHERE orderdate = DATE '1997-09-14'", "VALUES DATE '1997-09-14'");
+ // TODO Replace failure with a TrinoException
+ assertThatThrownBy(() -> query("SELECT * FROM orders WHERE orderdate = DATE '-1996-09-14'"))
+ .hasMessageContaining("integer value out of range for Type: date column: -1448295");
}
@Test
@@ -1013,13 +1028,17 @@ protected Optional filterDataMappingSmokeTestData(DataMapp
return Optional.of(dataMappingTestSetup.asUnsupported());
}
- if (typeName.equals("date") // date gets stored as varchar
- || typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416)
+ if (typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416)
|| (typeName.startsWith("char") && dataMappingTestSetup.getSampleValueLiteral().contains(" "))) { // TODO: https://github.com/trinodb/trino/issues/3597
// TODO this should either work or fail cleanly
return Optional.empty();
}
+ if (typeName.equals("date") && dataMappingTestSetup.getSampleValueLiteral().equals("DATE '1582-10-05'")) {
+ // Kudu connector returns +10 days during julian->gregorian switch. The test case exists in TestKuduTypeMapping.testDate().
+ return Optional.empty();
+ }
+
return Optional.of(dataMappingTestSetup);
}
@@ -1067,6 +1086,12 @@ protected void verifyColumnNameLengthFailurePermissible(Throwable e)
assertThat(e).hasMessageContaining("invalid column name: identifier");
}
+ @Override
+ protected String errorMessageForCreateTableAsSelectNegativeDate(String date)
+ {
+ return ".*Date value <-719893>} is out of range '0001-01-01':'9999-12-31'.*";
+ }
+
private void assertTableProperty(String tableProperties, String key, String regexValue)
{
assertThat(Pattern.compile(key + "\\s*=\\s*" + regexValue + ",?\\s+").matcher(tableProperties).find())
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java
index c2420d76be8..87c98224ab1 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java
@@ -22,7 +22,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
-import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.offset;
@@ -49,7 +48,7 @@ public class TestKuduIntegrationDecimalColumns
protected QueryRunner createQueryRunner()
throws Exception
{
- return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "decimal");
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build();
}
@AfterAll
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
index a6b0c6be87c..480c80645c1 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
@@ -13,7 +13,6 @@
*/
package io.trino.plugin.kudu;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import io.opentelemetry.api.trace.Span;
@@ -33,7 +32,6 @@
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResultWithQueryId;
import io.trino.testing.QueryRunner;
-import io.trino.tpch.TpchTable;
import io.trino.transaction.TransactionId;
import io.trino.transaction.TransactionManager;
import org.intellij.lang.annotations.Language;
@@ -45,15 +43,17 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
-import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE;
+import static io.trino.tpch.TpchTable.LINE_ITEM;
+import static io.trino.tpch.TpchTable.ORDERS;
+import static io.trino.tpch.TpchTable.PART;
+import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
public class TestKuduIntegrationDynamicFilter
@@ -63,14 +63,13 @@ public class TestKuduIntegrationDynamicFilter
protected QueryRunner createQueryRunner()
throws Exception
{
- return createKuduQueryRunnerTpch(
- closeAfterClass(new TestingKuduServer()),
- Optional.of(""),
- ImmutableMap.of("dynamic_filtering_wait_timeout", "1h"),
- ImmutableMap.of(
- "dynamic-filtering.small.max-distinct-values-per-driver", "100",
- "dynamic-filtering.small.range-row-limit-per-driver", "100"),
- TpchTable.getTables());
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer()))
+ .setKuduSchemaEmulationPrefix(Optional.of(""))
+ .addConnectorProperty("kudu.dynamic-filtering.wait-timeout", "1h")
+ .addExtraProperty("dynamic-filtering.small.max-distinct-values-per-driver", "100")
+ .addExtraProperty("dynamic-filtering.small.range-row-limit-per-driver", "100")
+ .setInitialTables(List.of(LINE_ITEM, ORDERS, PART))
+ .build();
}
@Test
@@ -88,19 +87,32 @@ public void testIncompleteDynamicFilterTimeout()
QualifiedObjectName tableName = new QualifiedObjectName("kudu", "tpch", "orders");
Optional tableHandle = runner.getMetadata().getTableHandle(session, tableName);
assertThat(tableHandle.isPresent()).isTrue();
- SplitSource splitSource = runner.getSplitManager()
- .getSplits(session, Span.getInvalid(), tableHandle.get(), new IncompleteDynamicFilter(), alwaysTrue());
- List splits = new ArrayList<>();
- while (!splitSource.isFinished()) {
- splits.addAll(splitSource.getNextBatch(1000).get().getSplits());
+ CompletableFuture dynamicFilterBlocked = new CompletableFuture<>();
+ try {
+ SplitSource splitSource = runner.getSplitManager()
+ .getSplits(session, Span.getInvalid(), tableHandle.get(), new BlockedDynamicFilter(dynamicFilterBlocked), alwaysTrue());
+ List splits = new ArrayList<>();
+ while (!splitSource.isFinished()) {
+ splits.addAll(splitSource.getNextBatch(1000).get().getSplits());
+ }
+ splitSource.close();
+ assertThat(splits.isEmpty()).isFalse();
+ }
+ finally {
+ dynamicFilterBlocked.complete(null);
}
- splitSource.close();
- assertThat(splits.isEmpty()).isFalse();
}
- private static class IncompleteDynamicFilter
+ private static class BlockedDynamicFilter
implements DynamicFilter
{
+ private final CompletableFuture> isBlocked;
+
+ public BlockedDynamicFilter(CompletableFuture> isBlocked)
+ {
+ this.isBlocked = requireNonNull(isBlocked, "isBlocked is null");
+ }
+
@Override
public Set getColumnsCovered()
{
@@ -110,14 +122,7 @@ public Set getColumnsCovered()
@Override
public CompletableFuture> isBlocked()
{
- return CompletableFuture.runAsync(() -> {
- try {
- TimeUnit.HOURS.sleep(1);
- }
- catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- });
+ return isBlocked;
}
@Override
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java
index 3c0cb1b917a..fde6eeffd8a 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java
@@ -19,7 +19,6 @@
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;
-import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
import static org.assertj.core.api.Assertions.assertThat;
public class TestKuduIntegrationHashPartitioning
@@ -29,7 +28,7 @@ public class TestKuduIntegrationHashPartitioning
protected QueryRunner createQueryRunner()
throws Exception
{
- return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "hash");
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build();
}
@Test
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java
index c20bddf6564..d87be9f3e72 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java
@@ -18,7 +18,6 @@
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;
-import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;
@@ -36,7 +35,7 @@ public class TestKuduIntegrationIntegerColumns
protected QueryRunner createQueryRunner()
throws Exception
{
- return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "test_integer");
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build();
}
@Test
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java
index 0e2975b1d0e..6ac167647df 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java
@@ -18,7 +18,6 @@
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;
-import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
import static java.lang.String.join;
import static org.assertj.core.api.Assertions.assertThat;
@@ -84,7 +83,7 @@ public class TestKuduIntegrationRangePartitioning
protected QueryRunner createQueryRunner()
throws Exception
{
- return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "range_partitioning");
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build();
}
@Test
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java
new file mode 100644
index 00000000000..57060be2b20
--- /dev/null
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kudu;
+
+import io.trino.Session;
+import io.trino.spi.type.TimeZoneKey;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingSession;
+import io.trino.testing.datatype.CreateAsSelectDataSetup;
+import io.trino.testing.datatype.DataSetup;
+import io.trino.testing.datatype.SqlDataTypeTest;
+import io.trino.testing.sql.TrinoSqlExecutor;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.function.Function;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.DecimalType.createDecimalType;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.createTimestampType;
+import static io.trino.spi.type.TinyintType.TINYINT;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.String.format;
+import static java.time.ZoneOffset.UTC;
+
+final class TestKuduTypeMapping
+ extends AbstractTestQueryFramework
+{
+ private final ZoneId jvmZone = ZoneId.systemDefault();
+ private final LocalDateTime timeGapInJvmZone1 = LocalDateTime.of(1970, 1, 1, 0, 13, 42);
+ private final LocalDateTime timeGapInJvmZone2 = LocalDateTime.of(2018, 4, 1, 2, 13, 55, 123_000_000);
+ private final LocalDateTime timeDoubledInJvmZone = LocalDateTime.of(2018, 10, 28, 1, 33, 17, 456_000_000);
+
+ // no DST in 1970, but has DST in later years (e.g. 2018)
+ private final ZoneId vilnius = ZoneId.of("Europe/Vilnius");
+ private final LocalDateTime timeGapInVilnius = LocalDateTime.of(2018, 3, 25, 3, 17, 17);
+ private final LocalDateTime timeDoubledInVilnius = LocalDateTime.of(2018, 10, 28, 3, 33, 33, 333_000_000);
+
+ // minutes offset change since 1970-01-01, no DST
+ private final ZoneId kathmandu = ZoneId.of("Asia/Kathmandu");
+ private final LocalDateTime timeGapInKathmandu = LocalDateTime.of(1986, 1, 1, 0, 13, 7);
+
+ @BeforeAll
+ public void setUp()
+ {
+ checkState(jvmZone.getId().equals("America/Bahia_Banderas"), "This test assumes certain JVM time zone");
+ LocalDate dateOfLocalTimeChangeForwardAtMidnightInJvmZone = LocalDate.of(1970, 1, 1);
+ checkIsGap(jvmZone, dateOfLocalTimeChangeForwardAtMidnightInJvmZone.atStartOfDay());
+ checkIsGap(jvmZone, timeGapInJvmZone1);
+ checkIsGap(jvmZone, timeGapInJvmZone2);
+ checkIsDoubled(jvmZone, timeDoubledInJvmZone);
+
+ LocalDate dateOfLocalTimeChangeForwardAtMidnightInSomeZone = LocalDate.of(1983, 4, 1);
+ checkIsGap(vilnius, dateOfLocalTimeChangeForwardAtMidnightInSomeZone.atStartOfDay());
+ LocalDate dateOfLocalTimeChangeBackwardAtMidnightInSomeZone = LocalDate.of(1983, 10, 1);
+ checkIsDoubled(vilnius, dateOfLocalTimeChangeBackwardAtMidnightInSomeZone.atStartOfDay().minusMinutes(1));
+ checkIsGap(vilnius, timeGapInVilnius);
+ checkIsDoubled(vilnius, timeDoubledInVilnius);
+
+ checkIsGap(kathmandu, timeGapInKathmandu);
+ }
+
+ private static void checkIsGap(ZoneId zone, LocalDateTime dateTime)
+ {
+ verify(isGap(zone, dateTime), "Expected %s to be a gap in %s", dateTime, zone);
+ }
+
+ private static boolean isGap(ZoneId zone, LocalDateTime dateTime)
+ {
+ return zone.getRules().getValidOffsets(dateTime).isEmpty();
+ }
+
+ private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime)
+ {
+ verify(zone.getRules().getValidOffsets(dateTime).size() == 2, "Expected %s to be doubled in %s", dateTime, zone);
+ }
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build();
+ }
+
+ @Test
+ void testBoolean()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("boolean", "NULL", BOOLEAN, "CAST(NULL AS BOOLEAN)")
+ .addRoundTrip("boolean", "true", BOOLEAN)
+ .addRoundTrip("boolean", "false", BOOLEAN)
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_boolean"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_boolean"));
+ }
+
+ @Test
+ void testTinyint()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("tinyint", "NULL", TINYINT, "CAST(NULL AS TINYINT)")
+ .addRoundTrip("tinyint", "-128", TINYINT, "TINYINT '-128'")
+ .addRoundTrip("tinyint", "5", TINYINT, "TINYINT '5'")
+ .addRoundTrip("tinyint", "127", TINYINT, "TINYINT '127'")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_tinyint"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_tinyint"));
+ }
+
+ @Test
+ void testSmallint()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("smallint", "NULL", SMALLINT, "CAST(NULL AS SMALLINT)")
+ .addRoundTrip("smallint", "-32768", SMALLINT, "SMALLINT '-32768'")
+ .addRoundTrip("smallint", "32456", SMALLINT, "SMALLINT '32456'")
+ .addRoundTrip("smallint", "32767", SMALLINT, "SMALLINT '32767'")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_smallint"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_smallint"));
+ }
+
+ @Test
+ void testInt()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("int", "NULL", INTEGER, "CAST(NULL AS INTEGER)")
+ .addRoundTrip("int", "-2147483648", INTEGER, "-2147483648")
+ .addRoundTrip("int", "1234567890", INTEGER, "1234567890")
+ .addRoundTrip("int", "2147483647", INTEGER, "2147483647")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_int"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_int"));
+ }
+
+ @Test
+ void testBigint()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("bigint", "NULL", BIGINT, "CAST(NULL AS BIGINT)")
+ .addRoundTrip("bigint", "-9223372036854775808", BIGINT, "-9223372036854775808")
+ .addRoundTrip("bigint", "123456789012", BIGINT, "123456789012")
+ .addRoundTrip("bigint", "9223372036854775807", BIGINT, "9223372036854775807")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_bigint"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_bigint"));
+ }
+
+ @Test
+ void testReal()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("real", "NULL", REAL, "CAST(NULL AS REAL)")
+ .addRoundTrip("real", "12.5", REAL, "REAL '12.5'")
+ .addRoundTrip("real", "nan()", REAL, "CAST(nan() AS REAL)")
+ .addRoundTrip("real", "-infinity()", REAL, "CAST(-infinity() AS REAL)")
+ .addRoundTrip("real", "+infinity()", REAL, "CAST(+infinity() AS REAL)")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_real"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_real"));
+ }
+
+ @Test
+ void testDouble()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("double", "NULL", DOUBLE, "CAST(NULL AS DOUBLE)")
+ .addRoundTrip("double", "3.1415926835", DOUBLE, "DOUBLE '3.1415926835'")
+ .addRoundTrip("double", "1.79769E308", DOUBLE, "DOUBLE '1.79769E308'")
+ .addRoundTrip("double", "2.225E-307", DOUBLE, "DOUBLE '2.225E-307'")
+ .execute(getQueryRunner(), trinoCreateAsSelect("trino_test_double"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("trino_test_double"));
+ }
+
+ @Test
+ void testDecimal()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("decimal(3, 0)", "CAST(NULL AS decimal(3, 0))", createDecimalType(3, 0), "CAST(NULL AS decimal(3, 0))")
+ .addRoundTrip("decimal(3, 0)", "CAST('193' AS decimal(3, 0))", createDecimalType(3, 0), "CAST('193' AS decimal(3, 0))")
+ .addRoundTrip("decimal(3, 0)", "CAST('19' AS decimal(3, 0))", createDecimalType(3, 0), "CAST('19' AS decimal(3, 0))")
+ .addRoundTrip("decimal(3, 0)", "CAST('-193' AS decimal(3, 0))", createDecimalType(3, 0), "CAST('-193' AS decimal(3, 0))")
+ .addRoundTrip("decimal(3, 1)", "CAST('10.0' AS decimal(3, 1))", createDecimalType(3, 1), "CAST('10.0' AS decimal(3, 1))")
+ .addRoundTrip("decimal(3, 1)", "CAST('10.1' AS decimal(3, 1))", createDecimalType(3, 1), "CAST('10.1' AS decimal(3, 1))")
+ .addRoundTrip("decimal(3, 1)", "CAST('-10.1' AS decimal(3, 1))", createDecimalType(3, 1), "CAST('-10.1' AS decimal(3, 1))")
+ .addRoundTrip("decimal(4, 2)", "CAST('2' AS decimal(4, 2))", createDecimalType(4, 2), "CAST('2' AS decimal(4, 2))")
+ .addRoundTrip("decimal(4, 2)", "CAST('2.3' AS decimal(4, 2))", createDecimalType(4, 2), "CAST('2.3' AS decimal(4, 2))")
+ .addRoundTrip("decimal(24, 2)", "CAST('2' AS decimal(24, 2))", createDecimalType(24, 2), "CAST('2' AS decimal(24, 2))")
+ .addRoundTrip("decimal(24, 2)", "CAST('2.3' AS decimal(24, 2))", createDecimalType(24, 2), "CAST('2.3' AS decimal(24, 2))")
+ .addRoundTrip("decimal(24, 2)", "CAST('123456789.3' AS decimal(24, 2))", createDecimalType(24, 2), "CAST('123456789.3' AS decimal(24, 2))")
+ .addRoundTrip("decimal(24, 4)", "CAST('12345678901234567890.31' AS decimal(24, 4))", createDecimalType(24, 4), "CAST('12345678901234567890.31' AS decimal(24, 4))")
+ .addRoundTrip("decimal(30, 5)", "CAST('3141592653589793238462643.38327' AS decimal(30, 5))", createDecimalType(30, 5), "CAST('3141592653589793238462643.38327' AS decimal(30, 5))")
+ .addRoundTrip("decimal(30, 5)", "CAST('-3141592653589793238462643.38327' AS decimal(30, 5))", createDecimalType(30, 5), "CAST('-3141592653589793238462643.38327' AS decimal(30, 5))")
+ .addRoundTrip("decimal(38, 0)", "CAST(NULL AS decimal(38, 0))", createDecimalType(38, 0), "CAST(NULL AS decimal(38, 0))")
+ .addRoundTrip("decimal(38, 0)", "CAST('27182818284590452353602874713526624977' AS decimal(38, 0))", createDecimalType(38, 0), "CAST('27182818284590452353602874713526624977' AS decimal(38, 0))")
+ .addRoundTrip("decimal(38, 0)", "CAST('-27182818284590452353602874713526624977' AS decimal(38, 0))", createDecimalType(38, 0), "CAST('-27182818284590452353602874713526624977' AS decimal(38, 0))")
+ .addRoundTrip("decimal(38, 38)", "CAST('0.27182818284590452353602874713526624977' AS decimal(38, 38))", createDecimalType(38, 38), "CAST('0.27182818284590452353602874713526624977' AS decimal(38, 38))")
+ .execute(getQueryRunner(), trinoCreateAsSelect("trino_test_decimal"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("trino_test_decimal"));
+ }
+
+ @Test
+ void testVarchar()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("varchar", "NULL", VARCHAR, "CAST(NULL AS varchar)")
+ .addRoundTrip("varchar", "'text_a'", VARCHAR, "CAST('text_a' AS varchar)")
+ .addRoundTrip("varchar", "'text_b'", VARCHAR, "CAST('text_b' AS varchar)")
+ .addRoundTrip("varchar", "'攻殻機動隊'", VARCHAR, "CAST('攻殻機動隊' AS varchar)")
+ .addRoundTrip("varchar", "'攻殻機動隊'", VARCHAR, "CAST('攻殻機動隊' AS varchar)")
+ .addRoundTrip("varchar", "'😂'", VARCHAR, "CAST('😂' AS varchar)")
+ .addRoundTrip("varchar", "'Ну, погоди!'", VARCHAR, "CAST('Ну, погоди!' AS varchar)")
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_varchar"))
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_varchar"));
+ }
+
+ @Test
+ void testVarbinary()
+ {
+ SqlDataTypeTest.create()
+ .addRoundTrip("varbinary", "NULL", VARBINARY, "CAST(NULL AS varbinary)")
+ .addRoundTrip("varbinary", "X''", VARBINARY, "X''")
+ .addRoundTrip("varbinary", "X'68656C6C6F'", VARBINARY, "to_utf8('hello')")
+ .addRoundTrip("varbinary", "X'5069C4996B6E6120C582C4856B61207720E69DB1E4BAACE983BD'", VARBINARY, "to_utf8('Piękna łąka w 東京都')")
+ .addRoundTrip("varbinary", "X'4261672066756C6C206F6620F09F92B0'", VARBINARY, "to_utf8('Bag full of 💰')")
+ .addRoundTrip("varbinary", "X'0001020304050607080DF9367AA7000000'", VARBINARY, "X'0001020304050607080DF9367AA7000000'") // non-text
+ .addRoundTrip("varbinary", "X'000000000000'", VARBINARY, "X'000000000000'")
+ .execute(getQueryRunner(), trinoCreateAsSelect("test_varbinary"))
+ .execute(getQueryRunner(), trinoCreateAndInsert("test_varbinary"));
+ }
+
+ @Test
+ void testDate()
+ {
+ testDate(UTC);
+ testDate(jvmZone);
+ // using two non-JVM zones
+ testDate(vilnius);
+ testDate(kathmandu);
+ testDate(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId());
+ }
+
+ private void testDate(ZoneId sessionZone)
+ {
+ Session session = Session.builder(getSession())
+ .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
+ .build();
+
+ dateTest(inputLiteral -> format("DATE %s", inputLiteral))
+ .execute(getQueryRunner(), session, trinoCreateAsSelect(session, "test_date"))
+ .execute(getQueryRunner(), session, trinoCreateAsSelect("test_date"))
+ .execute(getQueryRunner(), session, trinoCreateAndInsert(session, "test_date"))
+ .execute(getQueryRunner(), session, trinoCreateAndInsert("test_date"));
+ }
+
+ private static SqlDataTypeTest dateTest(Function inputLiteralFactory)
+ {
+ return SqlDataTypeTest.create()
+ .addRoundTrip("date", "NULL", DATE, "CAST(NULL AS DATE)")
+ .addRoundTrip("date", inputLiteralFactory.apply("'0001-01-01'"), DATE, "DATE '0001-01-01'") // mon value in Kudu
+ .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-04'"), DATE, "DATE '1582-10-04'") // before julian->gregorian switch
+ .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-05'"), DATE, "DATE '1582-10-15'") // begin julian->gregorian switch
+ .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-14'"), DATE, "DATE '1582-10-24'") // end julian->gregorian switch
+ .addRoundTrip("date", inputLiteralFactory.apply("'1952-04-03'"), DATE, "DATE '1952-04-03'") // before epoch
+ .addRoundTrip("date", inputLiteralFactory.apply("'1970-01-01'"), DATE, "DATE '1970-01-01'")
+ .addRoundTrip("date", inputLiteralFactory.apply("'1970-02-03'"), DATE, "DATE '1970-02-03'")
+ .addRoundTrip("date", inputLiteralFactory.apply("'1983-04-01'"), DATE, "DATE '1983-04-01'")
+ .addRoundTrip("date", inputLiteralFactory.apply("'1983-10-01'"), DATE, "DATE '1983-10-01'")
+ .addRoundTrip("date", inputLiteralFactory.apply("'2017-07-01'"), DATE, "DATE '2017-07-01'") // summer on northern hemisphere (possible DST)
+ .addRoundTrip("date", inputLiteralFactory.apply("'2017-01-01'"), DATE, "DATE '2017-01-01'") // winter on northern hemisphere (possible DST on southern hemisphere)
+ .addRoundTrip("date", inputLiteralFactory.apply("'9999-12-31'"), DATE, "DATE '9999-12-31'"); // max value in Kudu
+ }
+
+ @Test
+ void testTimestamp()
+ {
+ testTimestamp(UTC);
+ testTimestamp(jvmZone);
+ // using two non-JVM zones
+ testTimestamp(vilnius);
+ testTimestamp(kathmandu);
+ testTimestamp(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId());
+ }
+
+ private void testTimestamp(ZoneId sessionZone)
+ {
+ Session session = Session.builder(getSession())
+ .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
+ .build();
+
+ SqlDataTypeTest.create()
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '1958-01-01 13:18:03.123'", createTimestampType(3), "TIMESTAMP '1958-01-01 13:18:03.123'")
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '2019-03-18 10:01:17.987'", createTimestampType(3), "TIMESTAMP '2019-03-18 10:01:17.987'")
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-10-28 01:33:17.456'", createTimestampType(3), "TIMESTAMP '2018-10-28 01:33:17.456'")
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-10-28 03:33:33.333'", createTimestampType(3), "TIMESTAMP '2018-10-28 03:33:33.333'")
+
+ // epoch also is a gap in JVM zone
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '1970-01-01 00:00:00.000'", createTimestampType(3), "TIMESTAMP '1970-01-01 00:00:00.000'")
+
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '1970-01-01 00:13:42.000'", createTimestampType(3), "TIMESTAMP '1970-01-01 00:13:42.000'")
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-04-01 02:13:55.123'", createTimestampType(3), "TIMESTAMP '2018-04-01 02:13:55.123'")
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-03-25 03:17:17.000'", createTimestampType(3), "TIMESTAMP '2018-03-25 03:17:17.000'")
+ .addRoundTrip("timestamp(3)", "TIMESTAMP '1986-01-01 00:13:07.000'", createTimestampType(3), "TIMESTAMP '1986-01-01 00:13:07.000'")
+ .execute(getQueryRunner(), session, trinoCreateAsSelect(session, "test_timestamp"))
+ .execute(getQueryRunner(), session, trinoCreateAsSelect("test_timestamp"))
+ .execute(getQueryRunner(), session, trinoCreateAndInsert(session, "test_timestamp"))
+ .execute(getQueryRunner(), session, trinoCreateAndInsert("test_timestamp"));
+ }
+
+ private DataSetup trinoCreateAsSelect(String tableNamePrefix)
+ {
+ return trinoCreateAsSelect(getSession(), tableNamePrefix);
+ }
+
+ private DataSetup trinoCreateAsSelect(Session session, String tableNamePrefix)
+ {
+ return new CreateAsSelectDataSetup(new TrinoSqlExecutor(getQueryRunner(), session), tableNamePrefix);
+ }
+
+ private DataSetup trinoCreateAndInsert(String tableNamePrefix)
+ {
+ return trinoCreateAndInsert(getSession(), tableNamePrefix);
+ }
+
+ private DataSetup trinoCreateAndInsert(Session session, String tableNamePrefix)
+ {
+ return new KuduCreateAndInsertDataSetup(new TrinoSqlExecutor(getQueryRunner(), session), tableNamePrefix);
+ }
+}
diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java
index 0cee1c087bf..c14584fdbc0 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java
@@ -32,7 +32,7 @@ public class TestingKuduServer
{
private static final String KUDU_IMAGE = "apache/kudu";
public static final String EARLIEST_TAG = "1.13.0";
- public static final String LATEST_TAG = "1.15.0";
+ public static final String LATEST_TAG = "1.17";
private static final Integer KUDU_MASTER_PORT = 7051;
private static final Integer KUDU_TSERVER_PORT = 7050;
@@ -61,14 +61,11 @@ public TestingKuduServer()
public TestingKuduServer(String kuduVersion)
{
network = Network.newNetwork();
-
- String hostIP = getHostIPAddress();
-
String masterContainerAlias = "kudu-master";
this.master = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, kuduVersion))
.withExposedPorts(KUDU_MASTER_PORT)
.withCommand("master")
- .withEnv("MASTER_ARGS", "--default_num_replicas=1")
+ .withEnv("MASTER_ARGS", "--default_num_replicas=1 --unlock_unsafe_flags --use_hybrid_clock=false")
.withNetwork(network)
.withNetworkAliases(masterContainerAlias);
@@ -78,14 +75,16 @@ public TestingKuduServer(String kuduVersion)
toxiProxy.start();
String instanceName = "kudu-tserver";
+ @SuppressWarnings("deprecation")
ToxiproxyContainer.ContainerProxy proxy = toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT);
tabletServer = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, kuduVersion))
.withExposedPorts(KUDU_TSERVER_PORT)
.withCommand("tserver")
.withEnv("KUDU_MASTERS", format("%s:%s", masterContainerAlias, KUDU_MASTER_PORT))
- .withEnv("TSERVER_ARGS", format("--fs_wal_dir=/var/lib/kudu/tserver --logtostderr --use_hybrid_clock=false --rpc_bind_addresses=%s:%s --rpc_advertised_addresses=%s:%s", instanceName, KUDU_TSERVER_PORT, hostIP, proxy.getProxyPort()))
+ .withEnv("TSERVER_ARGS", format("--fs_wal_dir=/var/lib/kudu/tserver --logtostderr --use_hybrid_clock=false --unlock_unsafe_flags --rpc_bind_addresses=%s:%s --rpc_advertised_addresses=%s:%s", instanceName, KUDU_TSERVER_PORT, TOXIPROXY_NETWORK_ALIAS, proxy.getOriginalProxyPort()))
.withNetwork(network)
.withNetworkAliases(instanceName)
+ .waitingFor(new KuduTabletWaitStrategy(master))
.dependsOn(master);
master.start();
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
index dd00a554460..52de3e7ecce 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
@@ -677,13 +677,13 @@ public void testSelectInTransaction()
@Test
public void testSelectVersionOfNonExistentTable()
{
+ String tableName = "foo_" + randomNameSuffix();
String catalog = getSession().getCatalog().orElseThrow();
String schema = getSession().getSchema().orElseThrow();
- String tableName = "foo_" + randomNameSuffix();
assertThatThrownBy(() -> query("SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2021-03-01 00:00:01'"))
- .hasMessage(format("line 1:15: Table '%s.%s.%s' does not exist", catalog, schema, tableName));
+ .hasMessageMatching("line 1:15: Table '%s.%s.%s' does not exist|This connector does not support versioned tables".formatted(catalog, schema, tableName));
assertThatThrownBy(() -> query("SELECT * FROM " + tableName + " FOR VERSION AS OF 'version1'"))
- .hasMessage(format("line 1:15: Table '%s.%s.%s' does not exist", catalog, schema, tableName));
+ .hasMessageMatching("line 1:15: Table '%s.%s.%s' does not exist|This connector does not support versioned tables".formatted(catalog, schema, tableName));
}
/**