Skip to content

Commit

Permalink
mark/unmark segments being added as optional and allow to remove such…
Browse files Browse the repository at this point in the history
… optional segments with a configurable delay
  • Loading branch information
klsince committed Sep 12, 2024
1 parent 83a71d2 commit 2ee3554
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
_tableIndexDir = context.getTableIndexDir();
UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
if (cmode == UpsertConfig.ConsistencyMode.SYNC || cmode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
_upsertViewManager = new UpsertViewManager(cmode, context);
_upsertViewManager = new UpsertViewManager(tableNameWithType, partitionId, cmode, context);
} else {
_upsertViewManager = null;
}
Expand Down Expand Up @@ -401,6 +401,9 @@ public void addSegment(ImmutableSegment segment) {
_logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
return;
}
if (_upsertViewManager != null) {
_upsertViewManager.markOptional(segment);
}
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
Expand All @@ -414,6 +417,9 @@ public void addSegment(ImmutableSegment segment) {
if (_enableSnapshot) {
_snapshotLock.readLock().unlock();
}
if (_upsertViewManager != null) {
_upsertViewManager.unmarkOptional(segment);
}
finishOperation();
}
}
Expand Down Expand Up @@ -1116,9 +1122,12 @@ public synchronized void close()
}
}
doClose();
// We don't remove the segment from the metadata manager when
// it's closed. This was done to make table deletion faster. Since we don't remove the segment, we never decrease
// the primary key count. So, we set the primary key count to 0 here.
if (_upsertViewManager != null) {
_upsertViewManager.close();
}
// We don't remove the segment from the metadata manager when it's closed. This was done to make table deletion
// faster. Since we don't remove the segment, we never decrease the primary key count. So, we set the primary key
// count to 0 here.
updatePrimaryKeyGauge(0);
_logger.info("Closed the metadata manager");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
_consistencyMode = UpsertConfig.ConsistencyMode.NONE;
}
long upsertViewRefreshIntervalMs = upsertConfig.getUpsertViewRefreshIntervalMs();
long optionalSegmentTTLMs = upsertConfig.getOptionalSegmentTTLMs();
long optionalSegmentCleanInternalMs = upsertConfig.getOptionalSegmentCleanInternalMs();
File tableIndexDir = tableDataManager.getTableDataDir();
_context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
.setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
.setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot)
.setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL)
.setConsistencyMode(_consistencyMode).setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs)
.setOptionalSegmentTTLMs(optionalSegmentTTLMs).setOptionalSegmentCleanInternalMs(optionalSegmentCleanInternalMs)
.setTableIndexDir(tableIndexDir).setDropOutOfOrderRecord(upsertConfig.isDropOutOfOrderRecord())
.setEnableDeletedKeysCompactionConsistency(_enableDeletedKeysCompactionConsistency)
.setTableDataManager(tableDataManager).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class UpsertContext {
private final double _deletedKeysTTL;
private final UpsertConfig.ConsistencyMode _consistencyMode;
private final long _upsertViewRefreshIntervalMs;
private final long _optionalSegmentTTLMs;
private final long _optionalSegmentCleanInternalMs;
private final File _tableIndexDir;
private final boolean _dropOutOfOrderRecord;
private final boolean _enableDeletedKeysCompactionConsistency;
Expand All @@ -53,8 +55,9 @@ private UpsertContext(TableConfig tableConfig, Schema schema, List<String> prima
List<String> comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction,
@Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, boolean enablePreload,
double metadataTTL, double deletedKeysTTL, UpsertConfig.ConsistencyMode consistencyMode,
long upsertViewRefreshIntervalMs, File tableIndexDir, boolean dropOutOfOrderRecord,
boolean enableDeletedKeysCompactionConsistency, @Nullable TableDataManager tableDataManager) {
long upsertViewRefreshIntervalMs, long optionalSegmentTTLMs, long optionalSegmentCleanInternalMs,
File tableIndexDir, boolean dropOutOfOrderRecord, boolean enableDeletedKeysCompactionConsistency,
@Nullable TableDataManager tableDataManager) {
_tableConfig = tableConfig;
_schema = schema;
_primaryKeyColumns = primaryKeyColumns;
Expand All @@ -68,6 +71,8 @@ private UpsertContext(TableConfig tableConfig, Schema schema, List<String> prima
_deletedKeysTTL = deletedKeysTTL;
_consistencyMode = consistencyMode;
_upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs;
_optionalSegmentTTLMs = optionalSegmentTTLMs;
_optionalSegmentCleanInternalMs = optionalSegmentCleanInternalMs;
_tableIndexDir = tableIndexDir;
_dropOutOfOrderRecord = dropOutOfOrderRecord;
_enableDeletedKeysCompactionConsistency = enableDeletedKeysCompactionConsistency;
Expand Down Expand Up @@ -126,6 +131,14 @@ public long getUpsertViewRefreshIntervalMs() {
return _upsertViewRefreshIntervalMs;
}

/**
 * Returns the configured time-to-live, in milliseconds, for optional segments.
 *
 * <p>As consumed by {@code UpsertViewManager#unmarkOptional}: {@code 0} drops the segment from the optional set
 * right away, a positive value keeps it for that many milliseconds after it is unmarked, and a negative value keeps
 * it indefinitely.
 */
public long getOptionalSegmentTTLMs() {
return _optionalSegmentTTLMs;
}

/**
 * Returns the configured value, in milliseconds, that {@code UpsertViewManager} uses as the delay between runs of
 * its optional-segment clean task.
 *
 * <p>NOTE(review): "Internal" in the name looks like a typo for "Interval"; the name is kept for API compatibility.
 */
public long getOptionalSegmentCleanInternalMs() {
return _optionalSegmentCleanInternalMs;
}

/**
 * Returns the table index directory this context was built with.
 */
public File getTableIndexDir() {
return _tableIndexDir;
}
Expand Down Expand Up @@ -156,6 +169,8 @@ public static class Builder {
private double _deletedKeysTTL;
private UpsertConfig.ConsistencyMode _consistencyMode;
private long _upsertViewRefreshIntervalMs;
private long _optionalSegmentTTLMs;
private long _optionalSegmentCleanInternalMs;
private File _tableIndexDir;
private boolean _dropOutOfOrderRecord;
private boolean _enableDeletedKeysCompactionConsistency;
Expand Down Expand Up @@ -226,6 +241,16 @@ public Builder setUpsertViewRefreshIntervalMs(long upsertViewRefreshIntervalMs)
return this;
}

/**
 * Sets the time-to-live, in milliseconds, for optional segments.
 *
 * @param optionalSegmentTTLMs TTL in milliseconds; stored as-is without validation
 * @return this builder, for call chaining
 */
public Builder setOptionalSegmentTTLMs(long optionalSegmentTTLMs) {
_optionalSegmentTTLMs = optionalSegmentTTLMs;
return this;
}

/**
 * Sets the value, in milliseconds, later exposed through {@code getOptionalSegmentCleanInternalMs()}.
 *
 * @param optionalSegmentCleanInternalMs clean interval in milliseconds; stored as-is without validation
 * @return this builder, for call chaining
 */
public Builder setOptionalSegmentCleanInternalMs(long optionalSegmentCleanInternalMs) {
_optionalSegmentCleanInternalMs = optionalSegmentCleanInternalMs;
return this;
}

public Builder setTableIndexDir(File tableIndexDir) {
_tableIndexDir = tableIndexDir;
return this;
Expand Down Expand Up @@ -255,8 +280,8 @@ public UpsertContext build() {
Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set");
return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn,
_hashFunction, _partialUpsertHandler, _enableSnapshot, _enablePreload, _metadataTTL, _deletedKeysTTL,
_consistencyMode, _upsertViewRefreshIntervalMs, _tableIndexDir, _dropOutOfOrderRecord,
_enableDeletedKeysCompactionConsistency, _tableDataManager);
_consistencyMode, _upsertViewRefreshIntervalMs, _optionalSegmentTTLMs, _optionalSegmentCleanInternalMs,
_tableIndexDir, _dropOutOfOrderRecord, _enableDeletedKeysCompactionConsistency, _tableDataManager);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
package org.apache.pinot.segment.local.upsert;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
Expand All @@ -48,8 +56,9 @@
* the query threads access a copy of bitmaps that are kept updated by upsert thread periodically. But the query
* thread can specify a freshness threshold query option to refresh the bitmap copies if not fresh enough.
*/
public class UpsertViewManager {
public class UpsertViewManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertViewManager.class);
private static ScheduledExecutorService _optionalSegmentsAsyncCleaner;
private final UpsertConfig.ConsistencyMode _consistencyMode;

// NOTE that we can't reuse _trackedSegments map in BasePartitionUpsertMetadataManager, as it doesn't track all
Expand All @@ -62,7 +71,9 @@ public class UpsertViewManager {
// Optional segments are part of the tracked segments. They can get processed by server before getting included in
// broker's routing table, like the new consuming segment. Although broker misses such segments, the server needs
// to acquire them to avoid missing the new valid docs in them.
private final Set<String> _optionalSegments = ConcurrentHashMap.newKeySet();
private final Map<String, Long /*ts to remove*/> _optionalSegments = new ConcurrentHashMap<>();
private final long _optionalSegmentTTLMs;
private final ScheduledFuture<?> _optionalSegmentsAsyncCleanFuture;

// Updating and accessing segments' validDocIds bitmaps are synchronized with a separate R/W lock for clarity.
// The query threads always get _upsertViewTrackedSegmentsLock then _upsertViewSegmentDocIdsLock to avoid deadlock.
Expand All @@ -75,9 +86,53 @@ public class UpsertViewManager {
private volatile long _lastUpsertViewRefreshTimeMs = 0;
private final long _upsertViewRefreshIntervalMs;

public UpsertViewManager(UpsertConfig.ConsistencyMode consistencyMode, UpsertContext context) {
/**
 * Creates the upsert view manager for one partition of a table.
 *
 * <p>When the configured optional-segment TTL is positive, a periodic task is scheduled on a cleaner executor that
 * is shared by all managers (created lazily, single-threaded). The task drops optional segments whose removal time
 * has passed. With a TTL of zero or less no task is scheduled.
 *
 * @param tableNameWithType table name, used only in log messages
 * @param partitionId partition id, used only in log messages
 * @param consistencyMode consistency mode this manager operates in
 * @param context source of the view refresh interval, the optional-segment TTL and the clean interval
 * @throws IllegalArgumentException if the TTL is positive but the clean interval is not (raised by
 *     {@code scheduleWithFixedDelay})
 */
public UpsertViewManager(String tableNameWithType, int partitionId, UpsertConfig.ConsistencyMode consistencyMode,
    UpsertContext context) {
  _consistencyMode = consistencyMode;
  _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs();
  _optionalSegmentTTLMs = context.getOptionalSegmentTTLMs();
  if (_optionalSegmentTTLMs > 0) {
    // The cleaner is static, so guard its lazy creation with the class lock.
    synchronized (UpsertViewManager.class) {
      if (_optionalSegmentsAsyncCleaner == null) {
        _optionalSegmentsAsyncCleaner =
            Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("optional-segments-async-cleaner"));
        LOGGER.info("Initialized single-threaded optional segments async cleaner");
      }
    }
    long intervalMs = context.getOptionalSegmentCleanInternalMs();
    // Randomize the first run within [10%, 100%) of the interval so that partitions don't all clean at once.
    long initialDelayMs = (long) ((new Random().nextDouble() * 0.9 + 0.1) * intervalMs);
    _optionalSegmentsAsyncCleanFuture = _optionalSegmentsAsyncCleaner.scheduleWithFixedDelay(() -> {
      try {
        LOGGER.debug("Cleaning tracked optional segments: {} from partition: {} of table: {}", _optionalSegments,
            partitionId, tableNameWithType);
        long nowMs = System.currentTimeMillis();
        // A negative timestamp is the placeholder written by markOptional(): the segment is still being added or
        // consumed and has no removal time yet, so it must not be expired. Only entries that were given a real
        // deadline by unmarkOptional() are eligible for removal.
        _optionalSegments.entrySet().removeIf(entry -> {
          long removeAtMs = entry.getValue();
          return removeAtMs >= 0 && nowMs > removeAtMs;
        });
      } catch (Throwable t) {
        // Catch everything: an exception escaping a periodic task would suppress all of its future runs.
        LOGGER.error("Caught exception while removing optional segments asynchronously", t);
      }
    }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    LOGGER.info(
        "Scheduled task to remove optional segments from partition: {} of table: {} with initial delay: {}ms, "
            + "interval: {}ms", partitionId, tableNameWithType, initialDelayMs, intervalMs);
  } else {
    _optionalSegmentsAsyncCleanFuture = null;
  }
}

/**
 * Cancels this manager's periodic optional-segment clean task, if one was scheduled.
 *
 * <p>The shared cleaner executor itself is not shut down here. Cancellation does not block on a run that may
 * already be in progress; it only prevents further runs of this manager's task.
 *
 * @throws IOException never thrown by this implementation; declared to satisfy {@link Closeable}
 */
@Override
public void close()
    throws IOException {
  if (_optionalSegmentsAsyncCleanFuture == null) {
    return;
  }
  LOGGER.info("Cancelling optional segments clean task");
  // Future#cancel guarantees that isDone() returns true once it has returned, so there is nothing to poll or wait
  // on afterwards.
  _optionalSegmentsAsyncCleanFuture.cancel(true);
}

public void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds,
Expand Down Expand Up @@ -277,15 +332,30 @@ public void unlockTrackedSegments() {
}

/**
 * Returns the names of the tracked segments that are currently marked as optional.
 */
public Set<String> getOptionalSegments() {
return _optionalSegments;
return _optionalSegments.keySet();
}

/**
 * Marks a tracked segment as optional so that it is included in the segments selected for queries, giving them a
 * complete upsert view. Typical cases are a newly created consuming segment or a newly uploaded immutable segment,
 * which the server may process before the broker's routing table includes them.
 *
 * @param segment segment to mark; only its name is recorded
 */
public void markOptional(IndexSegment segment) {
  String segmentName = segment.getSegmentName();
  // -1 is a placeholder removal timestamp; unmarkOptional() decides what happens to the entry afterwards.
  _optionalSegments.put(segmentName, -1L);
}

/**
 * Ends the optional status of a segment according to the configured TTL: a negative TTL leaves the entry
 * untouched, a TTL of zero removes it immediately, and a positive TTL records the time after which it may be
 * removed.
 *
 * @param segment segment to unmark; only its name is used
 */
public void unmarkOptional(IndexSegment segment) {
  if (_optionalSegmentTTLMs < 0) {
    // Negative TTL: keep the segment optional till server restarts.
    return;
  }
  String segmentName = segment.getSegmentName();
  if (_optionalSegmentTTLMs == 0) {
    _optionalSegments.remove(segmentName);
    return;
  }
  long removeAtMs = System.currentTimeMillis() + _optionalSegmentTTLMs;
  _optionalSegments.put(segmentName, removeAtMs);
}

public void trackSegment(IndexSegment segment) {
_trackedSegmentsLock.writeLock().lock();
try {
_trackedSegments.add(segment);
if (segment instanceof MutableSegment) {
_optionalSegments.add(segment.getSegmentName());
markOptional(segment);
}
if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
// Note: it's possible the segment is already tracked and the _trackedSegments doesn't really change here. But
Expand All @@ -306,7 +376,7 @@ public void untrackSegment(IndexSegment segment) {
try {
_trackedSegments.remove(segment);
if (segment instanceof MutableSegment) {
_optionalSegments.remove(segment.getSegmentName());
unmarkOptional(segment);
}
// No need to eagerly refresh the upsert view for SNAPSHOT mode when untracking a segment, as the untracked
// segment won't be used by any new queries, thus it can be removed when next refresh happens later.
Expand Down
Loading

0 comments on commit 2ee3554

Please sign in to comment.