Skip to content

Commit

Permalink
calculate size based flush threshold per topic
Browse files Browse the repository at this point in the history
  • Loading branch information
itschrispeck committed Jan 7, 2025
1 parent 3ee3dd4 commit c012d4f
Showing 4 changed files with 37 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -320,11 +320,10 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
String realtimeTableName = tableConfig.getTableName();
LOGGER.info("Setting up new LLC table: {}", realtimeTableName);

_flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);

List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig)
).collect(Collectors.toList());
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList());
Original file line number Diff line number Diff line change
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pinot.spi.stream.StreamConfig;


/**
* Manager which maintains the flush threshold update objects for each table
* Manager which maintains the flush threshold update objects for each (table, topic) pair
*/
public class FlushThresholdUpdateManager {
private final ConcurrentMap<String, FlushThresholdUpdater> _flushThresholdUpdaterMap = new ConcurrentHashMap<>();
@@ -45,30 +46,39 @@ public class FlushThresholdUpdateManager {
* partitions consumed by a server; FixedFlushThresholdUpdater sets the actual segment flush threshold as is.
*/
public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig streamConfig) {
String tableTopicKey = getKey(streamConfig);
String realtimeTableName = streamConfig.getTableNameWithType();

int flushThresholdRows = streamConfig.getFlushThresholdRows();
if (flushThresholdRows > 0) {
_flushThresholdUpdaterMap.remove(realtimeTableName);
_flushThresholdUpdaterMap.remove(tableTopicKey);
return new DefaultFlushThresholdUpdater(flushThresholdRows);
}
int flushThresholdSegmentRows = streamConfig.getFlushThresholdSegmentRows();
if (flushThresholdSegmentRows > 0) {
_flushThresholdUpdaterMap.remove(realtimeTableName);
_flushThresholdUpdaterMap.remove(tableTopicKey);
return new FixedFlushThresholdUpdater(flushThresholdSegmentRows);
}
// Legacy behavior: when flush threshold rows is explicitly set to 0, use segment size based flush threshold
long flushThresholdSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName,
k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName));
return _flushThresholdUpdaterMap.computeIfAbsent(tableTopicKey,
k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName, streamConfig.getTopicName()));
} else {
_flushThresholdUpdaterMap.remove(realtimeTableName);
_flushThresholdUpdaterMap.remove(tableTopicKey);
return new DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
}
}

public void clearFlushThresholdUpdater(String realtimeTableName) {
_flushThresholdUpdaterMap.remove(realtimeTableName);
public void clearFlushThresholdUpdater(StreamConfig streamConfig) {
_flushThresholdUpdaterMap.remove(getKey(streamConfig));
}

private String getKey(StreamConfig streamConfig) {
return streamConfig.getTableNameWithType() + "," + streamConfig.getTopicName();
}

@VisibleForTesting
public int getFlushThresholdUpdaterMapSize() {
return _flushThresholdUpdaterMap.size();
}
}
Original file line number Diff line number Diff line change
@@ -39,12 +39,14 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
public static final Logger LOGGER = LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
private final SegmentFlushThresholdComputer _flushThresholdComputer;
private final String _realtimeTableName;
private final String _topicName;

private final ControllerMetrics _controllerMetrics = ControllerMetrics.get();

public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName) {
public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName, String topicName) {
_flushThresholdComputer = new SegmentFlushThresholdComputer();
_realtimeTableName = realtimeTableName;
_topicName = topicName;
}

// synchronized since this method could be called for multiple partitions of the same table in different threads
@@ -57,8 +59,9 @@ public synchronized void updateFlushThreshold(StreamConfig streamConfig, Segment
newSegmentZKMetadata.getSegmentName());
newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);

_controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, ControllerGauge.NUM_ROWS_THRESHOLD, threshold);
_controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, ControllerGauge.COMMITTING_SEGMENT_SIZE,
_controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName, ControllerGauge.NUM_ROWS_THRESHOLD,
threshold);
_controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName, ControllerGauge.COMMITTING_SEGMENT_SIZE,
committingSegmentDescriptor.getSegmentSizeBytes());
}
}
Original file line number Diff line number Diff line change
@@ -90,7 +90,9 @@ public void testFlushThresholdUpdateManager() {
segmentBasedflushThresholdUpdater = flushThresholdUpdater;

// Clear the updater
flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME);
assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 1);
flushThresholdUpdateManager.clearFlushThresholdUpdater(mockStreamConfig(0, -1, -1));
assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 0);

// Call again with flush threshold rows set to 0 - a different Object should be returned
flushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1, -1));
@@ -140,7 +142,7 @@ public void testSegmentSizeBasedFlushThreshold() {
for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName());

// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -178,7 +180,7 @@ public void testSegmentSizeBasedFlushThresholdMinPartition() {
for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName());

// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
@@ -238,9 +240,9 @@ private long getSegmentSizeBytes(int numRowsConsumed, long[] segmentSizesMB) {

@Test
public void testTimeThreshold() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName());

// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -272,9 +274,9 @@ public void testTimeThreshold() {

@Test
public void testMinThreshold() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName());

// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -305,15 +307,15 @@ public void testMinThreshold() {

@Test
public void testSegmentSizeBasedUpdaterWithModifications() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
= new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);

// Use customized stream config
long flushSegmentDesiredSizeBytes = StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2;
long flushThresholdTimeMillis = StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS / 2;
int flushAutotuneInitialRows = StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS / 2;
StreamConfig streamConfig =
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
= new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName());

// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);

0 comments on commit c012d4f

Please sign in to comment.