[iceberg] Add sections for SOFT_AFFINITY split scheduling
This change moves the affinity scheduling file section size
configuration from HiveClientConfig and HiveSessionProperties
to HiveCommonClientConfig and HiveCommonSessionProperties so
that the iceberg connector can benefit from this scheduling
strategy when tables have a small number of files but a large
number of splits.
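
The mechanics, in brief: each split's soft-affinity key becomes its file path plus the index of the fixed-size file section containing the split's start offset, so one large file maps to several scheduling keys instead of one. Below is a minimal, self-contained Java sketch of that keying; the class and helper names (AffinityKeySketch, affinityKey, SECTION_SIZE) are illustrative inventions, and only the key expression mirrors the real change in IcebergSplit.getPreferredNodes further down, which feeds the key to Presto's NodeProvider instead of printing it.

public class AffinityKeySketch
{
    // Default of hive.affinity-scheduling-file-section-size (256MB).
    private static final long SECTION_SIZE = 256L * 1024 * 1024;

    // Splits that start inside the same 256MB section of a file share a
    // key, so a consistent-hash node provider routes them to the same
    // worker; splits in later sections can hash to different workers.
    static String affinityKey(String path, long splitStart)
    {
        return path + "#" + (splitStart / SECTION_SIZE);
    }

    public static void main(String[] args)
    {
        String file = "s3://warehouse/db/t/data-0001.parquet";
        System.out.println(affinityKey(file, 0));                  // ends in #0
        System.out.println(affinityKey(file, 128L * 1024 * 1024)); // ends in #0
        System.out.println(affinityKey(file, 300L * 1024 * 1024)); // ends in #1
    }
}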
ZacBlanco committed Feb 20, 2025
1 parent a766a3a commit 6dfb1f2
Showing 13 changed files with 105 additions and 46 deletions.
@@ -46,6 +46,7 @@ public class HiveCommonClientConfig
     private boolean readNullMaskedParquetEncryptedValueEnabled;
     private boolean useParquetColumnNames;
     private boolean zstdJniDecompressionEnabled;
+    private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);

     public NodeSelectionStrategy getNodeSelectionStrategy()
     {
@@ -284,4 +285,17 @@ public HiveCommonClientConfig setZstdJniDecompressionEnabled(boolean zstdJniDeco
         this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled;
         return this;
     }
+
+    @NotNull
+    public DataSize getAffinitySchedulingFileSectionSize()
+    {
+        return affinitySchedulingFileSectionSize;
+    }
+
+    @Config("hive.affinity-scheduling-file-section-size")
+    public HiveCommonClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize)
+    {
+        this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
+        return this;
+    }
 }
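
Since the setting now lives in the common module, any connector that layers on HiveCommonClientConfig and HiveCommonSessionProperties (per this diff, both the Hive and Iceberg connectors) should honor the same hive.affinity-scheduling-file-section-size catalog property, for example hive.affinity-scheduling-file-section-size=512MB, along with the affinity_scheduling_file_section_size session property registered below.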
@@ -44,7 +44,7 @@ public class HiveCommonSessionProperties
     @VisibleForTesting
     public static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled";

-    private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
+    public static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
     private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
     private static final String ORC_LAZY_READ_SMALL_RANGES = "orc_lazy_read_small_ranges";
     private static final String ORC_MAX_BUFFER_SIZE = "orc_max_buffer_size";
@@ -61,6 +61,7 @@ public class HiveCommonSessionProperties
     private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
     private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names";
     public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
+    public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size";
     private final List<PropertyMetadata<?>> sessionProperties;

     @Inject
@@ -177,6 +178,11 @@ public HiveCommonSessionProperties(HiveCommonClientConfig hiveCommonClientConfig
                         READ_MASKED_VALUE_ENABLED,
                         "Return null when access is denied for an encrypted parquet column",
                         hiveCommonClientConfig.getReadNullMaskedParquetEncryptedValue(),
-                        false));
+                        false),
+                dataSizeSessionProperty(
+                        AFFINITY_SCHEDULING_FILE_SECTION_SIZE,
+                        "Size of file section for affinity scheduling",
+                        hiveCommonClientConfig.getAffinitySchedulingFileSectionSize(),
+                        false));
     }

@@ -299,4 +305,9 @@ public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, St
                 value -> DataSize.valueOf((String) value),
                 DataSize::toString);
     }
+
+    public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session)
+    {
+        return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class);
+    }
 }
@@ -23,6 +23,7 @@
 import java.util.Map;

 import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;

 public class TestHiveCommonClientConfig
 {
@@ -47,7 +48,8 @@ public void testDefaults()
                 .setZstdJniDecompressionEnabled(false)
                 .setParquetBatchReaderVerificationEnabled(false)
                 .setParquetBatchReadOptimizationEnabled(false)
-                .setReadNullMaskedParquetEncryptedValue(false));
+                .setReadNullMaskedParquetEncryptedValue(false)
+                .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE)));
     }

     @Test
@@ -72,6 +74,7 @@ public void testExplicitPropertyMappings()
                 .put("hive.enable-parquet-batch-reader-verification", "true")
                 .put("hive.parquet-batch-read-optimization-enabled", "true")
                 .put("hive.read-null-masked-parquet-encrypted-value-enabled", "true")
+                .put("hive.affinity-scheduling-file-section-size", "512MB")
                 .build();

         HiveCommonClientConfig expected = new HiveCommonClientConfig()
@@ -92,7 +95,8 @@ public void testExplicitPropertyMappings()
                 .setZstdJniDecompressionEnabled(true)
                 .setParquetBatchReaderVerificationEnabled(true)
                 .setParquetBatchReadOptimizationEnabled(true)
-                .setReadNullMaskedParquetEncryptedValue(true);
+                .setReadNullMaskedParquetEncryptedValue(true)
+                .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE));

         ConfigAssertions.assertFullMapping(properties, expected);
     }
@@ -220,7 +220,6 @@ public class HiveClientConfig
     private Duration parquetQuickStatsFileMetadataFetchTimeout = new Duration(60, TimeUnit.SECONDS);
     private int parquetQuickStatsMaxConcurrentCalls = 500;
     private int quickStatsMaxConcurrentCalls = 100;
-    private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);
     private boolean legacyTimestampBucketing;

     @Min(0)
@@ -1793,19 +1792,6 @@ public int getMaxParallelParsingConcurrency()
         return this.maxParallelParsingConcurrency;
     }

-    @NotNull
-    public DataSize getAffinitySchedulingFileSectionSize()
-    {
-        return affinitySchedulingFileSectionSize;
-    }
-
-    @Config("hive.affinity-scheduling-file-section-size")
-    public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize)
-    {
-        this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
-        return this;
-    }
-
     @Config("hive.skip-empty-files")
     @ConfigDescription("Enables skip of empty files avoiding output error")
     public HiveClientConfig setSkipEmptyFilesEnabled(boolean skipEmptyFiles)
@@ -131,7 +131,6 @@ public final class HiveSessionProperties
     public static final String QUICK_STATS_INLINE_BUILD_TIMEOUT = "quick_stats_inline_build_timeout";
     public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout";
     public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled";
-    public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size";
     public static final String SKIP_EMPTY_FILES = "skip_empty_files";
     public static final String LEGACY_TIMESTAMP_BUCKETING = "legacy_timestamp_bucketing";

@@ -639,11 +638,6 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
                         false,
                         value -> Duration.valueOf((String) value),
                         Duration::toString),
-                dataSizeSessionProperty(
-                        AFFINITY_SCHEDULING_FILE_SECTION_SIZE,
-                        "Size of file section for affinity scheduling",
-                        hiveClientConfig.getAffinitySchedulingFileSectionSize(),
-                        false),
                 booleanProperty(
                         SKIP_EMPTY_FILES,
                         "If it is required empty files will be skipped",
@@ -1126,11 +1120,6 @@ public static Duration getQuickStatsBackgroundBuildTimeout(ConnectorSession sess
         return session.getProperty(QUICK_STATS_BACKGROUND_BUILD_TIMEOUT, Duration.class);
     }

-    public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session)
-    {
-        return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class);
-    }
-
     public static boolean isSkipEmptyFilesEnabled(ConnectorSession session)
     {
         return session.getProperty(SKIP_EMPTY_FILES, Boolean.class);
@@ -52,10 +52,10 @@

 import static com.facebook.airlift.concurrent.MoreFutures.failedFuture;
 import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
+import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
-import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize;
 import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
 import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
 import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;
@@ -165,7 +165,6 @@ public void testDefaults()
                 .setMaxConcurrentParquetQuickStatsCalls(500)
                 .setCteVirtualBucketCount(128)
                 .setSkipEmptyFilesEnabled(false)
-                .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE))
                 .setLegacyTimestampBucketing(false));
     }

@@ -290,7 +289,6 @@ public void testExplicitPropertyMappings()
                 .put("hive.quick-stats.parquet.max-concurrent-calls", "399")
                 .put("hive.quick-stats.max-concurrent-calls", "101")
                 .put("hive.cte-virtual-bucket-count", "256")
-                .put("hive.affinity-scheduling-file-section-size", "512MB")
                 .put("hive.skip-empty-files", "true")
                 .put("hive.legacy-timestamp-bucketing", "true")
                 .build();
@@ -411,10 +409,8 @@ public void testExplicitPropertyMappings()
                 .setParquetQuickStatsFileMetadataFetchTimeout(new Duration(30, TimeUnit.SECONDS))
                 .setMaxConcurrentParquetQuickStatsCalls(399)
                 .setMaxConcurrentQuickStatsCalls(101)
-                .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE))
                 .setSkipEmptyFilesEnabled(true)
                 .setCteVirtualBucketCount(256)
-                .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE))
                 .setLegacyTimestampBucketing(true);

         ConfigAssertions.assertFullMapping(properties, expected);
@@ -42,7 +42,7 @@
 import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL;
 import static com.facebook.presto.hive.CacheQuotaScope.PARTITION;
 import static com.facebook.presto.hive.CacheQuotaScope.TABLE;
-import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize;
+import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
 import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
 import static com.facebook.presto.hive.HiveTestUtils.SESSION;
 import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
@@ -51,6 +51,7 @@ public class IcebergSplit
     private final List<DeleteFile> deletes;
     private final Optional<ChangelogSplitInfo> changelogSplitInfo;
     private final long dataSequenceNumber;
+    private final long affinitySchedulingSectionSize;

     @JsonCreator
     public IcebergSplit(
@@ -66,7 +67,8 @@ public IcebergSplit(
             @JsonProperty("splitWeight") SplitWeight splitWeight,
             @JsonProperty("deletes") List<DeleteFile> deletes,
             @JsonProperty("changelogSplitInfo") Optional<ChangelogSplitInfo> changelogSplitInfo,
-            @JsonProperty("dataSequenceNumber") long dataSequenceNumber)
+            @JsonProperty("dataSequenceNumber") long dataSequenceNumber,
+            @JsonProperty("affinitySchedulingSectionSize") long maxAffinitySchedulingSectionSize)
     {
         requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
         this.path = requireNonNull(path, "path is null");
@@ -82,6 +84,7 @@ public IcebergSplit(
         this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
         this.changelogSplitInfo = requireNonNull(changelogSplitInfo, "changelogSplitInfo is null");
         this.dataSequenceNumber = dataSequenceNumber;
+        this.affinitySchedulingSectionSize = maxAffinitySchedulingSectionSize;
     }

     @JsonProperty
@@ -143,7 +146,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
     public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
     {
         if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
-            return nodeProvider.get(path);
+            return nodeProvider.get(path + "#" + start / affinitySchedulingSectionSize);
         }
         return addresses;
     }
@@ -173,6 +176,12 @@ public long getDataSequenceNumber()
         return dataSequenceNumber;
     }

+    @JsonProperty
+    public long getAffinitySchedulingSectionSize()
+    {
+        return affinitySchedulingSectionSize;
+    }
+
     @Override
     public Object getInfo()
     {
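
A quick worked example of the getPreferredNodes change above: previously every SOFT_AFFINITY split of a file hashed on the bare path, so all splits of a 1GB file preferred the same worker. With the default 256MB section size, that file read as eight 128MB splits now yields four keys (path#0 through path#3, two splits each), letting the scheduler spread one large file across up to four workers while splits within a section keep their cache locality.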
@@ -37,6 +37,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;

+import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
 import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
 import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
@@ -60,6 +61,7 @@ public class IcebergSplitSource
     private final double minimumAssignedSplitWeight;
     private final long targetSplitSize;
     private final NodeSelectionStrategy nodeSelectionStrategy;
+    private final long maxAffinitySchedulingSectionSize;

     private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;

@@ -73,11 +75,12 @@ public IcebergSplitSource(
         this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
         this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
         this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
+        this.maxAffinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
         this.fileScanTaskIterator = closer.register(
                 splitFiles(
                         closer.register(tableScan.planFiles()),
                         targetSplitSize)
-                .iterator());
+                        .iterator());
     }

     @Override
@@ -139,6 +142,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
                 SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)),
                 task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
                 Optional.empty(),
-                getDataSequenceNumber(task.file()));
+                getDataSequenceNumber(task.file()),
+                maxAffinitySchedulingSectionSize);
     }
 }
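
Worth noting on the plumbing above: the section size is read from the session once, when IcebergSplitSource creates splits, and is carried on each IcebergSplit as the affinitySchedulingSectionSize JSON property, so getPreferredNodes can compute the section key later without needing a session handle.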
@@ -47,6 +47,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;

+import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
 import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
 import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
@@ -71,6 +72,7 @@ public class ChangelogSplitSource
     private final long targetSplitSize;
     private final List<IcebergColumnHandle> columnHandles;
     private final NodeSelectionStrategy nodeSelectionStrategy;
+    private final long affinitySchedulingSectionSize;

     public ChangelogSplitSource(
             ConnectorSession session,
@@ -86,6 +88,7 @@ public ChangelogSplitSource(
         this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
         this.fileScanTaskIterable = closer.register(tableScan.planFiles());
         this.fileScanTaskIterator = closer.register(fileScanTaskIterable.iterator());
+        this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
     }

     @Override
@@ -153,6 +156,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
                 changeTask.changeOrdinal(),
                 changeTask.commitSnapshotId(),
                 columnHandles)),
-                getDataSequenceNumber(task.file()));
+                getDataSequenceNumber(task.file()),
+                affinitySchedulingSectionSize);
     }
 }
@@ -36,6 +36,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;

+import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
 import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
 import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
 import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
@@ -53,6 +54,7 @@ public class EqualityDeletesSplitSource
     private final ConnectorSession session;
     private final Map<Integer, PartitionSpec> specById;
     private CloseableIterator<DeleteFile> deleteFiles;
+    private final long affinitySchedulingSectionSize;

     public EqualityDeletesSplitSource(
             ConnectorSession session,
@@ -64,6 +66,7 @@ public EqualityDeletesSplitSource(
         requireNonNull(deleteFiles, "deleteFiles is null");
         this.specById = table.specs();
         this.deleteFiles = CloseableIterable.filter(deleteFiles, deleteFile -> fromIcebergFileContent(deleteFile.content()) == EQUALITY_DELETES).iterator();
+        this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
     }

     @Override
@@ -121,6 +124,7 @@ private IcebergSplit splitFromDeleteFile(DeleteFile deleteFile)
                 SplitWeight.standard(),
                 ImmutableList.of(),
                 Optional.empty(),
-                IcebergUtil.getDataSequenceNumber(deleteFile));
+                IcebergUtil.getDataSequenceNumber(deleteFile),
+                affinitySchedulingSectionSize);
     }
 }