Skip to content

Commit 9f1c39f

Browse files
Jackie-Jiangleujean02
authored andcommitted
Fix and clean-up upsert and dedup config (apache#15528)
1 parent 5a4714a commit 9f1c39f

File tree

71 files changed

+1287
-1087
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+1287
-1087
lines changed

pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,8 @@ public void testSerDe()
262262
}
263263
{
264264
// with dedup config - without metadata ttl and metadata time column
265-
DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5);
265+
DedupConfig dedupConfig = new DedupConfig();
266+
dedupConfig.setHashFunction(HashFunction.MD5);
266267
TableConfig tableConfig = tableConfigBuilder.setDedupConfig(dedupConfig).build();
267268
// Serialize then de-serialize
268269
checkTableConfigWithDedupConfigWithoutTTL(
@@ -272,7 +273,10 @@ public void testSerDe()
272273
}
273274
{
274275
// with dedup config - with metadata ttl and metadata time column
275-
DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn", false);
276+
DedupConfig dedupConfig = new DedupConfig();
277+
dedupConfig.setHashFunction(HashFunction.MD5);
278+
dedupConfig.setMetadataTTL(10);
279+
dedupConfig.setDedupTimeColumn("dedupTimeColumn");
276280
TableConfig tableConfig = tableConfigBuilder.setDedupConfig(dedupConfig).build();
277281
// Serialize then de-serialize
278282
checkTableConfigWithDedupConfigWithTTL(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));

pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,17 @@ public void testCopyConstructor() {
9191
ingestionConfig.setRowTimeValueCheck(true);
9292
ingestionConfig.setSegmentTimeValueCheck(false);
9393

94+
DedupConfig dedupConfig = new DedupConfig();
95+
dedupConfig.setHashFunction(HashFunction.MD5);
96+
9497
TableConfig config = new TableConfigBuilder(TableType.OFFLINE)
9598
.setTableName(RAW_TABLE_NAME)
9699
.setAggregateMetrics(true)
97100
.setRetentionTimeValue("5")
98101
.setRetentionTimeUnit("DAYS")
99102
.setNumReplicas(2)
100-
.setDedupConfig(new DedupConfig(true, HashFunction.MD5))
101103
.setIngestionConfig(ingestionConfig)
104+
.setDedupConfig(dedupConfig)
102105
.setQueryConfig(new QueryConfig(2000L, true, false, Collections.emptyMap(), 100_000L, 100_000L))
103106
.setTierConfigList(List.of(new TierConfig("name", "type", null, null, "storageType", null, null, null)))
104107
.build();

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2546,7 +2546,7 @@ private Map<InstancePartitionsType, InstancePartitions> fetchOrComputeInstancePa
25462546
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
25472547
InstancePartitionsType.OFFLINE));
25482548
}
2549-
if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
2549+
if (tableConfig.isUpsertEnabled()) {
25502550
// In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
25512551
// -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
25522552
return Collections.singletonMap(InstancePartitionsType.CONSUMING,

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class RebalanceConfig {
8686
// Whether to run Minimal Data Movement Algorithm, overriding the minimizeDataMovement flag in table config. If set
8787
// to default, the minimizeDataMovement flag in table config will be used to determine whether to run the Minimal
8888
// Data Movement Algorithm.
89+
// TODO: Replace this with Enablement
8990
@ApiModel
9091
public enum MinimizeDataMovementOptions {
9192
ENABLE, DISABLE, DEFAULT

pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java

+31-34
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.pinot.common.utils.TarCompressionUtils;
5555
import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
5656
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
57+
import org.apache.pinot.segment.local.dedup.DedupContext;
5758
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
5859
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
5960
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
@@ -63,6 +64,7 @@
6364
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
6465
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
6566
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
67+
import org.apache.pinot.segment.local.upsert.UpsertContext;
6668
import org.apache.pinot.segment.local.utils.IngestionUtils;
6769
import org.apache.pinot.segment.spi.MutableSegment;
6870
import org.apache.pinot.segment.spi.V1Constants;
@@ -74,7 +76,6 @@
7476
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
7577
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
7678
import org.apache.pinot.spi.config.table.CompletionConfig;
77-
import org.apache.pinot.spi.config.table.DedupConfig;
7879
import org.apache.pinot.spi.config.table.IndexingConfig;
7980
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
8081
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
@@ -761,7 +762,7 @@ public void run() {
761762
// persisted.
762763
// Take upsert snapshot before starting consuming events
763764
if (_partitionUpsertMetadataManager != null) {
764-
if (_tableConfig.getUpsertMetadataTTL() > 0) {
765+
if (_partitionUpsertMetadataManager.getContext().getMetadataTTL() > 0) {
765766
// If upsertMetadataTTL is enabled, we will remove expired primary keys from upsertMetadata
766767
// AFTER taking a snapshot. Taking the snapshot first is crucial to capture the final
767768
// state of each key before it exits the TTL window. Out-of-TTL segments are skipped in
@@ -779,7 +780,8 @@ public void run() {
779780
}
780781
}
781782

782-
if (_partitionDedupMetadataManager != null && _tableConfig.getDedupMetadataTTL() > 0) {
783+
if (_partitionDedupMetadataManager != null
784+
&& _partitionDedupMetadataManager.getContext().getMetadataTTL() > 0) {
783785
_partitionDedupMetadataManager.removeExpiredPrimaryKeys();
784786
}
785787

@@ -1662,27 +1664,24 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
16621664

16631665
// Start new realtime segment
16641666
String consumerDir = realtimeTableDataManager.getConsumerDir();
1665-
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
1666-
new RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(_tableNameWithType)
1667-
.setSegmentName(_segmentNameStr)
1668-
.setStreamName(streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
1669-
.setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
1670-
.setSegmentZKMetadata(segmentZKMetadata)
1671-
.setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
1672-
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
1673-
.setAggregateMetrics(indexingConfig.isAggregateMetrics())
1674-
.setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
1675-
.setDefaultNullHandlingEnabled(_defaultNullHandlingEnabled)
1676-
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
1677-
.setUpsertConsistencyMode(tableConfig.getUpsertConsistencyMode())
1678-
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
1679-
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
1680-
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
1681-
.setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn())
1682-
.setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord())
1683-
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
1684-
.setDedupTimeColumn(tableConfig.getDedupTimeColumn())
1685-
.setFieldConfigList(tableConfig.getFieldConfigList());
1667+
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
1668+
.setTableNameWithType(_tableNameWithType)
1669+
.setSegmentName(_segmentNameStr)
1670+
.setStreamName(streamTopic)
1671+
.setSchema(_schema)
1672+
.setTimeColumnName(timeColumnName)
1673+
.setCapacity(_segmentMaxRowCount)
1674+
.setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
1675+
.setSegmentZKMetadata(segmentZKMetadata)
1676+
.setOffHeap(_isOffHeap)
1677+
.setMemoryManager(_memoryManager)
1678+
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
1679+
.setAggregateMetrics(indexingConfig.isAggregateMetrics())
1680+
.setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
1681+
.setDefaultNullHandlingEnabled(_defaultNullHandlingEnabled)
1682+
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
1683+
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
1684+
.setConsumerDir(consumerDir);
16861685

16871686
// Create message decoder
16881687
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema);
@@ -1761,8 +1760,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
17611760
}
17621761
}
17631762

1764-
@VisibleForTesting
1765-
ParallelSegmentConsumptionPolicy getParallelConsumptionPolicy() {
1763+
private ParallelSegmentConsumptionPolicy getParallelConsumptionPolicy() {
17661764
IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
17671765
ParallelSegmentConsumptionPolicy parallelSegmentConsumptionPolicy = null;
17681766
boolean pauseless = false;
@@ -1782,19 +1780,18 @@ ParallelSegmentConsumptionPolicy getParallelConsumptionPolicy() {
17821780
// - For pauseless tables, allow consumption during build, but disallow consumption during download
17831781
// - For non-pauseless tables, disallow consumption during build and download
17841782
// TODO: Revisit the non-pauseless handling
1785-
if (_realtimeTableDataManager.isDedupEnabled()) {
1786-
DedupConfig dedupConfig = _tableConfig.getDedupConfig();
1787-
assert dedupConfig != null;
1788-
if (dedupConfig.isAllowDedupConsumptionDuringCommit()) {
1783+
if (_partitionUpsertMetadataManager != null) {
1784+
UpsertContext upsertContext = _partitionUpsertMetadataManager.getContext();
1785+
if (upsertContext.isAllowPartialUpsertConsumptionDuringCommit()
1786+
|| upsertContext.getUpsertMode() != UpsertConfig.Mode.PARTIAL) {
17891787
return ParallelSegmentConsumptionPolicy.ALLOW_ALWAYS;
17901788
}
17911789
return pauseless ? ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY
17921790
: ParallelSegmentConsumptionPolicy.DISALLOW_ALWAYS;
17931791
}
1794-
if (_realtimeTableDataManager.isPartialUpsertEnabled()) {
1795-
UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
1796-
assert upsertConfig != null;
1797-
if (upsertConfig.isAllowPartialUpsertConsumptionDuringCommit()) {
1792+
if (_partitionDedupMetadataManager != null) {
1793+
DedupContext dedupContext = _partitionDedupMetadataManager.getContext();
1794+
if (dedupContext.isAllowDedupConsumptionDuringCommit()) {
17981795
return ParallelSegmentConsumptionPolicy.ALLOW_ALWAYS;
17991796
}
18001797
return pauseless ? ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY

pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java

+17-23
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.apache.pinot.segment.spi.ImmutableSegment;
7171
import org.apache.pinot.segment.spi.IndexSegment;
7272
import org.apache.pinot.segment.spi.SegmentContext;
73-
import org.apache.pinot.spi.config.table.DedupConfig;
7473
import org.apache.pinot.spi.config.table.IndexingConfig;
7574
import org.apache.pinot.spi.config.table.TableConfig;
7675
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -203,27 +202,22 @@ protected void doInit() {
203202
IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
204203
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
205204
assert tableConfig != null;
206-
DedupConfig dedupConfig = tableConfig.getDedupConfig();
207-
boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
208-
if (dedupEnabled) {
205+
if (tableConfig.isDedupEnabled()) {
209206
Schema schema = indexLoadingConfig.getSchema();
210207
assert schema != null;
211-
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
212-
Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
213-
"Primary key columns must be configured for dedup");
214-
_tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics,
215-
_instanceDataManagerConfig.getDedupConfig());
208+
_tableDedupMetadataManager =
209+
TableDedupMetadataManagerFactory.create(_instanceDataManagerConfig.getDedupConfig(), tableConfig, schema,
210+
this);
216211
}
217212

218-
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
219-
if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) {
220-
Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s",
221-
_tableUpsertMetadataManager);
213+
if (tableConfig.isUpsertEnabled()) {
214+
Preconditions.checkState(_tableDedupMetadataManager == null,
215+
"Dedup and upsert cannot be both enabled for table: %s", _tableNameWithType);
222216
Schema schema = indexLoadingConfig.getSchema();
223217
assert schema != null;
224218
_tableUpsertMetadataManager =
225-
TableUpsertMetadataManagerFactory.create(tableConfig, _instanceDataManagerConfig.getUpsertConfig());
226-
_tableUpsertMetadataManager.init(tableConfig, schema, this);
219+
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(), tableConfig, schema,
220+
this);
227221
}
228222

229223
_enforceConsumptionInOrder = isEnforceConsumptionInOrder();
@@ -423,7 +417,7 @@ public boolean isUpsertEnabled() {
423417

424418
public boolean isPartialUpsertEnabled() {
425419
return _tableUpsertMetadataManager != null
426-
&& _tableUpsertMetadataManager.getUpsertMode() == UpsertConfig.Mode.PARTIAL;
420+
&& _tableUpsertMetadataManager.getContext().getUpsertMode() == UpsertConfig.Mode.PARTIAL;
427421
}
428422

429423
private void handleSegmentPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
@@ -439,7 +433,7 @@ private void handleSegmentPreload(SegmentZKMetadata zkMetadata, IndexLoadingConf
439433
* Handles upsert preload if the upsert preload is enabled.
440434
*/
441435
private void handleUpsertPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
442-
if (_tableUpsertMetadataManager == null || !_tableUpsertMetadataManager.isEnablePreload()) {
436+
if (_tableUpsertMetadataManager == null || !_tableUpsertMetadataManager.getContext().isPreloadEnabled()) {
443437
return;
444438
}
445439
String segmentName = zkMetadata.getSegmentName();
@@ -453,7 +447,7 @@ private void handleUpsertPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfi
453447
* Handles dedup preload if the dedup preload is enabled.
454448
*/
455449
private void handleDedupPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
456-
if (_tableDedupMetadataManager == null || !_tableDedupMetadataManager.isEnablePreload()) {
450+
if (_tableDedupMetadataManager == null || !_tableDedupMetadataManager.getContext().isPreloadEnabled()) {
457451
return;
458452
}
459453
String segmentName = zkMetadata.getSegmentName();
@@ -654,12 +648,12 @@ private long getDownloadTimeoutMs(TableConfig tableConfig) {
654648
@VisibleForTesting
655649
protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
656650
TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName,
657-
ConsumerCoordinator consumerCoordinator, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
658-
PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData)
651+
ConsumerCoordinator consumerCoordinator, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
652+
@Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData)
659653
throws AttemptsExceededException, RetriableOperationException {
660654
return new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
661-
indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, _serverMetrics,
662-
partitionUpsertMetadataManager, partitionDedupMetadataManager, isTableReadyToConsumeData);
655+
indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, _serverMetrics, partitionUpsertMetadataManager,
656+
partitionDedupMetadataManager, isTableReadyToConsumeData);
663657
}
664658

665659
/**
@@ -784,7 +778,7 @@ private void replaceUpsertSegment(String segmentName, SegmentDataManager oldSegm
784778
// being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected.
785779
IndexSegment oldSegment = oldSegmentManager.getSegment();
786780
ImmutableSegment immutableSegment = newSegmentManager.getSegment();
787-
UpsertConfig.ConsistencyMode consistencyMode = _tableUpsertMetadataManager.getUpsertConsistencyMode();
781+
UpsertConfig.ConsistencyMode consistencyMode = _tableUpsertMetadataManager.getContext().getConsistencyMode();
788782
if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
789783
partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
790784
registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager);

pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E
227227
RealtimeTableDataManager rtdm = (RealtimeTableDataManager) tableDataManager;
228228
TableUpsertMetadataManager tumm = rtdm.getTableUpsertMetadataManager();
229229
boolean isUsingConsistencyMode =
230-
rtdm.getTableUpsertMetadataManager().getUpsertConsistencyMode() != UpsertConfig.ConsistencyMode.NONE;
230+
rtdm.getTableUpsertMetadataManager().getContext().getConsistencyMode() != UpsertConfig.ConsistencyMode.NONE;
231231
if (isUsingConsistencyMode) {
232232
tumm.lockForSegmentContexts();
233233
}

0 commit comments

Comments
 (0)