Skip to content

Commit

Permalink
fix checks on largest comparison value for upsert ttl and allow to ad…
Browse files Browse the repository at this point in the history
…d segments out of ttl (apache#14094)

* fix checks on largest comparison value for ttl and allow subclass to add segments even out of ttl 

* fix the special value for TTL comparison and use Double.Negtive_Infinity

* unify TTL enable check and persist watermark after taking snapshot

* update ttl watermark and skip segment out of ttl during preloading

* refine and add tests
  • Loading branch information
klsince authored Sep 28, 2024
1 parent fce17d3 commit ad1eda5
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
@ThreadSafe
public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {
protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
// The special value to indicate the largest comparison value is not set yet, and allow negative comparison values.
protected static final double TTL_WATERMARK_NOT_SET = Double.NEGATIVE_INFINITY;

protected final String _tableNameWithType;
protected final int _partitionId;
Expand Down Expand Up @@ -178,10 +180,13 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
}
_serverMetrics = ServerMetrics.get();
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
if (_metadataTTL > 0) {
if (isTTLEnabled()) {
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
Preconditions.checkState(_metadataTTL <= 0 || _enableSnapshot, "Upsert metadata TTL must have snapshot enabled");
_largestSeenComparisonValue = new AtomicDouble(loadWatermark());
} else {
_largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE);
_largestSeenComparisonValue = new AtomicDouble(TTL_WATERMARK_NOT_SET);
deleteWatermark();
}
}
Expand Down Expand Up @@ -383,37 +388,9 @@ public void addSegment(ImmutableSegment segment) {
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
_tableNameWithType);
ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;

if (_deletedKeysTTL > 0) {
double maxComparisonValue =
((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
.getMaxValue()).doubleValue();
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
if (isTTLEnabled()) {
updateWatermark(segment);
}

// Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
Number maxComparisonValue =
(Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) {
_logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot();
if (validDocIdsSnapshot != null) {
MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot);
immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
} else {
_logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid",
segmentName);
}
return;
}
}

if (!startOperation()) {
_logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
return;
Expand All @@ -422,7 +399,7 @@ public void addSegment(ImmutableSegment segment) {
_snapshotLock.readLock().lock();
}
try {
doAddSegment(immutableSegment);
doAddSegment((ImmutableSegmentImpl) segment);
_trackedSegments.add(segment);
if (_enableSnapshot) {
_updatedSegmentsSinceLastSnapshot.add(segment);
Expand All @@ -435,9 +412,51 @@ public void addSegment(ImmutableSegment segment) {
}
}

protected boolean isTTLEnabled() {
return _metadataTTL > 0 || _deletedKeysTTL > 0;
}

protected boolean isOutOfMetadataTTL(IndexSegment segment) {
if (_metadataTTL > 0 && _largestSeenComparisonValue.get() != TTL_WATERMARK_NOT_SET) {
Number maxComparisonValue =
(Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
return maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL;
}
return false;
}

protected boolean skipAddSegmentOutOfTTL(ImmutableSegmentImpl segment) {
String segmentName = segment.getSegmentName();
_logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
MutableRoaringBitmap validDocIdsSnapshot = segment.loadValidDocIdsFromSnapshot();
if (validDocIdsSnapshot != null) {
MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot);
segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
} else {
_logger.warn("Failed to find validDocIds snapshot to add segment: {} out of TTL, treating all docs as valid",
segmentName);
}
// Return true if segment is skipped. This boolean value allows subclass to decide whether to skip.
return true;
}

protected boolean skipPreloadSegmentOutOfTTL(ImmutableSegmentImpl segment, MutableRoaringBitmap validDocIdsSnapshot) {
String segmentName = segment.getSegmentName();
_logger.info("Skip preloading segment: {} because it's out of TTL", segmentName);
MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot);
segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
// Return true if segment is skipped. This boolean value allows subclass to decide whether to skip.
return true;
}

protected void doAddSegment(ImmutableSegmentImpl segment) {
String segmentName = segment.getSegmentName();
_logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
if (isOutOfMetadataTTL(segment) && skipAddSegmentOutOfTTL(segment)) {
return;
}
long startTimeMs = System.currentTimeMillis();
if (!_enableSnapshot) {
segment.deleteValidDocIdsSnapshot();
Expand Down Expand Up @@ -479,6 +498,9 @@ public void preloadSegment(ImmutableSegment segment) {
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
_tableNameWithType);
if (isTTLEnabled()) {
updateWatermark(segment);
}
if (!startOperation()) {
_logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
return;
Expand Down Expand Up @@ -508,7 +530,9 @@ protected void doPreloadSegment(ImmutableSegmentImpl segment) {
segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
return;
}

if (isOutOfMetadataTTL(segment) && skipPreloadSegmentOutOfTTL(segment, validDocIds)) {
return;
}
try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn)) {
doPreloadSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
Expand Down Expand Up @@ -780,21 +804,17 @@ public void removeSegment(IndexSegment segment) {
_logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName);
return;
}
if (!startOperation()) {
_logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
return;
}
// Skip removing the upsert metadata of segment that has max comparison value smaller than
// (largestSeenComparisonValue - TTL), i.e. out of metadata TTL. The expired metadata is removed while creating
// new consuming segment in batches.
boolean skipRemoveMetadata = false;
if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
Number maxComparisonValue =
(Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) {
_logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
skipRemoveMetadata = true;
}
}
if (!startOperation()) {
_logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
return;
if (isOutOfMetadataTTL(segment)) {
_logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
skipRemoveMetadata = true;
}
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
Expand Down Expand Up @@ -989,6 +1009,12 @@ protected void doTakeSnapshot() {
}
}
_updatedSegmentsSinceLastSnapshot.clear();
// Persist TTL watermark after taking snapshots if TTL is enabled, so that segments out of TTL can be loaded with
// updated validDocIds bitmaps. If the TTL watermark is persisted first, segments out of TTL may get loaded with
// stale bitmaps or even no bitmap snapshots to use.
if (isTTLEnabled()) {
persistWatermark(_largestSeenComparisonValue.get());
}
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
Expand Down Expand Up @@ -1020,7 +1046,7 @@ protected double loadWatermark() {
_logger.warn("Caught exception while loading watermark file: {}, skipping", watermarkFile);
}
}
return Double.MIN_VALUE;
return TTL_WATERMARK_NOT_SET;
}

/**
Expand Down Expand Up @@ -1061,9 +1087,26 @@ protected File getWatermarkFile() {
return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);
}

protected void updateWatermark(ImmutableSegment segment) {
double maxComparisonValue =
((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
.getMaxValue()).doubleValue();
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
}

@VisibleForTesting
double getWatermark() {
return _largestSeenComparisonValue.get();
}

@VisibleForTesting
void setWatermark(double watermark) {
_largestSeenComparisonValue.set(watermark);
}

@Override
public void removeExpiredPrimaryKeys() {
if (_metadataTTL <= 0 && _deletedKeysTTL <= 0) {
if (!isTTLEnabled()) {
return;
}
if (!startOperation()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,10 @@ public void doRemoveExpiredPrimaryKeys() {
AtomicInteger numTotalKeysMarkForDeletion = new AtomicInteger();
AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger();
double largestSeenComparisonValue = _largestSeenComparisonValue.get();
double metadataTTLKeysThreshold;
if (_metadataTTL > 0) {
metadataTTLKeysThreshold = largestSeenComparisonValue - _metadataTTL;
} else {
metadataTTLKeysThreshold = Double.MIN_VALUE;
}
double deletedKeysThreshold;
if (_deletedKeysTTL > 0) {
deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
} else {
deletedKeysThreshold = Double.MIN_VALUE;
}

double metadataTTLKeysThreshold =
_metadataTTL > 0 ? largestSeenComparisonValue - _metadataTTL : Double.NEGATIVE_INFINITY;
double deletedKeysThreshold =
_deletedKeysTTL > 0 ? largestSeenComparisonValue - _deletedKeysTTL : Double.NEGATIVE_INFINITY;
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue();
if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) {
Expand All @@ -227,9 +218,6 @@ public void doRemoveExpiredPrimaryKeys() {
}
}
});
if (_metadataTTL > 0) {
persistWatermark(largestSeenComparisonValue);
}

// Update metrics
updatePrimaryKeyGauge();
Expand Down Expand Up @@ -266,7 +254,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
Comparable newComparisonValue = recordInfo.getComparisonValue();

// When TTL is enabled, update largestSeenComparisonValue when adding new record
if (_metadataTTL > 0 || _deletedKeysTTL > 0) {
if (isTTLEnabled()) {
double comparisonValue = ((Number) newComparisonValue).doubleValue();
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, comparisonValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,7 @@ protected void doRemoveSegment(IndexSegment segment) {
segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
long startTimeMs = System.currentTimeMillis();

try (
PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
removeSegment(segment,
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs()));
} catch (Exception e) {
Expand Down Expand Up @@ -292,13 +291,8 @@ public void doRemoveExpiredPrimaryKeys() {
AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger();
AtomicInteger numDeletedTTLKeysInMultipleSegments = new AtomicInteger();
double largestSeenComparisonValue = _largestSeenComparisonValue.get();
double deletedKeysThreshold;
if (_deletedKeysTTL > 0) {
deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
} else {
deletedKeysThreshold = Double.MIN_VALUE;
}

double deletedKeysThreshold =
_deletedKeysTTL > 0 ? largestSeenComparisonValue - _deletedKeysTTL : Double.NEGATIVE_INFINITY;
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue();
// We need to verify that the record belongs to only one segment. If a record is part of multiple segments,
Expand Down
Loading

0 comments on commit ad1eda5

Please sign in to comment.