Adds scheduling logic in controller
noob-se7en committed Jan 15, 2025
1 parent 1aecc5a commit 5be2722
Showing 6 changed files with 112 additions and 136 deletions.
ForceCommitMessage.java

@@ -35,16 +35,14 @@ public class ForceCommitMessage extends Message {
   public static final String FORCE_COMMIT_MSG_SUB_TYPE = "FORCE_COMMIT";
   private static final String TABLE_NAME = "tableName";
   private static final String SEGMENT_NAMES = "segmentNames";
-  private static final String BATCH_SIZE = "batchSize";

-  public ForceCommitMessage(String tableNameWithType, Set<String> segmentNames, int batchSize) {
+  public ForceCommitMessage(String tableNameWithType, Set<String> segmentNames) {
     super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
     setMsgSubType(FORCE_COMMIT_MSG_SUB_TYPE);
     setExecutionTimeout(-1); // no timeout
     ZNRecord znRecord = getRecord();
     znRecord.setSimpleField(TABLE_NAME, tableNameWithType);
     znRecord.setSimpleField(SEGMENT_NAMES, String.join(",", segmentNames));
-    znRecord.setIntField(BATCH_SIZE, batchSize);
   }

   public ForceCommitMessage(Message message) {
@@ -61,8 +59,4 @@ public String getTableName() {
   public Set<String> getSegmentNames() {
     return Arrays.stream(getRecord().getSimpleField(SEGMENT_NAMES).split(",")).collect(Collectors.toSet());
   }
-
-  public int getBatchSize() {
-    return getRecord().getIntField(BATCH_SIZE, Integer.MAX_VALUE);
-  }
 }
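The message now carries only the table name and the segment set; the batch size no longer travels to the servers. A minimal, dependency-free sketch of the round trip the two simple fields rely on: the constructor joins segment names with commas and getSegmentNames() splits them back, which assumes segment names never contain commas. Segment names here are made up.

import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class SegmentNamesRoundTrip {
  public static void main(String[] args) {
    Set<String> segments =
        new TreeSet<>(Arrays.asList("myTable__0__5__20250115T0000Z", "myTable__1__7__20250115T0000Z"));
    // What the constructor stores in the ZNRecord simple field:
    String field = String.join(",", segments);
    // What getSegmentNames() reconstructs on the receiving server:
    Set<String> decoded = Arrays.stream(field.split(",")).collect(Collectors.toSet());
    System.out.println(decoded.equals(segments)); // true
  }
}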
HelixHelper.java

@@ -44,7 +44,6 @@
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.pinot.common.helix.ExtraInstanceConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -251,22 +250,6 @@ public static IdealState getTableIdealState(HelixManager manager, String resourceName) {
     return accessor.getProperty(builder.idealStates(resourceName));
   }

-  public static Set<String> getOnlineSegmentsFromIdealState(HelixManager manager, String tableNameWithType,
-      boolean includeConsuming) {
-    IdealState tableIdealState = getTableIdealState(manager, tableNameWithType);
-    Preconditions.checkState((tableIdealState != null), "Table ideal state is null");
-    Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields();
-    Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
-    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
-      Map<String, String> instanceStateMap = entry.getValue();
-      if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE) || (includeConsuming
-          && instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING))) {
-        matchingSegments.add(entry.getKey());
-      }
-    }
-    return matchingSegments;
-  }
-
   public static ExternalView getExternalViewForResource(HelixAdmin admin, String clusterName, String resourceName) {
     return admin.getResourceExternalView(clusterName, resourceName);
   }
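The ideal-state helper leaves this class; the controller code later in this commit calls the equivalent method on _helixResourceManager instead. For reference, a self-contained sketch of the filter the removed method applied, using plain collections in place of Helix's IdealState; the state strings stand in for the Helix segment state model constants, and the instance names are made up.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OnlineSegmentFilter {
  // Mirrors the removed helper: keep segments with at least one ONLINE replica,
  // optionally also segments with a CONSUMING replica.
  static Set<String> onlineSegments(Map<String, Map<String, String>> segmentAssignment, boolean includeConsuming) {
    Set<String> matching = new HashSet<>();
    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
      Map<String, String> instanceStateMap = entry.getValue();
      if (instanceStateMap.containsValue("ONLINE")
          || (includeConsuming && instanceStateMap.containsValue("CONSUMING"))) {
        matching.add(entry.getKey());
      }
    }
    return matching;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> assignment = Map.of(
        "segA", Map.of("Server_1", "ONLINE"),
        "segB", Map.of("Server_1", "CONSUMING"));
    System.out.println(onlineSegments(assignment, false)); // [segA]
    System.out.println(onlineSegments(assignment, true));  // segA and segB (set order may vary)
  }
}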
PinotLLCRealtimeSegmentManager.java

@@ -31,8 +31,10 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
@@ -117,7 +119,10 @@
 import org.apache.pinot.spi.utils.StringUtil;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {

   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
+  private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;
+  private static final RetryPolicy DEFAULT_RETRY_POLICY =
+      RetryPolicies.fixedDelayRetryPolicy(10, FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);

   // TODO: make this configurable with default set to 10
   /**
@@ -1725,7 +1733,6 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, PinotFS pinotFS) {
    * @param tableNameWithType table name with type
    * @param partitionGroupIdsToCommit comma separated list of partition group IDs to commit
    * @param segmentsToCommit comma separated list of consuming segments to commit
-   * @param batchSize max number of consuming segments a server can commit at once
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
@@ -1734,10 +1741,98 @@ public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments, batchSize);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+    ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+    try {
+      for (Set<String> segmentBatchToCommit : segmentBatchList) {
+        executorService.submit(() -> executeBatch(tableNameWithType, segmentBatchToCommit));
+      }
+    } finally {
+      executorService.shutdown();
+    }
+
     return targetConsumingSegments;
   }

+  private void executeBatch(String tableNameWithType, Set<String> segmentBatchToCommit) {
+    sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+
+    int attemptCount = 0;
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> isBatchSuccessful(tableNameWithType, segmentBatchToCommit));
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg =
+          String.format("Failed to execute the forceCommit batch of segments: %s , attempt count: %d", segmentBatchToCommit,
+              attemptCount);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private boolean isBatchSuccessful(String tableNameWithType,
+      Set<String> segmentBatchToCommit) {
+
+    Set<String> onlineSegmentsForTable =
+        _helixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);
+
+    for (String segmentName : segmentBatchToCommit) {
+      if (!onlineSegmentsForTable.contains(segmentName)) {
+        return false;
+      }
+    }
+
+    return true;
+  }

+  private List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments = getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          currentBatch.add(queue.poll());
+          if (currentBatch.size() == batchSize) {
+            segmentBatchList.add(currentBatch);
+            currentBatch = new HashSet<>();
+          }
+          segmentsRemaining = true;
+        }
+      }
+    }
+
+    return segmentBatchList;
+  }

+  private Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
+    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
+      if (!targetConsumingSegments.contains(segmentName)) {
+        continue;
+      }
+      Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);
+      for (String instance : instanceToStateMap.keySet()) {
+        String state = instanceToStateMap.get(instance);
+        if (state.equals(SegmentStateModel.CONSUMING)) {
+          instanceToConsumingSegments.putIfAbsent(instance, new LinkedList<>());
+          instanceToConsumingSegments.get(instance).add(segmentName);
+        }
+      }
+    }
+
+    return instanceToConsumingSegments;
+  }

   /**
    * Among all consuming segments, filter the ones that are in the given partitions or segments.
    */
@@ -1780,7 +1875,7 @@ public PauseStatusDetails pauseConsumption(String tableNameWithType, PauseState.ReasonCode reasonCode,
       @Nullable String comment) {
     IdealState updatedIdealState = updatePauseStateInIdealState(tableNameWithType, true, reasonCode, comment);
     Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
-    sendForceCommitMessageToServers(tableNameWithType, consumingSegments, Integer.MAX_VALUE);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
     return new PauseStatusDetails(true, consumingSegments, reasonCode, comment != null ? comment
         : "Pause flag is set. Consuming segments are being committed."
             + " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.",
@@ -1825,14 +1920,14 @@ public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause,
     return updatedIdealState;
   }

-  private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments, int batchSize) {
+  private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) {
     if (!consumingSegments.isEmpty()) {
       Criteria recipientCriteria = new Criteria();
       recipientCriteria.setInstanceName("%");
       recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
       recipientCriteria.setResource(tableNameWithType);
       recipientCriteria.setSessionSpecific(true);
-      ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments, batchSize);
+      ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments);
       int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent {} force commit messages for table: {} segments: {}", numMessagesSent, tableNameWithType,
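The scheduling added above is round-robin across servers: getInstanceToConsumingSegments builds one queue of consuming segments per instance, and getSegmentBatchList drains one segment from each queue per pass, so no single server absorbs a whole batch. A runnable sketch of that schedule with made-up instance and segment names. Note one behavioral difference: the server-side divideSegmentsInBatches removed later in this commit flushed a final partial batch, while getSegmentBatchList above only emits a batch once it reaches exactly batchSize, so the sketch includes that flush explicitly.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class RoundRobinBatching {
  static List<Set<String>> batches(Map<String, Queue<String>> instanceToSegments, int batchSize) {
    List<Set<String>> batchList = new ArrayList<>();
    Set<String> current = new HashSet<>();
    boolean remaining = true;
    while (remaining) {
      remaining = false;
      for (Queue<String> queue : instanceToSegments.values()) { // one segment per instance per pass
        if (!queue.isEmpty()) {
          current.add(queue.poll());
          if (current.size() == batchSize) {
            batchList.add(current);
            current = new HashSet<>();
          }
          remaining = true;
        }
      }
    }
    if (!current.isEmpty()) {
      batchList.add(current); // flush the remainder (see note above)
    }
    return batchList;
  }

  public static void main(String[] args) {
    Map<String, Queue<String>> byInstance = new LinkedHashMap<>();
    byInstance.put("Server_1", new ArrayDeque<>(List.of("s1", "s2", "s3")));
    byInstance.put("Server_2", new ArrayDeque<>(List.of("s4")));
    // Pass 1 takes s1 and s4 (batch full); pass 2 takes s2; pass 3 takes s3.
    System.out.println(batches(byInstance, 2)); // [[s1, s4], [s2, s3]] (set order may vary)
  }
}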
InstanceDataManager.java

@@ -187,7 +187,7 @@ void reloadSegments(String tableNameWithType, List<String> segmentNames, boolean forceDownload)
   /**
    * Immediately stop consumption and start committing the consuming segments.
    */
-  void forceCommit(String tableNameWithType, Set<String> segmentNames, int batchSize);
+  void forceCommit(String tableNameWithType, Set<String> segmentNames);

   /**
    * Enables the installation of a method to determine if a server is ready to server queries.
HelixInstanceDataManager.java

@@ -47,7 +47,6 @@
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
-import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
@@ -70,8 +69,6 @@
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
-import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.apache.zookeeper.data.Stat;
@@ -599,115 +596,24 @@ public SegmentUploader getSegmentUploader() {
   }

   @Override
-  public void forceCommit(String tableNameWithType, Set<String> segmentNames, int batchSize) {
+  public void forceCommit(String tableNameWithType, Set<String> segmentNames) {
     Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(tableNameWithType), String.format(
         "Force commit is only supported for segments of realtime tables - table name: %s segment names: %s",
         tableNameWithType, segmentNames));
     TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
-    if (tableDataManager == null) {
-      return;
-    }
-
-    List<List<RealtimeSegmentDataManager>> segmentBatchList =
-        getSegmentBatchesToCommit(tableDataManager, segmentNames, batchSize);
-
-    ExecutorService executorService = Executors.newFixedThreadPool(1);
-
-    try {
-      for (List<RealtimeSegmentDataManager> segmentBatchToCommit : segmentBatchList) {
-        executorService.submit(() -> {
-          try {
-            executeBatch(tableDataManager, segmentBatchToCommit);
-          } finally {
-            for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentBatchToCommit) {
-              tableDataManager.releaseSegment(realtimeSegmentDataManager);
-            }
-          }
-        });
-      }
-    } finally {
-      executorService.shutdown();
-    }
-  }
-
-  private List<List<RealtimeSegmentDataManager>> getSegmentBatchesToCommit(TableDataManager tableDataManager,
-      Set<String> segmentNames, int batchSize) {
-    List<RealtimeSegmentDataManager> segmentsToCommit = new ArrayList<>();
-
-    try {
-      for (String segmentName : segmentNames) {
-        SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
-        if (segmentDataManager != null) {
-          if (segmentDataManager instanceof RealtimeSegmentDataManager) {
-            segmentsToCommit.add((RealtimeSegmentDataManager) segmentDataManager);
-          } else {
-            tableDataManager.releaseSegment(segmentDataManager);
-          }
-        }
-      }
-
-      return divideSegmentsInBatches(segmentsToCommit, batchSize);
-    } catch (Exception e) {
-      for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentsToCommit) {
-        tableDataManager.releaseSegment(realtimeSegmentDataManager);
-      }
-      throw new RuntimeException(e);
-    }
-  }
-
-  private List<List<RealtimeSegmentDataManager>> divideSegmentsInBatches(
-      List<RealtimeSegmentDataManager> segmentsToCommit,
-      int batchSize) {
-    List<List<RealtimeSegmentDataManager>> segmentBatchListToRet = new ArrayList<>();
-    List<RealtimeSegmentDataManager> lastBatch = new ArrayList<>();
-
-    for (RealtimeSegmentDataManager segmentDataManager : segmentsToCommit) {
-      lastBatch.add(segmentDataManager);
-      if (lastBatch.size() == batchSize) {
-        segmentBatchListToRet.add(lastBatch);
-        lastBatch = new ArrayList<>();
-      }
-    }
-
-    if (!lastBatch.isEmpty()) {
-      segmentBatchListToRet.add(lastBatch);
-    }
-
-    return segmentBatchListToRet;
-  }
-
-  private void executeBatch(TableDataManager tableDataManager, List<RealtimeSegmentDataManager> segmentBatchToCommit) {
-    for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentBatchToCommit) {
-      realtimeSegmentDataManager.forceCommit();
-    }
-
-    int attemptCount = 0;
-    try {
-      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> isBatchSuccessful(tableDataManager, segmentBatchToCommit));
-    } catch (AttemptsExceededException | RetriableOperationException e) {
-      List<String> segmentNames = new ArrayList<>();
-      for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentBatchToCommit) {
-        segmentNames.add(realtimeSegmentDataManager.getSegmentName());
-      }
-      String errorMsg =
-          String.format("Failed to execute the forceCommit batch of segments: %s , attempt count: %d", segmentNames,
-              attemptCount);
-      LOGGER.error(errorMsg, e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  private boolean isBatchSuccessful(TableDataManager tableDataManager,
-      List<RealtimeSegmentDataManager> segmentBatchToCommit) {
-    Set<String> onlineSegmentsForTable =
-        HelixHelper.getOnlineSegmentsFromIdealState(_helixManager, tableDataManager.getTableName(), false);
-
-    for (SegmentDataManager segmentDataManager : segmentBatchToCommit) {
-      if (!onlineSegmentsForTable.contains(segmentDataManager.getSegmentName())) {
-        return false;
-      }
-    }
-
-    return true;
-  }
+    if (tableDataManager != null) {
+      segmentNames.forEach(segName -> {
+        SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segName);
+        if (segmentDataManager != null) {
+          try {
+            if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+              ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
+            }
+          } finally {
+            tableDataManager.releaseSegment(segmentDataManager);
+          }
+        }
+      });
+    }
+  }
 }
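With batching and status polling moved to the controller, the server path reduces to acquire, force-commit, release per segment. A minimal sketch of that reference-counting discipline; the DataManager types below are hypothetical stand-ins for Pinot's TableDataManager/SegmentDataManager, which carry far more state.

import java.util.List;
import java.util.Map;

public class AcquireCommitRelease {
  interface SegmentDataManager { String name(); }
  // Stand-in for RealtimeSegmentDataManager: only consuming segments can be force-committed.
  interface RealtimeSegment extends SegmentDataManager { void forceCommit(); }

  static void forceCommitAll(Map<String, SegmentDataManager> table, List<String> segmentNames) {
    for (String segName : segmentNames) {
      SegmentDataManager sdm = table.get(segName); // "acquire": may be null if the segment is gone
      if (sdm == null) {
        continue;
      }
      try {
        if (sdm instanceof RealtimeSegment) { // skip already-sealed (immutable) segments
          ((RealtimeSegment) sdm).forceCommit();
        }
      } finally {
        // "release" would decrement the ref count here, even if forceCommit threw
        System.out.println("released " + sdm.name());
      }
    }
  }

  public static void main(String[] args) {
    RealtimeSegment consuming = new RealtimeSegment() {
      public String name() { return "seg__0__1"; }
      public void forceCommit() { System.out.println("committing " + name()); }
    };
    forceCommitAll(Map.of("seg__0__1", consuming), List.of("seg__0__1", "missing"));
  }
}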
SegmentMessageHandlerFactory.java

@@ -215,14 +215,12 @@ private class ForceCommitMessageHandler extends DefaultMessageHandler {

     private String _tableName;
     private Set<String> _segmentNames;
-    private final int _batchSize;

     public ForceCommitMessageHandler(ForceCommitMessage forceCommitMessage, ServerMetrics metrics,
         NotificationContext ctx) {
       super(forceCommitMessage, metrics, ctx);
       _tableName = forceCommitMessage.getTableName();
       _segmentNames = forceCommitMessage.getSegmentNames();
-      _batchSize = forceCommitMessage.getBatchSize();
     }

     @Override
@@ -231,7 +229,7 @@ public HelixTaskResult handleMessage()
       HelixTaskResult helixTaskResult = new HelixTaskResult();
       _logger.info("Handling force commit message for table {} segments {}", _tableName, _segmentNames);
       try {
-        _instanceDataManager.forceCommit(_tableName, _segmentNames, _batchSize);
+        _instanceDataManager.forceCommit(_tableName, _segmentNames);
         helixTaskResult.setSuccess(true);
       } catch (Exception e) {
         _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1);
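The handler stays fire-and-forget: it kicks off the commits and reports success, while completion is now observed by the controller, which polls the ideal state every 15 seconds for up to 10 attempts (DEFAULT_RETRY_POLICY above). A dependency-free sketch of that fixed-delay poll; RetryPolicies.fixedDelayRetryPolicy is Pinot's own utility, and the loop below is only a stand-in for its attempt() contract as used by executeBatch.

import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class FixedDelayPoll {
  // Returns the attempt count on success; throws after maxAttempts failed checks.
  static int attempt(Supplier<Boolean> check, int maxAttempts, long delayMs) throws InterruptedException {
    for (int i = 1; i <= maxAttempts; i++) {
      if (check.get()) {
        return i;
      }
      if (i < maxAttempts) {
        TimeUnit.MILLISECONDS.sleep(delayMs);
      }
    }
    throw new RuntimeException("Attempts exceeded: " + maxAttempts);
  }

  public static void main(String[] args) throws InterruptedException {
    Set<String> online = Set.of("seg1"); // pretend ideal-state snapshot
    Set<String> batch = Set.of("seg1");  // batch under commit
    int attempts = attempt(() -> online.containsAll(batch), 10, 100); // 100 ms delay for the demo
    System.out.println("batch ONLINE after " + attempts + " attempt(s)");
  }
}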
