Skip to content

Commit

Permalink
refactor to use PauseState
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 committed Aug 30, 2024
1 parent d068be4 commit 17ae277
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -473,25 +473,25 @@ private void setUpPinotController() {

// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
_tableSizeReader =
new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager,
_leadControllerManager);
_storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager,
_helixResourceManager, _config);
_pinotLLCRealtimeSegmentManager =
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _storageQuotaChecker, _controllerMetrics);
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
// TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
_segmentCompletionManager =
new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

_connectionManager = PoolingHttpClientConnectionManagerHelper.createWithSocketFactory();
_connectionManager.setDefaultSocketConfig(
SocketConfig.custom()
.setSoTimeout(Timeout.of(_config.getServerAdminRequestTimeoutSeconds() * 1000, TimeUnit.MILLISECONDS))
.build());
_segmentCompletionManager =
new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
_tableSizeReader =
new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager,
_leadControllerManager);
_storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager,
_helixResourceManager, _config);

// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.controller.validation.StorageQuotaChecker;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.spi.ColumnMetadata;
Expand Down Expand Up @@ -142,8 +141,6 @@ public class PinotLLCRealtimeSegmentManager {
@Deprecated
public static final String IS_TABLE_PAUSED = "isTablePaused";
public static final String PAUSE_STATE = "pauseState";
// simple field in Ideal State representing storage quota breached for the table
public static final String IS_QUOTA_EXCEEDED = "isQuotaExceeded";
private static final Logger LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);

private static final int STARTING_SEQUENCE_NUMBER = 0; // Initial sequence number for new table segments
Expand Down Expand Up @@ -175,7 +172,6 @@ public class PinotLLCRealtimeSegmentManager {
private final HelixManager _helixManager;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final PinotHelixResourceManager _helixResourceManager;
private final StorageQuotaChecker _storageQuotaChecker;
private final String _clusterName;
private final ControllerConf _controllerConf;
private final ControllerMetrics _controllerMetrics;
Expand All @@ -192,12 +188,11 @@ public class PinotLLCRealtimeSegmentManager {
private volatile boolean _isStopping = false;

public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
StorageQuotaChecker storageQuotaChecker, ControllerMetrics controllerMetrics) {
ControllerMetrics controllerMetrics) {
_helixAdmin = helixResourceManager.getHelixAdmin();
_helixManager = helixResourceManager.getHelixZkManager();
_propertyStore = helixResourceManager.getPropertyStore();
_helixResourceManager = helixResourceManager;
_storageQuotaChecker = storageQuotaChecker;
_clusterName = helixResourceManager.getHelixClusterName();
_controllerConf = controllerConf;
_controllerMetrics = controllerMetrics;
Expand Down Expand Up @@ -431,8 +426,7 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen
}
}

@VisibleForTesting
IdealState getIdealState(String realtimeTableName) {
public IdealState getIdealState(String realtimeTableName) {
try {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager, realtimeTableName);
Preconditions.checkState(idealState != null, "Failed to find IdealState for table: " + realtimeTableName);
Expand Down Expand Up @@ -549,12 +543,7 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
// Step-2
long startTimeNs2 = System.nanoTime();
String newConsumingSegmentName = null;

// Update table IS based on storage quota breach so that new consuming segment is not created in case of breach
if (_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig)) {
updateStorageQuotaExceededInIdealState(realtimeTableName, idealState, true);
}
if (!isTablePaused(idealState) && !isStorageQuotaExceeded(idealState)) {
if (!isTablePaused(idealState)) {
StreamConfig streamConfig =
new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
Set<Integer> partitionIds;
Expand Down Expand Up @@ -938,9 +927,8 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s
assert idealState != null;
boolean isTableEnabled = idealState.isEnabled();
boolean isTablePaused = isTablePaused(idealState);
boolean isStorageQuotaExceeded = isStorageQuotaExceeded(idealState);
boolean offsetsHaveToChange = offsetCriteria != null;
if (isTableEnabled && !isTablePaused && !isStorageQuotaExceeded) {
if (isTableEnabled && !isTablePaused) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
offsetsHaveToChange
? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
Expand All @@ -954,9 +942,8 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
recreateDeletedConsumingSegment, offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}, "
+ "isStorageQuotaExceeded: {}", realtimeTableName, isTableEnabled, isTablePaused,
isStorageQuotaExceeded);
LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
realtimeTableName, isTableEnabled, isTablePaused);
return idealState;
}
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
Expand Down Expand Up @@ -987,8 +974,7 @@ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String
"Exceeded max segment completion time for segment " + committingSegmentName);
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
isTablePaused(idealState) || isStorageQuotaExceeded(idealState) ? null : newSegmentName, segmentAssignment,
instancePartitionsMap);
isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
}
Expand Down Expand Up @@ -1016,10 +1002,6 @@ private static PauseState extractTablePauseState(IdealState idealState) {
return null;
}

private boolean isStorageQuotaExceeded(IdealState idealState) {
return Boolean.parseBoolean(idealState.getRecord().getSimpleField(IS_QUOTA_EXCEEDED));
}

@VisibleForTesting
void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
@Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
Expand Down Expand Up @@ -1785,39 +1767,6 @@ private IdealState updatePauseStateInIdealState(String tableNameWithType, boolea
return updatedIdealState;
}

/**
* Updates the table IS property 'isQuotaExceeded' based on provided 'quotaExceeded'.
* Will be a no op in case the IS already has the same value.
* @param tableNameWithType table on which to update the IS
* @param is existing ideal state if available
* @param quotaExceeded boolean indicating whether table has exceeded the quota limits
* @return true if the IS was successfully updated for the table. Returns false in case of no op or the update fails.
*/
public boolean updateStorageQuotaExceededInIdealState(String tableNameWithType, @Nullable IdealState is,
boolean quotaExceeded) {
if (is == null) {
is = getIdealState(tableNameWithType);
}
if (is.getRecord().getBooleanField(IS_QUOTA_EXCEEDED, false) != quotaExceeded) {
IdealState updatedIS = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
ZNRecord znRecord = idealState.getRecord();
znRecord.setSimpleField(IS_QUOTA_EXCEEDED, Boolean.valueOf(quotaExceeded).toString());
return new IdealState(znRecord);
}, RetryPolicies.noDelayRetryPolicy(1));
if (updatedIS == null) {
LOGGER.error("Failed to set 'isQuotaExceeded' to {} in the Ideal State for table {}.", quotaExceeded,
tableNameWithType);
return false;
}
is.getRecord().setBooleanField(IS_QUOTA_EXCEEDED, quotaExceeded);
LOGGER.info("Set 'isQuotaExceeded' to {} in the Ideal State for table {}.", quotaExceeded, tableNameWithType);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_EXCEEDED,
quotaExceeded ? 1 : 0);
return true;
}
return false;
}

private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) {
if (!consumingSegments.isEmpty()) {
Criteria recipientCriteria = new Criteria();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
Expand Down Expand Up @@ -119,11 +122,31 @@ protected void processTable(String tableNameWithType, Context context) {
// This will help resume the table consumption once the quota is available due to either quota increase or
// segment deletion.
// In which case we need to pass "recreateDeletedConsumingSegment" as true to "ensureAllPartitionsConsuming" below.
boolean idealStateUpdated = _llcRealtimeSegmentManager.updateStorageQuotaExceededInIdealState(tableNameWithType,
null, _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig));
PauseState pauseState = computePauseState(tableNameWithType);

_llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
context._recreateDeletedConsumingSegment || idealStateUpdated, context._offsetCriteria);
context._recreateDeletedConsumingSegment || !pauseState.isPaused(), context._offsetCriteria);
}

private PauseState computePauseState(String tableNameWithType) {
PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
boolean isTablePaused = pauseStatus.getPauseFlag();
// if table is paused by admin then don't compute
if (!isTablePaused || pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED)) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_EXCEEDED,
isQuotaExceeded ? 1 : 0);
// if quota breach and pause flag is not in sync, update the IS
if (isQuotaExceeded != isTablePaused) {
String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA";
pauseStatus = _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
isQuotaExceeded ? "Storage quota of " + storageQuota + " exceeded." : "Table storage within quota limits");
}
}
return new PauseState(pauseStatus.getPauseFlag(), pauseStatus.getReasonCode(), pauseStatus.getComment(),
pauseStatus.getTimestamp());
}

private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.validation.StorageQuotaChecker;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
Expand Down Expand Up @@ -92,7 +91,6 @@
import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
Expand Down Expand Up @@ -1117,18 +1115,11 @@ private static class FakePinotLLCRealtimeSegmentManager extends PinotLLCRealtime
FileUploadDownloadClient _mockedFileUploadDownloadClient;

FakePinotLLCRealtimeSegmentManager() {
super(mock(PinotHelixResourceManager.class), CONTROLLER_CONF, createMockStorageQuotaChecker(),
mock(ControllerMetrics.class));
}

private static StorageQuotaChecker createMockStorageQuotaChecker() {
StorageQuotaChecker storageQuotaChecker = mock(StorageQuotaChecker.class);
when(storageQuotaChecker.isTableStorageQuotaExceeded(any())).thenReturn(false);
return storageQuotaChecker;
super(mock(PinotHelixResourceManager.class), CONTROLLER_CONF, mock(ControllerMetrics.class));
}

FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
super(pinotHelixResourceManager, config, createMockStorageQuotaChecker(), mock(ControllerMetrics.class));
super(pinotHelixResourceManager, config, mock(ControllerMetrics.class));
}

FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager) {
Expand Down Expand Up @@ -1211,7 +1202,7 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen
}

@Override
protected IdealState getIdealState(String realtimeTableName) {
public IdealState getIdealState(String realtimeTableName) {
return _idealState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1358,12 +1358,6 @@ private static HelixManager createMockHelixManager(boolean isLeader, boolean isC
return helixManager;
}

private static StorageQuotaChecker createMockStorageQuotaChecker() {
StorageQuotaChecker storageQuotaChecker = mock(StorageQuotaChecker.class);
when(storageQuotaChecker.isTableStorageQuotaExceeded(any())).thenReturn(false);
return storageQuotaChecker;
}

public static class MockPinotLLCRealtimeSegmentManager extends PinotLLCRealtimeSegmentManager {
public SegmentZKMetadata _segmentMetadata;
public MockSegmentCompletionManager _segmentCompletionMgr;
Expand All @@ -1378,7 +1372,7 @@ protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHeli

protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
ControllerMetrics controllerMetrics) {
super(pinotHelixResourceManager, CONTROLLER_CONF, createMockStorageQuotaChecker(), controllerMetrics);
super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ public void setTimeInMillis(String timestamp) {
}

public enum ReasonCode {
ADMINISTRATIVE
ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED
}
}

0 comments on commit 17ae277

Please sign in to comment.