diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonClientConfig.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonClientConfig.java index a2eda55f1d500..f86e252f9880d 100644 --- a/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonClientConfig.java +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonClientConfig.java @@ -46,6 +46,7 @@ public class HiveCommonClientConfig private boolean readNullMaskedParquetEncryptedValueEnabled; private boolean useParquetColumnNames; private boolean zstdJniDecompressionEnabled; + private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE); public NodeSelectionStrategy getNodeSelectionStrategy() { @@ -284,4 +285,17 @@ public HiveCommonClientConfig setZstdJniDecompressionEnabled(boolean zstdJniDeco this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled; return this; } + + @NotNull + public DataSize getAffinitySchedulingFileSectionSize() + { + return affinitySchedulingFileSectionSize; + } + + @Config("hive.affinity-scheduling-file-section-size") + public HiveCommonClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize) + { + this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize; + return this; + } } diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java index b07965622c03f..b67955d5b7024 100644 --- a/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java @@ -44,7 +44,7 @@ public class HiveCommonSessionProperties @VisibleForTesting public static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled"; - private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy"; + public static final String NODE_SELECTION_STRATEGY = "node_selection_strategy"; private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled"; private static final String ORC_LAZY_READ_SMALL_RANGES = "orc_lazy_read_small_ranges"; private static final String ORC_MAX_BUFFER_SIZE = "orc_max_buffer_size"; @@ -61,6 +61,7 @@ public class HiveCommonSessionProperties private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names"; public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled"; + public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size"; private final List> sessionProperties; @Inject @@ -177,6 +178,11 @@ public HiveCommonSessionProperties(HiveCommonClientConfig hiveCommonClientConfig READ_MASKED_VALUE_ENABLED, "Return null when access is denied for an encrypted parquet column", hiveCommonClientConfig.getReadNullMaskedParquetEncryptedValue(), + false), + dataSizeSessionProperty( + AFFINITY_SCHEDULING_FILE_SECTION_SIZE, + "Size of file section for affinity scheduling", + hiveCommonClientConfig.getAffinitySchedulingFileSectionSize(), false)); } @@ -299,4 +305,9 @@ public static PropertyMetadata dataSizeSessionProperty(String name, St value -> DataSize.valueOf((String) value), DataSize::toString); } + + public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session) + { + return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class); + } } diff --git a/presto-hive-common/src/test/java/com/facebook/presto/hive/TestHiveCommonClientConfig.java b/presto-hive-common/src/test/java/com/facebook/presto/hive/TestHiveCommonClientConfig.java index 15d12e9d96800..5db4217f6f17e 100644 --- a/presto-hive-common/src/test/java/com/facebook/presto/hive/TestHiveCommonClientConfig.java +++ b/presto-hive-common/src/test/java/com/facebook/presto/hive/TestHiveCommonClientConfig.java @@ -23,6 +23,7 @@ import java.util.Map; import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY; +import static io.airlift.units.DataSize.Unit.MEGABYTE; public class TestHiveCommonClientConfig { @@ -47,7 +48,8 @@ public void testDefaults() .setZstdJniDecompressionEnabled(false) .setParquetBatchReaderVerificationEnabled(false) .setParquetBatchReadOptimizationEnabled(false) - .setReadNullMaskedParquetEncryptedValue(false)); + .setReadNullMaskedParquetEncryptedValue(false) + .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE))); } @Test @@ -72,6 +74,7 @@ public void testExplicitPropertyMappings() .put("hive.enable-parquet-batch-reader-verification", "true") .put("hive.parquet-batch-read-optimization-enabled", "true") .put("hive.read-null-masked-parquet-encrypted-value-enabled", "true") + .put("hive.affinity-scheduling-file-section-size", "512MB") .build(); HiveCommonClientConfig expected = new HiveCommonClientConfig() @@ -92,7 +95,8 @@ public void testExplicitPropertyMappings() .setZstdJniDecompressionEnabled(true) .setParquetBatchReaderVerificationEnabled(true) .setParquetBatchReadOptimizationEnabled(true) - .setReadNullMaskedParquetEncryptedValue(true); + .setReadNullMaskedParquetEncryptedValue(true) + .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE)); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 8c0b6ceb4068e..77ced4d0bab2c 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -220,7 +220,6 @@ public class HiveClientConfig private Duration parquetQuickStatsFileMetadataFetchTimeout = new Duration(60, TimeUnit.SECONDS); private int parquetQuickStatsMaxConcurrentCalls = 500; private int quickStatsMaxConcurrentCalls = 100; - private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE); private boolean legacyTimestampBucketing; @Min(0) @@ -1793,19 +1792,6 @@ public int getMaxParallelParsingConcurrency() return this.maxParallelParsingConcurrency; } - @NotNull - public DataSize getAffinitySchedulingFileSectionSize() - { - return affinitySchedulingFileSectionSize; - } - - @Config("hive.affinity-scheduling-file-section-size") - public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize) - { - this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize; - return this; - } - @Config("hive.skip-empty-files") @ConfigDescription("Enables skip of empty files avoiding output error") public HiveClientConfig setSkipEmptyFilesEnabled(boolean skipEmptyFiles) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index f175a109636c8..c2bf836f82ef8 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -131,7 +131,6 @@ public final class HiveSessionProperties public static final String QUICK_STATS_INLINE_BUILD_TIMEOUT = "quick_stats_inline_build_timeout"; public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout"; public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled"; - public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size"; public static final String SKIP_EMPTY_FILES = "skip_empty_files"; public static final String LEGACY_TIMESTAMP_BUCKETING = "legacy_timestamp_bucketing"; @@ -639,11 +638,6 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon false, value -> Duration.valueOf((String) value), Duration::toString), - dataSizeSessionProperty( - AFFINITY_SCHEDULING_FILE_SECTION_SIZE, - "Size of file section for affinity scheduling", - hiveClientConfig.getAffinitySchedulingFileSectionSize(), - false), booleanProperty( SKIP_EMPTY_FILES, "If it is required empty files will be skipped", @@ -1126,11 +1120,6 @@ public static Duration getQuickStatsBackgroundBuildTimeout(ConnectorSession sess return session.getProperty(QUICK_STATS_BACKGROUND_BUILD_TIMEOUT, Duration.class); } - public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session) - { - return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class); - } - public static boolean isSkipEmptyFilesEnabled(ConnectorSession session) { return session.getProperty(SKIP_EMPTY_FILES, Boolean.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java index d4dcdf215931d..98843199442fb 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java @@ -52,10 +52,10 @@ import static com.facebook.airlift.concurrent.MoreFutures.failedFuture; import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND; import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; -import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize; import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight; diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index 48fe14bcd8a33..9ed0f0262308c 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -165,7 +165,6 @@ public void testDefaults() .setMaxConcurrentParquetQuickStatsCalls(500) .setCteVirtualBucketCount(128) .setSkipEmptyFilesEnabled(false) - .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE)) .setLegacyTimestampBucketing(false)); } @@ -290,7 +289,6 @@ public void testExplicitPropertyMappings() .put("hive.quick-stats.parquet.max-concurrent-calls", "399") .put("hive.quick-stats.max-concurrent-calls", "101") .put("hive.cte-virtual-bucket-count", "256") - .put("hive.affinity-scheduling-file-section-size", "512MB") .put("hive.skip-empty-files", "true") .put("hive.legacy-timestamp-bucketing", "true") .build(); @@ -411,10 +409,8 @@ public void testExplicitPropertyMappings() .setParquetQuickStatsFileMetadataFetchTimeout(new Duration(30, TimeUnit.SECONDS)) .setMaxConcurrentParquetQuickStatsCalls(399) .setMaxConcurrentQuickStatsCalls(101) - .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE)) .setSkipEmptyFilesEnabled(true) .setCteVirtualBucketCount(256) - .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE)) .setLegacyTimestampBucketing(true); ConfigAssertions.assertFullMapping(properties, expected); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java index fba5b260aaade..0113e717e7765 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java @@ -42,7 +42,7 @@ import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL; import static com.facebook.presto.hive.CacheQuotaScope.PARTITION; import static com.facebook.presto.hive.CacheQuotaScope.TABLE; -import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static com.facebook.presto.hive.HiveTestUtils.SESSION; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java index d23b2faa337c4..e5874c32704bd 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java @@ -51,6 +51,7 @@ public class IcebergSplit private final List deletes; private final Optional changelogSplitInfo; private final long dataSequenceNumber; + private final long affinitySchedulingSectionSize; @JsonCreator public IcebergSplit( @@ -66,7 +67,8 @@ public IcebergSplit( @JsonProperty("splitWeight") SplitWeight splitWeight, @JsonProperty("deletes") List deletes, @JsonProperty("changelogSplitInfo") Optional changelogSplitInfo, - @JsonProperty("dataSequenceNumber") long dataSequenceNumber) + @JsonProperty("dataSequenceNumber") long dataSequenceNumber, + @JsonProperty("affinitySchedulingSectionSize") long maxAffinitySchedulingSectionSize) { requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null"); this.path = requireNonNull(path, "path is null"); @@ -82,6 +84,7 @@ public IcebergSplit( this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null")); this.changelogSplitInfo = requireNonNull(changelogSplitInfo, "changelogSplitInfo is null"); this.dataSequenceNumber = dataSequenceNumber; + this.affinitySchedulingSectionSize = maxAffinitySchedulingSectionSize; } @JsonProperty @@ -143,7 +146,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy() public List getPreferredNodes(NodeProvider nodeProvider) { if (getNodeSelectionStrategy() == SOFT_AFFINITY) { - return nodeProvider.get(path); + return nodeProvider.get(path + "#" + start / affinitySchedulingSectionSize); } return addresses; } @@ -173,6 +176,12 @@ public long getDataSequenceNumber() return dataSequenceNumber; } + @JsonProperty + public long getAffinitySchedulingSectionSize() + { + return affinitySchedulingSectionSize; + } + @Override public Object getInfo() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java index bf71ba005fdc5..331542004b2e4 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat; import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; @@ -60,6 +61,7 @@ public class IcebergSplitSource private final double minimumAssignedSplitWeight; private final long targetSplitSize; private final NodeSelectionStrategy nodeSelectionStrategy; + private final long maxAffinitySchedulingSectionSize; private final TupleDomain metadataColumnConstraints; @@ -73,11 +75,12 @@ public IcebergSplitSource( this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes(); this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session); this.nodeSelectionStrategy = getNodeSelectionStrategy(session); + this.maxAffinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes(); this.fileScanTaskIterator = closer.register( splitFiles( closer.register(tableScan.planFiles()), targetSplitSize) - .iterator()); + .iterator()); } @Override @@ -139,6 +142,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task) SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)), task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()), Optional.empty(), - getDataSequenceNumber(task.file())); + getDataSequenceNumber(task.file()), + maxAffinitySchedulingSectionSize); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java index 0d585393987f6..44f6a2b30d494 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java @@ -47,6 +47,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; @@ -71,6 +72,7 @@ public class ChangelogSplitSource private final long targetSplitSize; private final List columnHandles; private final NodeSelectionStrategy nodeSelectionStrategy; + private final long affinitySchedulingSectionSize; public ChangelogSplitSource( ConnectorSession session, @@ -86,6 +88,7 @@ public ChangelogSplitSource( this.nodeSelectionStrategy = getNodeSelectionStrategy(session); this.fileScanTaskIterable = closer.register(tableScan.planFiles()); this.fileScanTaskIterator = closer.register(fileScanTaskIterable.iterator()); + this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes(); } @Override @@ -153,6 +156,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask task, Ch changeTask.changeOrdinal(), changeTask.commitSnapshotId(), columnHandles)), - getDataSequenceNumber(task.file())); + getDataSequenceNumber(task.file()), + affinitySchedulingSectionSize); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java index 7f1d60c3da10a..3598a8fa4474f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java @@ -36,6 +36,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES; import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent; @@ -53,6 +54,7 @@ public class EqualityDeletesSplitSource private final ConnectorSession session; private final Map specById; private CloseableIterator deleteFiles; + private final long affinitySchedulingSectionSize; public EqualityDeletesSplitSource( ConnectorSession session, @@ -64,6 +66,7 @@ public EqualityDeletesSplitSource( requireNonNull(deleteFiles, "deleteFiles is null"); this.specById = table.specs(); this.deleteFiles = CloseableIterable.filter(deleteFiles, deleteFile -> fromIcebergFileContent(deleteFile.content()) == EQUALITY_DELETES).iterator(); + this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes(); } @Override @@ -121,6 +124,7 @@ private IcebergSplit splitFromDeleteFile(DeleteFile deleteFile) SplitWeight.standard(), ImmutableList.of(), Optional.empty(), - IcebergUtil.getDataSequenceNumber(deleteFile)); + IcebergUtil.getDataSequenceNumber(deleteFile), + affinitySchedulingSectionSize); } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java index 73047fb4c9f00..a5fb12e6cd23a 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java @@ -16,6 +16,7 @@ import com.facebook.presto.Session; import com.facebook.presto.common.transaction.TransactionId; import com.facebook.presto.execution.Lifespan; +import com.facebook.presto.metadata.Split; import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.WarningCollector; import com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy; @@ -29,16 +30,22 @@ import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; import com.facebook.presto.transaction.TransactionManager; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.iceberg.TableProperties; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.stream.IntStream; +import static com.facebook.presto.hive.HiveCommonSessionProperties.AFFINITY_SCHEDULING_FILE_SECTION_SIZE; +import static com.facebook.presto.hive.HiveCommonSessionProperties.NODE_SELECTION_STRATEGY; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED; @@ -182,13 +189,13 @@ public void testSplitSchedulingWithTablePropertyAndSession() Long.toString(TableProperties.SPLIT_SIZE_DEFAULT)); assertQuerySucceeds("ALTER TABLE test_split_size SET PROPERTIES (\"read.split.target-size\" = 1)"); String selectQuery = "SELECT * FROM test_split_size"; - long maxSplits = getSplitsForSql(session, selectQuery); + long maxSplits = getSplitsForSql(session, selectQuery).size(); IntStream.range(1, 5) .mapToObj(i -> Math.pow(2, i)) .forEach(splitSize -> { assertQuerySucceeds("ALTER TABLE test_split_size SET PROPERTIES (\"read.split.target-size\" =" + splitSize.intValue() + ")"); - assertEquals(getSplitsForSql(session, selectQuery), (double) maxSplits / splitSize, 5); + assertEquals(getSplitsForSql(session, selectQuery).size(), (double) maxSplits / splitSize, 5); }); // split size should be set to 32 on the table property. // Set it to 1 with the session property to override the table value and verify we get the @@ -200,6 +207,37 @@ public void testSplitSchedulingWithTablePropertyAndSession() assertQuerySucceeds("DROP TABLE test_split_size"); } + @Test + public void testSoftAffinitySchedulingSectionConfig() + { + Session maxIdentifiers = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", AFFINITY_SCHEDULING_FILE_SECTION_SIZE, "1B") + .setCatalogSessionProperty("iceberg", TARGET_SPLIT_SIZE, "1") + .setCatalogSessionProperty("iceberg", NODE_SELECTION_STRATEGY, "SOFT_AFFINITY") + .build(); + assertQuerySucceeds("CREATE TABLE test_affinity_section_scheduling as SELECT * FROM UNNEST(sequence(1, 512)) as t(i)"); + String selectQuery = "SELECT * FROM test_affinity_section_scheduling"; + Function> getIdentifiers = (session) -> { + List splits = getSplitsForSql(session, selectQuery); + Set allIdentifiers = new HashSet<>(); + splits.stream().map(Split::getConnectorSplit) + .forEach(connectorSplit -> connectorSplit.getPreferredNodes(identifier -> { + allIdentifiers.add(identifier); + return ImmutableList.of(); + })); + return allIdentifiers; + }; + Set maxSplitsIds = getIdentifiers.apply(maxIdentifiers); + Set halfSplitIds = getIdentifiers.apply(Session.builder(maxIdentifiers) + .setCatalogSessionProperty("iceberg", AFFINITY_SCHEDULING_FILE_SECTION_SIZE, "2B").build()); + assertEquals((double) halfSplitIds.size() / maxSplitsIds.size(), 0.5, 1E-10); + + Set singleSplitId = getIdentifiers.apply(Session.builder(maxIdentifiers) + .setCatalogSessionProperty("iceberg", AFFINITY_SCHEDULING_FILE_SECTION_SIZE, "1GB").build()); + assertEquals(singleSplitId.size(), 1); + assertQuerySucceeds("DROP TABLE test_affinity_section_scheduling"); + } + private Session sessionWithFilterPushdown(boolean pushdown) { return Session.builder(getQueryRunner().getDefaultSession()) @@ -207,7 +245,7 @@ private Session sessionWithFilterPushdown(boolean pushdown) .build(); } - private long getSplitsForSql(Session session, String sql) + private List getSplitsForSql(Session session, String sql) { TransactionManager transactionManager = getQueryRunner().getTransactionManager(); SplitManager splitManager = getQueryRunner().getSplitManager(); @@ -226,12 +264,12 @@ private long getSplitsForSql(Session session, String sql) tableHandle.getDynamicFilter()); try (SplitSource splitSource = splitManager.getSplits(session, newTableHandle, SplitSchedulingStrategy.UNGROUPED_SCHEDULING, WarningCollector.NOOP)) { - int splits = 0; + ImmutableList.Builder splits = ImmutableList.builder(); while (!splitSource.isFinished()) { - splits += splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1024).get().getSplits().size(); + splits.addAll(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1024).get().getSplits()); } assertTrue(splitSource.isFinished()); - return splits; + return splits.build(); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e);