diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
index 85d7f90ec..261709bdb 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
@@ -19,11 +19,14 @@
 package org.apache.iceberg.orc;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.orc.TypeDescription;
 
 public abstract class OrcSchemaWithTypeVisitor<T> {
@@ -96,39 +99,36 @@ protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor,
      as the corresponding types in the union of the ORC schema.
      Except for the tag field, the fields in the struct of the Iceberg schema are the same as the
      types in the union of the ORC schema in the general case. In the case of field projection,
      the fields in the struct of the Iceberg schema contain only
-     the fields to be projected which equals to a subset of the types in the union of ORC schema.
-     Therefore, this function visits the complex union with the consideration of both cases.
+     a subset of the types in the union of the ORC schema, but readers for all of the union branch
+     types must still be constructed; it is up to the reader logic to decide which values to return
+     for the given projection schema.
+     Therefore, this function visits the complex union handling both fully and partially projected
+     schemas.
      Note that null values and default values for complex unions are not a consideration in the
      case of ORC.
   */
   private void visitComplexUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor, List<T> options) {
-    int typeIndex = 0;
-    int fieldIndexInStruct = 0;
-    while (typeIndex < union.getChildren().size()) {
-      TypeDescription schema = union.getChildren().get(typeIndex);
-      boolean relatedFieldInStructFound = false;
-      Types.StructType struct = type.asStructType();
-      if (fieldIndexInStruct < struct.fields().size() &&
-          ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME
-              .equals(struct.fields().get(fieldIndexInStruct).name())) {
-        fieldIndexInStruct++;
-      }
-      if (fieldIndexInStruct < struct.fields().size()) {
-        String structFieldName = type.asStructType().fields().get(fieldIndexInStruct).name();
-        int indexFromStructFieldName = Integer.parseInt(structFieldName
-            .substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH));
-        if (typeIndex == indexFromStructFieldName) {
-          relatedFieldInStructFound = true;
-          T option = visit(type.asStructType().fields().get(fieldIndexInStruct).type(), schema, visitor);
-          options.add(option);
-          fieldIndexInStruct++;
-        }
-      }
-      if (!relatedFieldInStructFound) {
-        visitNotProjectedTypeInComplexUnion(schema, visitor, options, typeIndex);
-      }
-      typeIndex++;
-    }
+    StructType structType = type.asStructType();
+    List<TypeDescription> unionTypes = union.getChildren();
+    Map<Integer, Integer> idxInOrcUnionToIdxInType = new HashMap<>();
+    // Construct idxInOrcUnionToIdxInType
+    for (int i = 0; i < structType.fields().size(); i += 1) {
+      String fieldName = structType.fields().get(i).name();
+      if (!fieldName.equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
+        int idxInOrcUnion = Integer.parseInt(fieldName
+            .substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH));
+        idxInOrcUnionToIdxInType.put(idxInOrcUnion, i);
+      }
+    }
+
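+    // For example, if the ORC union is uniontype<int,string> and only field1 is projected, the
+    // Iceberg struct is struct<tag, field1>, so the map is {1 -> 1}; branch 0 is then handled by
+    // the not-projected path below.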
+    for (int i = 0; i < union.getChildren().size(); i += 1) {
+      if (idxInOrcUnionToIdxInType.containsKey(i)) {
+        options.add(visit(structType.fields().get(idxInOrcUnionToIdxInType.get(i)).type(),
+            unionTypes.get(i), visitor));
+      } else {
+        // Even if the type is not projected in the Iceberg schema, a reader for the underlying
+        // ORC branch type still needs to be created. We use an OrcToIcebergVisitorWithPseudoId to
+        // reconstruct the Iceberg type from the ORC union branch type and add it to the options,
+        // with a pseudo Iceberg id of -1 to avoid failures in the rest of the Iceberg code infra.
+        visitNotProjectedTypeInComplexUnion(unionTypes.get(i), visitor, options, i);
+      }
+    }
   }
 
@@ -137,16 +137,15 @@ private void visitComplexUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor,
   // used to make reading the ORC file succeed. In this case, a pseudo Iceberg type is converted
   // from the ORC schema and is used to create the option for the reader of the current type,
   // which can still read the corresponding content in the ORC file successfully.
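+  // For example, an unprojected string branch at union index 1 is reconstructed as a pseudo
+  // Iceberg field "field1" of type string whose field ids are all set to the pseudo id -1.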
-  private static <T> void visitNotProjectedTypeInComplexUnion(TypeDescription schema,
+  private static <T> void visitNotProjectedTypeInComplexUnion(TypeDescription orcType,
                                                               OrcSchemaWithTypeVisitor<T> visitor,
                                                               List<T> options,
                                                               int typeIndex) {
-    OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor();
-    schemaConverter.beforeField("field" + typeIndex, schema);
-    schema.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID);
-    Optional<Types.NestedField> icebergSchema = OrcToIcebergVisitor.visit(schema, schemaConverter);
-    schemaConverter.afterField("field" + typeIndex, schema);
-    options.add(visit(icebergSchema.get().type(), schema, visitor));
+    OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitorWithPseudoId();
+    schemaConverter.beforeField("field" + typeIndex, orcType);
+    Optional<Types.NestedField> icebergType = OrcToIcebergVisitor.visit(orcType, schemaConverter);
+    schemaConverter.afterField("field" + typeIndex, orcType);
+    options.add(visit(icebergType.get().type(), orcType, visitor));
   }
 
   public T record(Types.StructType iStruct, TypeDescription record, List<String> names, List<T> fields) {
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitorWithPseudoId.java b/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitorWithPseudoId.java
new file mode 100644
index 000000000..cf00feace
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitorWithPseudoId.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.orc.TypeDescription;
+
+public class OrcToIcebergVisitorWithPseudoId extends OrcToIcebergVisitor {
+
+  private static final String PSEUDO_ICEBERG_FIELD_ID = "-1";
+
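+  // This visitor re-derives an Iceberg type from an ORC branch type, tagging every visited node
+  // with the pseudo field id -1. For a nested union it also prepends a pseudo "tag" field so that
+  // the reconstructed struct matches the union-as-struct layout, e.g. uniontype<int,string>
+  // becomes struct<tag:int, field0:int, field1:string>.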
+  @Override
+  public Optional<NestedField> union(TypeDescription union, List<Optional<NestedField>> options) {
+    union.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID);
+    List<Optional<NestedField>> optionsCopy = new ArrayList<>(options);
+    optionsCopy.add(0, Optional.of(
+        Types.NestedField.of(Integer.parseInt(PSEUDO_ICEBERG_FIELD_ID), true,
+            ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME, Types.IntegerType.get())));
+    return Optional.of(
+        Types.NestedField.of(Integer.parseInt(PSEUDO_ICEBERG_FIELD_ID), true, currentFieldName(),
+            Types.StructType.of(optionsCopy.stream().filter(Optional::isPresent)
+                .map(Optional::get).collect(Collectors.toList()))));
+  }
+
+  @Override
+  public Optional<NestedField> record(TypeDescription record, List<String> names,
+                                      List<Optional<NestedField>> fields) {
+    record.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID);
+    return super.record(record, names, fields);
+  }
+
+  @Override
+  public Optional<NestedField> list(TypeDescription array, Optional<NestedField> element) {
+    array.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID);
+    return super.list(array, element);
+  }
+
+  @Override
+  public Optional<NestedField> map(TypeDescription map, Optional<NestedField> key,
+                                   Optional<NestedField> value) {
+    map.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID);
+    return super.map(map, key, value);
+  }
+
+  @Override
+  public Optional<NestedField> primitive(TypeDescription primitive) {
+    primitive.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID);
+    return super.primitive(primitive);
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 1d097a95e..18841ba4c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -20,7 +20,7 @@
 package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.ORCSchemaUtil;
@@ -169,38 +169,26 @@ protected void set(InternalRow struct, int pos, Object value) {
 
   static class UnionReader implements OrcValueReader<Object> {
     private final OrcValueReader[] readers;
-    private final Type expectedIcebergSchema;
-    private int[] projectedFieldIdsToIdxInReturnedRow;
-    private boolean isTagFieldProjected;
-    private int numOfFieldsInReturnedRow;
+    private final Type expectedType;
+    private Map<Integer, Integer> idxInExpectedSchemaToIdxInReaders;
 
     private UnionReader(List<OrcValueReader<?>> readers, Type expected) {
       this.readers = new OrcValueReader[readers.size()];
       for (int i = 0; i < this.readers.length; i += 1) {
         this.readers[i] = readers.get(i);
       }
-      this.expectedIcebergSchema = expected;
+      this.expectedType = expected;
 
       if (this.readers.length > 1) {
-        // Creating an integer array to track the mapping between the index of fields to be projected
-        // and the index of the value for the field stored in the returned row,
-        // if the value for a field equals to Integer.MIN_VALUE, it means the value of this field
-        // should not be stored in the returned row
-        this.projectedFieldIdsToIdxInReturnedRow = new int[readers.size()];
-        Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, Integer.MIN_VALUE);
-        this.numOfFieldsInReturnedRow = 0;
-        this.isTagFieldProjected = false;
-
-        for (Types.NestedField expectedStructField : expectedIcebergSchema.asStructType().fields()) {
-          String fieldName = expectedStructField.name();
-          if (fieldName.equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
-            this.isTagFieldProjected = true;
-            this.numOfFieldsInReturnedRow++;
-            continue;
-          }
-          int projectedFieldIndex = Integer.valueOf(fieldName
-              .substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH));
-          this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] = this.numOfFieldsInReturnedRow++;
-        }
+        idxInExpectedSchemaToIdxInReaders = new HashMap<>();
+        // Construct idxInExpectedSchemaToIdxInReaders
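+        // e.g. for an expected struct<tag, field1>, the map is {1 -> 1}: expected field 1 is
+        // populated by the reader of ORC union branch 1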
+        for (int i = 0; i < expectedType.asStructType().fields().size(); i += 1) {
+          String fieldName = expectedType.asStructType().fields().get(i).name();
+          if (!fieldName.equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
+            int idxInReader = Integer.parseInt(fieldName
+                .substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH));
+            this.idxInExpectedSchemaToIdxInReaders.put(i, idxInReader);
+          }
+        }
       }
     }
 
@@ -214,18 +202,21 @@ public Object nonNullRead(ColumnVector vector, int row) {
       if (readers.length == 1) {
         return value;
       } else {
-        InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow);
+        InternalRow struct = new GenericInternalRow(this.expectedType.asStructType().fields().size());
         for (int i = 0; i < struct.numFields(); i += 1) {
           struct.setNullAt(i);
         }
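+        // Only the tag field and the field whose union branch matches the read tag are filled in
+        // below; unprojected or non-matching fields stay null in the returned row.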
-        if (this.isTagFieldProjected) {
-          struct.update(0, fieldIndex);
-        }
-
-        if (this.projectedFieldIdsToIdxInReturnedRow[fieldIndex] != Integer.MIN_VALUE) {
-          struct.update(this.projectedFieldIdsToIdxInReturnedRow[fieldIndex], value);
+        for (int i = 0; i < this.expectedType.asStructType().fields().size(); i += 1) {
+          String fieldName = expectedType.asStructType().fields().get(i).name();
+          if (fieldName.equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
+            struct.update(i, fieldIndex);
+          } else {
+            int idxInReader = this.idxInExpectedSchemaToIdxInReaders.get(i);
+            if (idxInReader == fieldIndex) {
+              struct.update(i, value);
+            }
+          }
         }
-
         return struct;
       }
     }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
index 1cbc4f67f..b58e4158a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
@@ -56,6 +56,7 @@ public class TestSparkOrcUnions {
+
   private static final int NUM_OF_ROWS = 50;
 
   @Rule
@@ -99,7 +100,8 @@ public void testComplexUnion() throws IOException {
     VectorizedRowBatch batch = orcSchema.createRowBatch();
     LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS);
     BytesColumnVector bytesColumnVector = new BytesColumnVector(NUM_OF_ROWS);
-    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector, bytesColumnVector);
+    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector,
+        bytesColumnVector);
 
     complexUnion.init();
@@ -146,11 +148,11 @@
   @Test
   public void testComplexUnionWithColumnProjection() throws IOException {
     TypeDescription orcSchema =
-      TypeDescription.fromString("struct<unionCol:uniontype<int,string>>");
+        TypeDescription.fromString("struct<unionCol:uniontype<int,string>>");
 
     Schema expectedSchema = new Schema(
-      Types.NestedField.optional(0, "unionCol", Types.StructType.of(
-        Types.NestedField.optional(1, "field0", Types.IntegerType.get())))
+        Types.NestedField.optional(0, "unionCol", Types.StructType.of(
+            Types.NestedField.optional(1, "field0", Types.IntegerType.get())))
     );
 
     final InternalRow expectedFirstRow = new GenericInternalRow(1);
@@ -169,13 +171,14 @@ public void testComplexUnionWithColumnProjection() throws IOException {
     Path orcFilePath = new Path(orcFile.getPath());
 
     Writer writer = OrcFile.createWriter(orcFilePath,
-      OrcFile.writerOptions(conf)
-        .setSchema(orcSchema).overwrite(true));
+        OrcFile.writerOptions(conf)
+            .setSchema(orcSchema).overwrite(true));
 
     VectorizedRowBatch batch = orcSchema.createRowBatch();
     LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS);
     BytesColumnVector bytesColumnVector = new BytesColumnVector(NUM_OF_ROWS);
-    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector, bytesColumnVector);
+    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector,
+        bytesColumnVector);
 
     complexUnion.init();
@@ -196,9 +199,9 @@ public void testComplexUnionWithColumnProjection() throws IOException {
     // Test non-vectorized reader
     List<InternalRow> actualRows = Lists.newArrayList();
     try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
-      .project(expectedSchema)
-      .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
-      .build()) {
+        .project(expectedSchema)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
+        .build()) {
       reader.forEach(actualRows::add);
 
       Assert.assertEquals(actualRows.size(), NUM_OF_ROWS);
@@ -208,10 +211,10 @@ public void testComplexUnionWithColumnProjection() throws IOException {
     // Test vectorized reader
     try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
-      .project(expectedSchema)
-      .createBatchedReaderFunc(readOrcSchema ->
-        VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
-      .build()) {
+        .project(expectedSchema)
+        .createBatchedReaderFunc(readOrcSchema ->
+            VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
+        .build()) {
       final Iterator<InternalRow> actualRowsIt = batchesToRows(reader.iterator());
 
       assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next());
@@ -222,7 +225,8 @@
   @Test
   public void testDeeplyNestedUnion() throws IOException {
     TypeDescription orcSchema =
-        TypeDescription.fromString("struct<c1:uniontype<int,struct<c2:string,c3:uniontype<int,string>>>>");
+        TypeDescription.fromString(
+            "struct<c1:uniontype<int,struct<c2:string,c3:uniontype<int,string>>>>");
 
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(0, "c1", Types.StructType.of(
@@ -319,7 +323,8 @@ public void testSingleTypeUnion() throws IOException {
     TypeDescription orcSchema =
         TypeDescription.fromString("struct<unionCol:uniontype<string>>");
 
-    Schema expectedSchema = new Schema(Types.NestedField.optional(0, "unionCol", Types.StringType.get()));
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(0, "unionCol", Types.StringType.get()));
 
     final InternalRow expectedFirstRow = new GenericInternalRow(1);
     expectedFirstRow.update(0, UTF8String.fromString("foo-0"));
@@ -388,8 +393,8 @@ public void testSingleTypeUnionOfStruct() throws IOException {
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(0, "unionCol", Types.StructType.of(
-          Types.NestedField.optional(1, "c", Types.StringType.get())
-      )));
+            Types.NestedField.optional(1, "c", Types.StringType.get())
+        )));
 
     final InternalRow expectedFirstRow = new GenericInternalRow(1);
     final InternalRow innerExpectedFirstRow = new GenericInternalRow(1);
@@ -455,7 +460,8 @@
   @Test
   public void testDeepNestedSingleTypeUnion() throws IOException {
     TypeDescription orcSchema =
-        TypeDescription.fromString("struct<outerUnion:uniontype<struct<innerUnion:uniontype<string>>>>");
+        TypeDescription.fromString(
+            "struct<outerUnion:uniontype<struct<innerUnion:uniontype<string>>>>");
 
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(0, "outerUnion", Types.StructType.of(
@@ -525,6 +531,87 @@ public void testDeepNestedSingleTypeUnion() throws IOException {
     }
   }
 
+  @Test
+  public void testDeeplyNestedComplexUnionWithProjection() throws IOException {
+    TypeDescription orcSchema =
+        TypeDescription.fromString(
+            "struct<c1:uniontype<int,struct<c2:string,c3:uniontype<int,string>>>>");
+
+    // Project [c1.field1, c1.field0, c1.tag] from the ORC schema
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(0, "c1", Types.StructType.of(
+            Types.NestedField.optional(2, "field1", Types.StructType.of(
+                Types.NestedField.optional(3, "c2", Types.StringType.get()),
+                Types.NestedField.optional(4, "c3", Types.StructType.of(
+                    Types.NestedField.optional(5, "tag", Types.IntegerType.get()),
+                    Types.NestedField.optional(6, "field0", Types.IntegerType.get()),
+                    Types.NestedField.optional(7, "field1", Types.StringType.get())))
+            )),
+            Types.NestedField.optional(8, "field0", Types.IntegerType.get()),
+            Types.NestedField.optional(1, "tag", Types.IntegerType.get())))
+    );
+
+    final InternalRow expectedFirstRow = new GenericInternalRow(1);
+    final InternalRow inner1 = new GenericInternalRow(3);
+    inner1.update(0, null);
+    inner1.update(1, 0);
+    inner1.update(2, 0);
+    expectedFirstRow.update(0, inner1);
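+    // Row 0 takes union branch 0 (tag = 0), so the projected struct is
+    // [field1 = null, field0 = 0, tag = 0], matching the field order of expectedSchema above.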
+
+    Configuration conf = new Configuration();
+
+    File orcFile = temp.newFile();
+    Path orcFilePath = new Path(orcFile.getPath());
+
+    Writer writer = OrcFile.createWriter(orcFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(orcSchema).overwrite(true));
+
+    VectorizedRowBatch batch = orcSchema.createRowBatch();
+    UnionColumnVector innerUnion1 = (UnionColumnVector) batch.cols[0];
+    LongColumnVector innerInt1 = (LongColumnVector) innerUnion1.fields[0];
+    StructColumnVector innerStruct2 = (StructColumnVector) innerUnion1.fields[1];
+    BytesColumnVector innerString2 = (BytesColumnVector) innerStruct2.fields[0];
+    UnionColumnVector innerUnion3 = (UnionColumnVector) innerStruct2.fields[1];
+    LongColumnVector innerInt3 = (LongColumnVector) innerUnion3.fields[0];
+    BytesColumnVector innerString3 = (BytesColumnVector) innerUnion3.fields[1];
+
+    for (int i = 0; i < NUM_OF_ROWS; ++i) {
+      int row = batch.size++;
+      innerUnion1.tags[row] = i % 2;
+      innerInt1.vector[row] = i;
+      innerString2.setVal(row, ("foo" + row).getBytes(StandardCharsets.UTF_8));
+      innerUnion3.tags[row] = (i + 1) % 2;
+      innerInt3.vector[row] = i;
+      innerString3.setVal(row, ("bar" + row).getBytes(StandardCharsets.UTF_8));
+      // If the batch is full, write it out and start over.
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+    writer.close();
+
+    // Test non-vectorized reader
+    List<InternalRow> results = Lists.newArrayList();
+    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
+        .project(expectedSchema)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
+        .build()) {
+      reader.forEach(results::add);
+      final InternalRow actualFirstRow = results.get(0);
+
+      // Assert that the read completes as a whole
+      Assert.assertEquals(NUM_OF_ROWS, results.size());
+      // Assert that the first row matches exactly
+      assertEquals(expectedSchema, expectedFirstRow, actualFirstRow);
+    }
+  }
+
   private Iterator<InternalRow> batchesToRows(Iterator<ColumnarBatch> batches) {
     return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator));
   }