From d50cc15c483adb457e448474c0a5401a01e3dcfc Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 9 Jan 2025 18:09:14 +0000 Subject: [PATCH] [Managed Iceberg] Fix partition value race condition (#33549) * fix and update tests * dont mention df yet * add PR link * whitespace --- .../IO_Iceberg_Integration_Tests.json | 2 +- CHANGES.md | 10 +++++++ .../sdk/io/iceberg/RecordWriterManager.java | 20 +++++++------ .../iceberg/catalog/IcebergCatalogBaseIT.java | 28 ++++++++++++++++++- 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 1efc8e9e4405..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run", + "comment": "Modify this file in a trivial way to cause this test suite to run.", "modification": 1 } diff --git a/CHANGES.md b/CHANGES.md index 189bd8c1f71c..2333c835e038 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -115,6 +115,7 @@ ## Bugfixes * Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). +* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)). ## Security Fixes @@ -157,6 +158,11 @@ * Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)) * (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)). +## Known Issues + +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 + # [2.60.0] - 2024-10-17 ## Highlights @@ -211,6 +217,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192)) * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output. * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results. * Fixed in 2.61.0. +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 # [2.59.0] - 2024-09-11 @@ -259,6 +267,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192)) * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output. * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results. * Fixed in 2.61.0. +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 # [2.58.1] - 2024-08-15 diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 4c21a0175ab0..63186f26fb5a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -96,7 +96,9 @@ class DestinationState { private final IcebergDestination icebergDestination; private final PartitionSpec spec; private final org.apache.iceberg.Schema schema; - private final PartitionKey partitionKey; + // used to determine the partition to which a record belongs + // must not be directly used to create a writer + private final PartitionKey routingPartitionKey; private final Table table; private final String stateToken = UUID.randomUUID().toString(); final Cache writers; @@ -109,7 +111,7 @@ class DestinationState { this.icebergDestination = icebergDestination; this.schema = table.schema(); this.spec = table.spec(); - this.partitionKey = new PartitionKey(spec, schema); + this.routingPartitionKey = new PartitionKey(spec, schema); this.table = table; for (PartitionField partitionField : spec.fields()) { partitionFieldMap.put(partitionField.name(), partitionField); @@ -154,12 +156,12 @@ class DestinationState { * can't create a new writer, the {@link Record} is rejected and {@code false} is returned. */ boolean write(Record record) { - partitionKey.partition(getPartitionableRecord(record)); + routingPartitionKey.partition(getPartitionableRecord(record)); - if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) { + if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) { return false; } - RecordWriter writer = fetchWriterForPartition(partitionKey); + RecordWriter writer = fetchWriterForPartition(routingPartitionKey); writer.write(record); return true; } @@ -173,10 +175,12 @@ private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) { RecordWriter recordWriter = writers.getIfPresent(partitionKey); if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) { + // each writer must have its own PartitionKey object + PartitionKey copy = partitionKey.copy(); // calling invalidate for a non-existent key is a safe operation - writers.invalidate(partitionKey); - recordWriter = createWriter(partitionKey); - writers.put(partitionKey, recordWriter); + writers.invalidate(copy); + recordWriter = createWriter(copy); + writers.put(copy, recordWriter); } return recordWriter; } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index 3970fd07c5c6..df2ca5adb7ac 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -28,10 +28,13 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -65,6 +68,7 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; @@ -74,7 +78,10 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.PartitionUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -295,6 +302,22 @@ private List populateTable(Table table) throws IOException { return expectedRows; } + private static Map constantsMap( + FileScanTask task, + BiFunction converter, + org.apache.iceberg.Schema schema) { + PartitionSpec spec = task.spec(); + Set idColumns = spec.identitySourceIds(); + org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns); + boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); + + if (projectsIdentityPartitionColumns) { + return PartitionUtil.constantsMap(task, converter); + } else { + return Collections.emptyMap(); + } + } + private List readRecords(Table table) { org.apache.iceberg.Schema tableSchema = table.schema(); TableScan tableScan = table.newScan().project(tableSchema); @@ -303,13 +326,16 @@ private List readRecords(Table table) { InputFilesDecryptor descryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); for (FileScanTask fileTask : task.files()) { + Map idToConstants = + constantsMap(fileTask, IdentityPartitionConverters::convertConstant, tableSchema); InputFile inputFile = descryptor.getInputFile(fileTask); CloseableIterable iterable = Parquet.read(inputFile) .split(fileTask.start(), fileTask.length()) .project(tableSchema) .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema)) + fileSchema -> + GenericParquetReaders.buildReader(tableSchema, fileSchema, idToConstants)) .filter(fileTask.residual()) .build();