[Managed Iceberg] Fix partition value race condition (apache#33549)
* fix and update tests

* dont mention df yet

* add PR link

* whitespace
ahmedabu98 authored Jan 9, 2025
1 parent c6f8aae commit d50cc15
Showing 4 changed files with 50 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
}
10 changes: 10 additions & 0 deletions CHANGES.md
@@ -115,6 +115,7 @@
## Bugfixes

* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)).

## Security Fixes

@@ -157,6 +158,11 @@
* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))
* (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)).

## Known Issues

* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0

# [2.60.0] - 2024-10-17

## Highlights
@@ -211,6 +217,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
* Fixed in 2.61.0.
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0

# [2.59.0] - 2024-09-11

@@ -259,6 +267,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
* Fixed in 2.61.0.
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0

# [2.58.1] - 2024-08-15

@@ -96,7 +96,9 @@ class DestinationState {
private final IcebergDestination icebergDestination;
private final PartitionSpec spec;
private final org.apache.iceberg.Schema schema;
private final PartitionKey partitionKey;
// used to determine the partition to which a record belongs
// must not be directly used to create a writer
private final PartitionKey routingPartitionKey;
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
final Cache<PartitionKey, RecordWriter> writers;
@@ -109,7 +111,7 @@ class DestinationState {
this.icebergDestination = icebergDestination;
this.schema = table.schema();
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
this.routingPartitionKey = new PartitionKey(spec, schema);
this.table = table;
for (PartitionField partitionField : spec.fields()) {
partitionFieldMap.put(partitionField.name(), partitionField);
@@ -154,12 +156,12 @@ class DestinationState {
* can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
*/
boolean write(Record record) {
partitionKey.partition(getPartitionableRecord(record));
routingPartitionKey.partition(getPartitionableRecord(record));

if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) {
return false;
}
RecordWriter writer = fetchWriterForPartition(partitionKey);
RecordWriter writer = fetchWriterForPartition(routingPartitionKey);
writer.write(record);
return true;
}
@@ -173,10 +175,12 @@ private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
RecordWriter recordWriter = writers.getIfPresent(partitionKey);

if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
// each writer must have its own PartitionKey object
PartitionKey copy = partitionKey.copy();
// calling invalidate for a non-existent key is a safe operation
writers.invalidate(partitionKey);
recordWriter = createWriter(partitionKey);
writers.put(partitionKey, recordWriter);
writers.invalidate(copy);
recordWriter = createWriter(copy);
writers.put(copy, recordWriter);
}
return recordWriter;
}
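The heart of the fix is visible in the hunks above: a `PartitionKey` is mutable and is re-partitioned for every incoming record, so a cached writer (and the `DataFile` it eventually produces) must never hold a reference to the shared routing key. Below is a minimal, self-contained sketch of the failure mode and of why `copy()` resolves it; the demo class, the `id`/`shard` schema, and the record values are hypothetical and are not part of this commit.

```java
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class PartitionKeyCopyDemo {
  public static void main(String[] args) {
    // Hypothetical identity-partitioned schema, for illustration only.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "shard", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("shard").build();

    // One shared, mutable key used only to route records (like routingPartitionKey).
    PartitionKey routingKey = new PartitionKey(spec, schema);

    Record recordA = GenericRecord.create(schema);
    recordA.setField("id", 1L);
    recordA.setField("shard", "a");
    Record recordB = GenericRecord.create(schema);
    recordB.setField("id", 2L);
    recordB.setField("shard", "b");

    routingKey.partition(recordA);
    PartitionKey sharedRef = routingKey;          // buggy: writer keeps the shared key
    PartitionKey privateCopy = routingKey.copy(); // fixed: writer gets its own copy

    // Routing the next record mutates the shared key in place.
    routingKey.partition(recordB);

    // The shared reference now reports shard "b", so a DataFile built from it
    // would carry the wrong partition value; the copy still reports shard "a".
    System.out.println("shared: " + sharedRef);
    System.out.println("copied: " + privateCopy);
  }
}
```

Because `PartitionKey` compares by partition values rather than object identity (which is also what lets `writers.asMap().containsKey(routingPartitionKey)` find a writer cached under a copy), keying the cache on the private copy leaves the lookups in `write(...)` unchanged.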
@@ -28,10 +28,13 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@@ -65,6 +68,7 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -74,7 +78,10 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -295,6 +302,22 @@ private List<Row> populateTable(Table table) throws IOException {
return expectedRows;
}

private static Map<Integer, ?> constantsMap(
FileScanTask task,
BiFunction<Type, Object, Object> converter,
org.apache.iceberg.Schema schema) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();

if (projectsIdentityPartitionColumns) {
return PartitionUtil.constantsMap(task, converter);
} else {
return Collections.emptyMap();
}
}

private List<Record> readRecords(Table table) {
org.apache.iceberg.Schema tableSchema = table.schema();
TableScan tableScan = table.newScan().project(tableSchema);
@@ -303,13 +326,16 @@ private List<Record> readRecords(Table table) {
InputFilesDecryptor descryptor =
new InputFilesDecryptor(task, table.io(), table.encryption());
for (FileScanTask fileTask : task.files()) {
Map<Integer, ?> idToConstants =
constantsMap(fileTask, IdentityPartitionConverters::convertConstant, tableSchema);
InputFile inputFile = descryptor.getInputFile(fileTask);
CloseableIterable<Record> iterable =
Parquet.read(inputFile)
.split(fileTask.start(), fileTask.length())
.project(tableSchema)
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema))
fileSchema ->
GenericParquetReaders.buildReader(tableSchema, fileSchema, idToConstants))
.filter(fileTask.residual())
.build();

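The test-side change deserves a note: `PartitionUtil.constantsMap` maps the source field ID of each identity-partitioned column to the partition value recorded in the `DataFile` metadata, and passing that map to `GenericParquetReaders.buildReader` makes the reader materialize those columns from the metadata rather than from the Parquet column data. That is what makes the assertions sensitive to this bug: a wrong partition value in a `DataFile` now shows up directly in the rows read back. A rough sketch of the helper's role follows, assuming a hypothetical table partitioned by `identity("shard")`; the class name, field IDs, and names are made up.

```java
import java.util.Map;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.util.PartitionUtil;

class IdentityConstantsSketch {
  // For a file task from a table partitioned by identity("shard"), the returned
  // map would look like {2=<shard value from the DataFile's partition metadata>},
  // with values converted to their generic in-memory representation
  // (e.g. epoch days -> LocalDate) by IdentityPartitionConverters.
  static Map<Integer, ?> constantsFor(FileScanTask fileTask) {
    return PartitionUtil.constantsMap(fileTask, IdentityPartitionConverters::convertConstant);
  }
}
```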
