Skip to content

Commit

Permalink
IcebergInputSource : Add option to toggle case sensitivity while read…
Browse files Browse the repository at this point in the history
…ing columns from iceberg catalog (apache#16496)

* Toggle case sensitivity while reading columns from iceberg

* Fix tests

* Drop case check and set unconditionally
  • Loading branch information
a2l007 authored May 31, 2024
1 parent 0936798 commit b53d757
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 18 deletions.
24 changes: 13 additions & 11 deletions docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -1061,20 +1061,22 @@ The catalog object supports `local` and `hive` catalog types.

The following table lists the properties of a `local` catalog:

|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `local`.|yes|
|warehousePath|The location of the warehouse associated with the catalog|yes|
|catalogProperties|Map of any additional properties that needs to be attached to the catalog|no|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set this value to `local`.|None|yes|
|warehousePath|The location of the warehouse associated with the catalog.|None|yes|
|catalogProperties|Map of any additional properties that need to be attached to the catalog.|None|no|
|caseSensitive|Toggle case sensitivity for column names during Iceberg table reads.|true|no|

The following table lists the properties of a `hive` catalog:

|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `hive`.|yes|
|warehousePath|The location of the warehouse associated with the catalog|yes|
|catalogUri|The URI associated with the hive catalog|yes|
|catalogProperties|Map of any additional properties that needs to be attached to the catalog|no|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set this value to `hive`.|None|yes|
|warehousePath|The location of the warehouse associated with the catalog.|None|yes|
|catalogUri|The URI associated with the hive catalog.|None|yes|
|catalogProperties|Map of any additional properties that need to be attached to the catalog.|None|no|
|caseSensitive|Toggle case sensitivity for column names during Iceberg table reads.|true|no|

### Iceberg filter object

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class HiveIcebergCatalog extends IcebergCatalog
@JsonProperty
private Map<String, String> catalogProperties;

@JsonProperty
private final Boolean caseSensitive;

private final Configuration configuration;

private BaseMetastoreCatalog hiveCatalog;
Expand All @@ -69,13 +72,15 @@ public HiveIcebergCatalog(
@JsonProperty("catalogUri") String catalogUri,
@JsonProperty("catalogProperties") @Nullable
Map<String, Object> catalogProperties,
@JsonProperty("caseSensitive") Boolean caseSensitive,
@JacksonInject @Json ObjectMapper mapper,
@JacksonInject @HiveConf Configuration configuration
)
{
this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null");
this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null");
this.catalogProperties = DynamicConfigProviderUtils.extraConfigAndSetStringMap(catalogProperties, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, mapper);
this.caseSensitive = caseSensitive == null ? true : caseSensitive;
this.configuration = configuration;
this.catalogProperties
.forEach(this.configuration::set);
Expand Down Expand Up @@ -137,4 +142,10 @@ public Map<String, String> getCatalogProperties()
{
return catalogProperties;
}

/**
 * Whether Iceberg table scans through this catalog match column names case
 * sensitively. Never null here: the constructor defaults a missing
 * {@code caseSensitive} config value to {@code true}.
 */
@Override
public boolean isCaseSensitive()
{
  return caseSensitive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public abstract class IcebergCatalog

public abstract BaseMetastoreCatalog retrieveCatalog();

/**
 * Whether table scans should match column names case sensitively.
 * Defaults to {@code true}; catalog implementations override this to honor
 * their configured {@code caseSensitive} property. The result is applied to
 * every {@code TableScan} built in {@code extractSnapshotDataFiles}.
 */
public boolean isCaseSensitive()
{
  return true;
}

/**
* Extract the iceberg data files upto the latest snapshot associated with the table
*
Expand Down Expand Up @@ -92,6 +97,8 @@ public List<String> extractSnapshotDataFiles(
if (snapshotTime != null) {
tableScan = tableScan.asOfTime(snapshotTime.getMillis());
}

tableScan = tableScan.caseSensitive(isCaseSensitive());
CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
CloseableIterable.transform(tasks, FileScanTask::file)
.forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,23 @@ public class LocalCatalog extends IcebergCatalog
@JsonProperty
private final Map<String, String> catalogProperties;

@JsonProperty
private final Boolean caseSensitive;

private BaseMetastoreCatalog catalog;

@JsonCreator
public LocalCatalog(
    @JsonProperty("warehousePath") String warehousePath,
    @JsonProperty("catalogProperties") @Nullable
    Map<String, String> catalogProperties,
    @JsonProperty("caseSensitive") Boolean caseSensitive
)
{
  // Assign directly from checkNotNull instead of discarding its return value;
  // this matches the style used by HiveIcebergCatalog's constructor.
  this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath is null");
  this.catalogProperties = catalogProperties;
  // Default to case-sensitive reads when the property is omitted, preserving
  // pre-existing behavior for configs written before this option existed.
  this.caseSensitive = caseSensitive == null ? true : caseSensitive;
  this.catalog = retrieveCatalog();
}
Expand All @@ -71,6 +76,12 @@ public Map<String, String> getCatalogProperties()
return catalogProperties;
}

/**
 * Whether Iceberg table scans through this catalog match column names case
 * sensitively. Never null here: the constructor defaults a missing
 * {@code caseSensitive} config value to {@code true}.
 */
@Override
public boolean isCaseSensitive()
{
  return caseSensitive;
}

@Override
public BaseMetastoreCatalog retrieveCatalog()
{
Expand Down Expand Up @@ -100,12 +111,13 @@ public boolean equals(Object o)
}
LocalCatalog that = (LocalCatalog) o;
return warehousePath.equals(that.warehousePath)
&& Objects.equals(catalogProperties, that.catalogProperties);
&& Objects.equals(catalogProperties, that.catalogProperties)
&& Objects.equals(caseSensitive, that.caseSensitive);
}

/**
 * Hash code consistent with {@code equals(Object)}: warehousePath,
 * catalogProperties, and caseSensitive all participate.
 */
@Override
public int hashCode()
{
  // Note: the diff capture left the pre-change return statement in place,
  // producing an unreachable duplicate; only the post-change form is kept.
  return Objects.hash(warehousePath, catalogProperties, caseSensitive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ public void testCatalogCreate()
warehouseDir.getPath(),
"hdfs://testuri",
new HashMap<>(),
true,
mapper,
new Configuration()
);
HiveIcebergCatalog hiveCatalogNullProps = new HiveIcebergCatalog(
warehouseDir.getPath(),
"hdfs://testuri",
null,
null,
mapper,
new Configuration()
);
Expand All @@ -68,6 +70,7 @@ public void testAuthenticate()
warehouseDir.getPath(),
"hdfs://testuri",
catalogMap,
null,
mapper,
new Configuration()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class IcebergInputSourceTest

private IcebergCatalog testCatalog;
private TableIdentifier tableIdentifier;
private File warehouseDir;

private Schema tableSchema = new Schema(
Types.NestedField.required(1, "id", Types.StringType.get()),
Expand All @@ -80,8 +81,8 @@ public class IcebergInputSourceTest
@Before
public void setup() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);
Expand Down Expand Up @@ -187,6 +188,33 @@ public void testInputSourceReadFromLatestSnapshot() throws IOException
Assert.assertEquals(1, splits.count());
}

@Test
public void testCaseInsensitiveFiltering() throws IOException
{
  // Catalog configured with caseSensitive=false; shares the warehouse dir with
  // the case-sensitive testCatalog created in setup().
  LocalCatalog caseInsensitiveCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), false);
  Table icebergTableFromSchema = testCatalog.retrieveCatalog().loadTable(tableIdentifier);

  // Rename the column so its stored name ("Name") differs from the filter
  // column ("name") only by case.
  icebergTableFromSchema.updateSchema().renameColumn("name", "Name").commit();
  IcebergInputSource inputSource = new IcebergInputSource(
      TABLENAME,
      NAMESPACE,
      // Filter references lower-case "name"; should still match "Name" because
      // the catalog reads are case-insensitive.
      new IcebergEqualsFilter("name", "Foo"),
      caseInsensitiveCatalog,
      new LocalInputSourceFactory(),
      null
  );

  Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
  List<File> localInputSourceList = splits.map(inputSource::withSplit)
                                          .map(inpSource -> (LocalInputSource) inpSource)
                                          .map(LocalInputSource::getFiles)
                                          .flatMap(List::stream)
                                          .collect(Collectors.toList());

  // The filter must still select the single data file despite the case mismatch.
  Assert.assertEquals(1, inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null)));
  Assert.assertEquals(1, localInputSourceList.size());
}

@After
public void tearDown()
{
Expand All @@ -197,7 +225,6 @@ private void createAndLoadTable(TableIdentifier tableIdentifier) throws IOExcept
{
//Setup iceberg table and schema
Table icebergTableFromSchema = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema);

//Generate an iceberg record and write it to a file
GenericRecord record = GenericRecord.create(tableSchema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testCatalogSerDe() throws JsonProcessingException
{
final File warehouseDir = FileUtils.createTempDir();
DefaultObjectMapper mapper = new DefaultObjectMapper();
LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
LocalCatalog after = mapper.readValue(
mapper.writeValueAsString(before), LocalCatalog.class);
Assert.assertEquals(before, after);
Expand Down

0 comments on commit b53d757

Please sign in to comment.