diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java
new file mode 100644
index 000000000000..36449a54229f
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Optional;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+
+
+/**
+ * Utility methods for inspecting the pauseless consumption settings of a table.
+ */
+public class PauselessConsumptionUtils {
+
+  private PauselessConsumptionUtils() {
+    // Utility class; not meant to be instantiated
+  }
+
+  /**
+   * Tells whether pauseless consumption is turned on in the given table config.
+   * The result is false when the ingestion config or the stream ingestion config is absent, or when the flag is
+   * not enabled.
+   *
+   * @param tableConfig table config to inspect; must not be null
+   * @return true only when the stream ingestion config enables pauseless consumption
+   * @throws NullPointerException if tableConfig is null
+   */
+  public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
+    Optional<StreamIngestionConfig> streamIngestionConfig =
+        Optional.ofNullable(tableConfig.getIngestionConfig()).map(IngestionConfig::getStreamIngestionConfig);
+    return streamIngestionConfig.map(StreamIngestionConfig::isPauselessConsumptionEnabled).orElse(false);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java index b119928a461f..fc48095c854d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java @@ -88,26 +88,26 @@ public enum BlockingSegmentCompletionFSMState { BlockingSegmentCompletionFSMState _state = BlockingSegmentCompletionFSMState.HOLDING; // Typically start off in HOLDING state.
final long _startTimeMs; - private final LLCSegmentName _segmentName; - private final String _rawTableName; - private final String _realtimeTableName; - private final int _numReplicas; - private final Set _excludedServerStateMap; - private final Map _commitStateMap; - private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory; - private StreamPartitionMsgOffset _winningOffset = null; - private String _winner; - private final PinotLLCRealtimeSegmentManager _segmentManager; - private final SegmentCompletionManager _segmentCompletionManager; - private final long _maxTimeToPickWinnerMs; - private final long _maxTimeToNotifyWinnerMs; - private final long _initialCommitTimeMs; + protected final LLCSegmentName _segmentName; + protected final String _rawTableName; + protected final String _realtimeTableName; + protected final int _numReplicas; + protected final Set _excludedServerStateMap; + protected final Map _commitStateMap; + protected final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory; + protected StreamPartitionMsgOffset _winningOffset = null; + protected String _winner; + protected final PinotLLCRealtimeSegmentManager _segmentManager; + protected final SegmentCompletionManager _segmentCompletionManager; + protected final long _maxTimeToPickWinnerMs; + protected final long _maxTimeToNotifyWinnerMs; + protected final long _initialCommitTimeMs; // Once the winner is notified, they are expected to commit right away. At this point, it is the segment build // time that we need to consider. // We may need to add some time here to allow for getting the lock? For now 0 // We may need to add some time for the committer come back to us (after the build)? For now 0. 
- private long _maxTimeAllowedToCommitMs; - private final String _controllerVipUrl; + protected long _maxTimeAllowedToCommitMs; + protected final String _controllerVipUrl; public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, @@ -242,7 +242,10 @@ public SegmentCompletionProtocol.Response segmentConsumed(String instanceId, Str * that they re-transmit their segmentConsumed() message and start over. */ @Override - public SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, StreamPartitionMsgOffset offset) { + public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params reqParams) { + String instanceId = reqParams.getInstanceId(); + StreamPartitionMsgOffset offset = + _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); long now = _segmentCompletionManager.getCurrentTimeMs(); if (_excludedServerStateMap.contains(instanceId)) { _logger.warn("Not accepting commit from {} since it had stoppd consuming", instanceId); @@ -261,7 +264,7 @@ public SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, return committerDecidedCommit(instanceId, offset, now); case COMMITTER_NOTIFIED: - return committerNotifiedCommit(instanceId, offset, now); + return committerNotifiedCommit(reqParams, now); case COMMITTER_UPLOADING: return committerUploadingCommit(instanceId, offset, now); @@ -376,7 +379,7 @@ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProt } // Helper methods that log the current state and the response sent - private SegmentCompletionProtocol.Response fail(String instanceId, StreamPartitionMsgOffset offset) { + protected SegmentCompletionProtocol.Response fail(String instanceId, StreamPartitionMsgOffset offset) { _logger.info("{}:FAIL for instance={} offset={}", _state, instanceId, offset); return SegmentCompletionProtocol.RESP_FAILED; 
} @@ -398,28 +401,28 @@ private SegmentCompletionProtocol.Response discard(String instanceId, StreamPart return SegmentCompletionProtocol.RESP_DISCARD; } - private SegmentCompletionProtocol.Response keep(String instanceId, StreamPartitionMsgOffset offset) { + protected SegmentCompletionProtocol.Response keep(String instanceId, StreamPartitionMsgOffset offset) { _logger.info("{}:KEEP for instance={} offset={}", _state, instanceId, offset); return new SegmentCompletionProtocol.Response( new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString()) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP)); } - private SegmentCompletionProtocol.Response catchup(String instanceId, StreamPartitionMsgOffset offset) { + protected SegmentCompletionProtocol.Response catchup(String instanceId, StreamPartitionMsgOffset offset) { _logger.info("{}:CATCHUP for instance={} offset={}", _state, instanceId, offset); return new SegmentCompletionProtocol.Response( new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(_winningOffset.toString()) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP)); } - private SegmentCompletionProtocol.Response hold(String instanceId, StreamPartitionMsgOffset offset) { + protected SegmentCompletionProtocol.Response hold(String instanceId, StreamPartitionMsgOffset offset) { _logger.info("{}:HOLD for instance={} offset={}", _state, instanceId, offset); return new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params() .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD) .withStreamPartitionMsgOffset(offset.toString())); } - private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, + protected SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, StreamPartitionMsgOffset offset) { _state = BlockingSegmentCompletionFSMState.ABORTED; 
_segmentCompletionManager.getControllerMetrics() @@ -427,14 +430,14 @@ private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String i return hold(instanceId, offset); } - private SegmentCompletionProtocol.Response abortAndReturnFailed() { + protected SegmentCompletionProtocol.Response abortAndReturnFailed() { _state = BlockingSegmentCompletionFSMState.ABORTED; _segmentCompletionManager.getControllerMetrics() .addMeteredTableValue(_rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); return SegmentCompletionProtocol.RESP_FAILED; } - private SegmentCompletionProtocol.Response abortIfTooLateAndReturnHold(long now, String instanceId, + protected SegmentCompletionProtocol.Response abortIfTooLateAndReturnHold(long now, String instanceId, StreamPartitionMsgOffset offset) { if (now > _maxTimeAllowedToCommitMs) { _logger @@ -464,7 +467,7 @@ private SegmentCompletionProtocol.Response partialConsumingConsumed(String insta * message. As long as the committer is not the one who stopped consuming (which we have already checked before * coming here), we will trust the server that this is a valid commit. */ - private SegmentCompletionProtocol.Response partialConsumingCommit(String instanceId, + protected SegmentCompletionProtocol.Response partialConsumingCommit(String instanceId, StreamPartitionMsgOffset offset, long now) { // Do the same as HOLDING__commit return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now); @@ -510,7 +513,7 @@ private SegmentCompletionProtocol.Response holdingConsumed(String instanceId, St * This not a good state to receive a commit message, but then it may be that the controller * failed over while in the COMMITTER_NOTIFIED state... 
*/ - private SegmentCompletionProtocol.Response holdingCommit(String instanceId, StreamPartitionMsgOffset offset, + protected SegmentCompletionProtocol.Response holdingCommit(String instanceId, StreamPartitionMsgOffset offset, long now) { return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now); } @@ -565,7 +568,7 @@ private SegmentCompletionProtocol.Response committerDecidedConsumed(String insta * We have already decided who the committer is, but have not let them know yet. So, we don't expect * a commit() call here. */ - private SegmentCompletionProtocol.Response committerDecidedCommit(String instanceId, + protected SegmentCompletionProtocol.Response committerDecidedCommit(String instanceId, StreamPartitionMsgOffset offset, long now) { return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now); } @@ -621,8 +624,10 @@ private SegmentCompletionProtocol.Response committerNotifiedConsumed(String inst * We have notified the committer. If we get a consumed message from another server, we can ask them to * catchup (if the offset is lower). If anything else, then we pretty much ask them to hold. 
*/ - private SegmentCompletionProtocol.Response committerNotifiedCommit(String instanceId, - StreamPartitionMsgOffset offset, long now) { + protected SegmentCompletionProtocol.Response committerNotifiedCommit( + SegmentCompletionProtocol.Request.Params reqParams, long now) { + String instanceId = reqParams.getInstanceId(); + StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); SegmentCompletionProtocol.Response response = null; response = checkBadCommitRequest(instanceId, offset, now); if (response != null) { @@ -645,7 +650,7 @@ private SegmentCompletionProtocol.Response committerNotifiedStoppedConsuming(Str return processStoppedConsuming(instanceId, offset, reason, false); } - private SegmentCompletionProtocol.Response committerNotifiedExtendBuildTime(String instanceId, + protected SegmentCompletionProtocol.Response committerNotifiedExtendBuildTime(String instanceId, StreamPartitionMsgOffset offset, int extTimeSec, long now) { SegmentCompletionProtocol.Response response = abortIfTooLateAndReturnHold(now, instanceId, offset); if (response == null) { @@ -667,7 +672,7 @@ private SegmentCompletionProtocol.Response committerUploadingConsumed(String ins return processConsumedAfterCommitStart(instanceId, offset, now); } - private SegmentCompletionProtocol.Response committerUploadingCommit(String instanceId, + protected SegmentCompletionProtocol.Response committerUploadingCommit(String instanceId, StreamPartitionMsgOffset offset, long now) { return processCommitWhileUploading(instanceId, offset, now); } @@ -682,7 +687,7 @@ private SegmentCompletionProtocol.Response committingConsumed(String instanceId, return processConsumedAfterCommitStart(instanceId, offset, now); } - private SegmentCompletionProtocol.Response committingCommit(String instanceId, StreamPartitionMsgOffset offset, + protected SegmentCompletionProtocol.Response committingCommit(String instanceId, StreamPartitionMsgOffset offset, long now) { 
return processCommitWhileUploading(instanceId, offset, now); } @@ -704,7 +709,7 @@ private SegmentCompletionProtocol.Response committedConsumed(String instanceId, return response; } - private SegmentCompletionProtocol.Response committedCommit(String instanceId, StreamPartitionMsgOffset offset) { + protected SegmentCompletionProtocol.Response committedCommit(String instanceId, StreamPartitionMsgOffset offset) { if (offset.compareTo(_winningOffset) == 0) { return keep(instanceId, offset); } @@ -732,7 +737,7 @@ private SegmentCompletionProtocol.Response processStoppedConsuming(String instan } // A common method when the state is > COMMITTER_NOTIFIED. - private SegmentCompletionProtocol.Response processConsumedAfterCommitStart(String instanceId, + protected SegmentCompletionProtocol.Response processConsumedAfterCommitStart(String instanceId, StreamPartitionMsgOffset offset, long now) { SegmentCompletionProtocol.Response response; // We have already picked a winner, and may or many not have heard from them. @@ -754,23 +759,26 @@ private SegmentCompletionProtocol.Response processConsumedAfterCommitStart(Strin + "now={}", _state, instanceId, offset, now); // Ask them to hold, just in case the committer fails for some reason.. return abortAndReturnHold(now, instanceId, offset); + } + // Common case: A different instance is reporting. + return handleNonWinnerCase(instanceId, offset); + } + + protected SegmentCompletionProtocol.Response handleNonWinnerCase(String instanceId, + StreamPartitionMsgOffset offset) { + if (offset.compareTo(_winningOffset) == 0) { + // Wait until winner has posted the segment before asking this server to KEEP the segment. + return hold(instanceId, offset); + } else if (offset.compareTo(_winningOffset) < 0) { + return catchup(instanceId, offset); } else { - // Common case: A different instance is reporting. - if (offset.compareTo(_winningOffset) == 0) { - // Wait until winner has posted the segment before asking this server to KEEP the segment. 
- response = hold(instanceId, offset); - } else if (offset.compareTo(_winningOffset) < 0) { - response = catchup(instanceId, offset); - } else { - // We have not yet committed, so ask the new responder to hold. They may be the new leader in case the - // committer fails. - response = hold(instanceId, offset); - } + // We have not yet committed, so ask the new responder to hold. They may be the new leader in case the + // committer fails. + return hold(instanceId, offset); } - return response; } - private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams, + protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams, CommittingSegmentDescriptor committingSegmentDescriptor) { String instanceId = reqParams.getInstanceId(); StreamPartitionMsgOffset offset = @@ -802,7 +810,7 @@ private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtoc .constructDownloadUrl(_controllerVipUrl, TableNameBuilder.extractRawTableName(_realtimeTableName), _segmentName.getSegmentName())); } - _segmentManager.commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor); + commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor); } catch (Exception e) { _logger .error("Caught exception while committing segment metadata for segment: {}", _segmentName.getSegmentName(), @@ -815,6 +823,11 @@ private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtoc return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS; } + protected void commitSegmentMetadata(String realtimeTableName, + CommittingSegmentDescriptor committingSegmentDescriptor) { + _segmentManager.commitSegmentMetadata(realtimeTableName, committingSegmentDescriptor); + } + private SegmentCompletionProtocol.Response processCommitWhileUploading(String instanceId, StreamPartitionMsgOffset offset, long now) { _logger.info("Processing segmentCommit({}, {})", instanceId, offset); @@ -828,7 
+841,7 @@ private SegmentCompletionProtocol.Response processCommitWhileUploading(String in .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); } - private SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, StreamPartitionMsgOffset offset, + protected SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, StreamPartitionMsgOffset offset, long now) { SegmentCompletionProtocol.Response response = abortIfTooLateAndReturnHold(now, instanceId, offset); if (response != null) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
new file mode 100644
index 000000000000..f1ca0ece26ed
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * Segment completion FSM for tables that use pauseless ingestion.
+ *
+ * <p>It reuses the state machine of {@link BlockingSegmentCompletionFSM} and overrides the following steps:
+ * <ul>
+ *   <li>A commit-start request handled in COMMITTER_NOTIFIED commits the segment start metadata via
+ *       {@code commitSegmentStartMetadata} before the FSM moves to COMMITTER_UPLOADING.</li>
+ *   <li>The final metadata commit goes through {@code commitSegmentEndMetadata}.</li>
+ *   <li>A non-winner replica reporting exactly the winning offset is told to KEEP instead of HOLD.</li>
+ *   <li>Build time extension requests are accepted only in COMMITTER_UPLOADING.</li>
+ * </ul>
+ */
+public class PauselessSegmentCompletionFSM extends BlockingSegmentCompletionFSM {
+  /**
+   * Creates the FSM for the given segment.
+   *
+   * <p>If the segment ZK metadata is already in COMMITTING status, the FSM starts in the COMMITTED state,
+   * with the end offset from the metadata as the winning offset and the winner recorded as "UNKNOWN".
+   */
+  public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
+      SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName,
+      SegmentZKMetadata segmentMetadata) {
+    super(segmentManager, segmentCompletionManager, segmentName, segmentMetadata);
+    if (segmentMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING) {
+      StreamPartitionMsgOffsetFactory factory =
+          _segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
+      StreamPartitionMsgOffset endOffset = factory.create(segmentMetadata.getEndOffset());
+      _state = BlockingSegmentCompletionFSMState.COMMITTED;
+      _winningOffset = endOffset;
+      _winner = "UNKNOWN";
+    }
+  }
+
+  /**
+   * Handles the commit-start request while the FSM is in COMMITTER_NOTIFIED.
+   *
+   * <p>Once the request passes {@code checkBadCommitRequest}, the segment start metadata is committed and the
+   * FSM moves to COMMITTER_UPLOADING. If that commit throws, the failure is logged and the FSM is aborted.
+   *
+   * @param reqParams request parameters carrying the instance id and the reported stream offset
+   * @param now current time in milliseconds
+   * @return the response of the bad-request check if it produced one, FAILED if committing the start metadata
+   *         throws, COMMIT_CONTINUE otherwise
+   */
+  @Override
+  protected SegmentCompletionProtocol.Response committerNotifiedCommit(
+      SegmentCompletionProtocol.Request.Params reqParams, long now) {
+    String instanceId = reqParams.getInstanceId();
+    StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
+    SegmentCompletionProtocol.Response response = checkBadCommitRequest(instanceId, offset, now);
+    if (response != null) {
+      return response;
+    }
+    try {
+      CommittingSegmentDescriptor committingSegmentDescriptor =
+          CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+      _logger.info(
+          "Starting to commit changes to ZK and ideal state for the segment:{} during pauseless ingestion as the "
+              + "leader has been selected", _segmentName);
+      _segmentManager.commitSegmentStartMetadata(
+          TableNameBuilder.REALTIME.tableNameWithType(_segmentName.getTableName()), committingSegmentDescriptor);
+    } catch (Exception e) {
+      // this aims to handle the failures during commitSegmentStartMetadata
+      // we abort the state machine to allow commit protocol to start from the beginning
+      // the server would then retry the commit protocol from the start
+      _logger.error("Caught exception while committing segment start metadata for segment: {}",
+          _segmentName.getSegmentName(), e);
+      return abortAndReturnFailed();
+    }
+    _logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, offset);
+    _state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING;
+    long commitTimeMs = now - _startTimeMs;
+    if (commitTimeMs > _initialCommitTimeMs) {
+      // We assume that the commit time holds for all partitions. It is possible, though, that one partition
+      // commits at a lower time than another partition, and the two partitions are going simultaneously,
+      // and we may not get the maximum value all the time.
+      _segmentCompletionManager.setCommitTime(_segmentName.getTableName(), commitTimeMs);
+    }
+    return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
+  }
+
+  /**
+   * Processes a request from the committer to extend the segment build time.
+   *
+   * <p>The extension is handled only in COMMITTER_UPLOADING; every other state fails the request.
+   */
+  @Override
+  public SegmentCompletionProtocol.Response extendBuildTime(final String instanceId,
+      final StreamPartitionMsgOffset offset, final int extTimeSec) {
+    final long now = _segmentCompletionManager.getCurrentTimeMs();
+    synchronized (this) {
+      _logger.info("Processing extendBuildTime({}, {}, {})", instanceId, offset, extTimeSec);
+      switch (_state) {
+        case PARTIAL_CONSUMING:
+        case HOLDING:
+        case COMMITTER_DECIDED:
+        case COMMITTER_NOTIFIED:
+          return fail(instanceId, offset);
+        case COMMITTER_UPLOADING:
+          return committerNotifiedExtendBuildTime(instanceId, offset, extTimeSec, now);
+        case COMMITTING:
+        case COMMITTED:
+        case ABORTED:
+        default:
+          return fail(instanceId, offset);
+      }
+    }
+  }
+
+  /**
+   * Commits the final segment metadata through {@code commitSegmentEndMetadata} on the segment manager.
+   */
+  @Override
+  protected void commitSegmentMetadata(String realtimeTableName,
+      CommittingSegmentDescriptor committingSegmentDescriptor) {
+    _segmentManager.commitSegmentEndMetadata(realtimeTableName, committingSegmentDescriptor);
+  }
+
+  /**
+   * Picks the response for a replica that is not the winner by comparing its offset with the winning offset:
+   * KEEP when equal, CATCH_UP when behind, HOLD when ahead.
+   */
+  @Override
+  protected SegmentCompletionProtocol.Response handleNonWinnerCase(String instanceId, StreamPartitionMsgOffset offset) {
+    // Common case: A different instance is reporting.
+    if (offset.compareTo(_winningOffset) == 0) {
+      // The winner has already updated the segment's ZK metadata for the committing segment.
+      // Additionally, a new consuming segment has been created for pauseless ingestion.
+      // Return "keep" to allow the server to build the segment and begin ingestion for the new consuming segment.
+      return keep(instanceId, offset);
+    } else if (offset.compareTo(_winningOffset) < 0) {
+      return catchup(instanceId, offset);
+    } else {
+      // We have not yet committed, so ask the new responder to hold. They may be the new leader in case the
+      // committer fails.
+      return hold(instanceId, offset);
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 4ba7cd2208e6..3ed88967c67f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -157,7 +157,8 @@ public class PinotLLCRealtimeSegmentManager { /** * After step 1 of segment completion is done, * this is the max time until which step 3 is allowed to complete. - * See {@link #commitSegmentMetadataInternal(String, CommittingSegmentDescriptor)} for explanation of steps 1 2 3 + * See {@link #commitSegmentMetadataInternal(String, CommittingSegmentDescriptor, boolean)} + * for explanation of steps 1 2 3 * This includes any backoffs and retries for the steps 2 and 3 * The segment will be eligible for repairs by the validation manager, if the time exceeds this value */ @@ -506,80 +507,60 @@ public void commitSegmentMetadata(String realtimeTableName, CommittingSegmentDes try { _numCompletingSegments.addAndGet(1); - commitSegmentMetadataInternal(realtimeTableName, committingSegmentDescriptor); + // Validate segment location only for metadata commit + if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) { + LOGGER.warn("Committing segment: {} was not uploaded to deep store", + committingSegmentDescriptor.getSegmentName()); + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1); + } + commitSegmentMetadataInternal(realtimeTableName, committingSegmentDescriptor, false); } finally { _numCompletingSegments.addAndGet(-1); } } private void commitSegmentMetadataInternal(String realtimeTableName, -
CommittingSegmentDescriptor committingSegmentDescriptor) { + CommittingSegmentDescriptor committingSegmentDescriptor, boolean isStartMetadata) { String committingSegmentName = committingSegmentDescriptor.getSegmentName(); - LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName); - int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId(); - LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName); - if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) { - LOGGER.warn("Committing segment: {} was not uploaded to deep store", committingSegmentName); - _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1); - } - TableConfig tableConfig = getTableConfig(realtimeTableName); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); IdealState idealState = getIdealState(realtimeTableName); Preconditions.checkState( idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING), "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); - int numReplicas = getNumReplicas(tableConfig, instancePartitions); /* * Update zookeeper in 3 steps. * - * Step 1: Update PROPERTYSTORE to change the old segment metadata status to DONE + * Step 1: Update PROPERTYSTORE to change the old segment metadata status to COMMITTING/ DONE * Step 2: Update PROPERTYSTORE to create the new segment metadata with status IN_PROGRESS * Step 3: Update IDEALSTATES to include new segment in CONSUMING state, and change old segment to ONLINE state. 
*/ - // Step-1 + // Step-1: Update PROPERTYSTORE + LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName); long startTimeNs1 = System.nanoTime(); SegmentZKMetadata committingSegmentZKMetadata = - updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor); - // Refresh the Broker routing to reflect the changes in the segment ZK metadata - _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true); + updateCommittingSegmentMetadata(realtimeTableName, committingSegmentDescriptor, isStartMetadata); - // Step-2 + // Step-2: Create new segment metadata if needed + LOGGER.info("Creating new segment metadata with status IN_PROGRESS: {}", committingSegmentName); long startTimeNs2 = System.nanoTime(); - String newConsumingSegmentName = null; - if (!isTablePaused(idealState)) { - List streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( - streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) - ).collect(Collectors.toList()); - Set partitionIds = getPartitionIds(streamConfigs, idealState); - if (partitionIds.contains(committingSegmentPartitionGroupId)) { - String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); - long newSegmentCreationTimeMs = getCurrentTimeMs(); - LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, - committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs, - committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), - numReplicas); - newConsumingSegmentName = newLLCSegment.getSegmentName(); - } - } + String newConsumingSegmentName = + createNewSegmentMetadata(tableConfig, idealState, committingSegmentDescriptor, committingSegmentZKMetadata, + instancePartitions); - // Step-3 + // Step-3: 
Update IdealState + LOGGER.info("Updating Idealstate for previous: {} and new segment: {}", committingSegmentName, + newConsumingSegmentName); long startTimeNs3 = System.nanoTime(); - SegmentAssignment segmentAssignment = - SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics); - Map instancePartitionsMap = - Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); // When multiple segments of the same table complete around the same time it is possible that // the idealstate update fails due to contention. We serialize the updates to the idealstate // to reduce this contention. We may still contend with RetentionManager, or other updates // to idealstate from other controllers, but then we have the retry mechanism to get around that. idealState = - updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName, - segmentAssignment, instancePartitionsMap); + updateIdealStateForSegments(tableConfig, committingSegmentName, newConsumingSegmentName, instancePartitions); long endTimeNs = System.nanoTime(); LOGGER.info( @@ -607,19 +588,158 @@ private void commitSegmentMetadataInternal(String realtimeTableName, } } + // Step 1: Update committing segment metadata + private SegmentZKMetadata updateCommittingSegmentMetadata(String realtimeTableName, + CommittingSegmentDescriptor committingSegmentDescriptor, boolean isStartMetadata) { + String committingSegmentName = committingSegmentDescriptor.getSegmentName(); + SegmentZKMetadata committingSegmentZKMetadata = + isStartMetadata ? 
updateCommittingSegmentZKMetadataToCOMMITTING(realtimeTableName, committingSegmentDescriptor) + : updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor); + + // Refresh the Broker routing + _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true); + return committingSegmentZKMetadata; + } + + // Step 2: Create new segment metadata + private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idealState, + CommittingSegmentDescriptor committingSegmentDescriptor, + SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions) { + String committingSegmentName = committingSegmentDescriptor.getSegmentName(); + + String realtimeTableName = tableConfig.getTableName(); + int numReplicas = getNumReplicas(tableConfig, instancePartitions); + + String newConsumingSegmentName = null; + if (!isTablePaused(idealState)) { + LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName); + int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId(); + + List streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( + streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) + ).collect(Collectors.toList()); + Set partitionIds = getPartitionIds(streamConfigs, idealState); + + if (partitionIds.contains(committingSegmentPartitionGroupId)) { + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); + long newSegmentCreationTimeMs = getCurrentTimeMs(); + LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, + committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); + // TODO: This code does not support size-based segment thresholds for tables with pauseless enabled. The + // calculation of row thresholds based on segment size depends on the size of the previously committed + // segment. 
For tables with pauseless mode enabled, this size is unavailable at this step because the + // segment has not yet been built. + + createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs, + committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), + numReplicas); + newConsumingSegmentName = newLLCSegment.getSegmentName(); + } + } + return newConsumingSegmentName; + } + + // Step 3: Update IdealState + private IdealState updateIdealStateForSegments(TableConfig tableConfig, String committingSegmentName, + String newConsumingSegmentName, InstancePartitions instancePartitions) { + + SegmentAssignment segmentAssignment = + SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics); + Map instancePartitionsMap = + Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + + return updateIdealStateOnSegmentCompletion(tableConfig.getTableName(), committingSegmentName, + newConsumingSegmentName, segmentAssignment, instancePartitionsMap); + } + + /** + * Invoked during pauseless ingestion after the realtime segment has been ingested but before + * the response is sent to the server to build the segment. + *

+ * This method performs the following actions: + * 1. Updates the property store segment metadata status from IN_PROGRESS to COMMITTING. + * 2. Creates a new property store record for the next consuming segment. + * 3. Updates the ideal state to mark the new segment as CONSUMING. + */ + public void commitSegmentStartMetadata(String realtimeTableName, + CommittingSegmentDescriptor committingSegmentDescriptor) { + LOGGER.info("commitSegmentStartMetadata: starting segment commit for table:{}, segment: {}", realtimeTableName, + committingSegmentDescriptor.getSegmentName()); + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + try { + _numCompletingSegments.addAndGet(1); + commitSegmentMetadataInternal(realtimeTableName, committingSegmentDescriptor, true); + } finally { + _numCompletingSegments.addAndGet(-1); + } + } + + /** + * Invoked after the realtime segment has been built and uploaded. + * Updates the metadata like CRC, download URL, etc. in the Zookeeper metadata for the committing segment. 
+ */ + public void commitSegmentEndMetadata(String realtimeTableName, + CommittingSegmentDescriptor committingSegmentDescriptor) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + try { + _numCompletingSegments.addAndGet(1); + // Validate segment location only for metadata commit + if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) { + LOGGER.warn("Committing segment: {} was not uploaded to deep store", + committingSegmentDescriptor.getSegmentName()); + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1); + } + String committingSegmentName = committingSegmentDescriptor.getSegmentName(); + Stat stat = new Stat(); + SegmentZKMetadata committingSegmentZKMetadata = + getSegmentZKMetadata(realtimeTableName, committingSegmentName, stat); + Preconditions.checkState(committingSegmentZKMetadata.getStatus() == Status.COMMITTING, + "Segment status for segment %s should be COMMITTING, found: %s", committingSegmentName, + committingSegmentZKMetadata.getStatus()); + LOGGER.info("Updating segment ZK metadata for segment: {}", committingSegmentName); + updateCommittingSegmentMetadata(realtimeTableName, committingSegmentDescriptor, false); + LOGGER.info("Successfully updated segment metadata for segment: {}", committingSegmentName); + } finally { + _numCompletingSegments.addAndGet(-1); + } + } + /** * Updates segment ZK metadata for the committing segment. 
*/ - private SegmentZKMetadata updateCommittingSegmentZKMetadata(String realtimeTableName, + private SegmentZKMetadata updateCommittingSegmentZKMetadataToCOMMITTING(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { String segmentName = committingSegmentDescriptor.getSegmentName(); - LOGGER.info("Updating segment ZK metadata for committing segment: {}", segmentName); Stat stat = new Stat(); SegmentZKMetadata committingSegmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat); Preconditions.checkState(committingSegmentZKMetadata.getStatus() == Status.IN_PROGRESS, "Segment status for segment: %s should be IN_PROGRESS, found: %s", segmentName, committingSegmentZKMetadata.getStatus()); + + // TODO Issue 5953 remove the long parsing once metadata is set correctly. + committingSegmentZKMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset()); + committingSegmentZKMetadata.setStatus(Status.COMMITTING); + + persistSegmentZKMetadata(realtimeTableName, committingSegmentZKMetadata, stat.getVersion()); + return committingSegmentZKMetadata; + } + + + /** + * Updates segment ZK metadata for the committing segment. + */ + private SegmentZKMetadata updateCommittingSegmentZKMetadata(String realtimeTableName, + CommittingSegmentDescriptor committingSegmentDescriptor) { + String segmentName = committingSegmentDescriptor.getSegmentName(); + Stat stat = new Stat(); + SegmentZKMetadata committingSegmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat); + // The segment status can be: + // 1. IN_PROGRESS for normal tables + // 2. 
COMMITTING for pauseless tables + Preconditions.checkState(committingSegmentZKMetadata.getStatus() != Status.DONE, + "Segment status for segment: %s should not be DONE", segmentName); SegmentMetadataImpl segmentMetadata = committingSegmentDescriptor.getSegmentMetadata(); Preconditions.checkState(segmentMetadata != null, "Failed to find segment metadata from descriptor for segment: %s", segmentName); @@ -916,7 +1036,7 @@ private Map getLatestSegmentZKMetadataMap(String rea * leader of the table. * * During segment commit, we update zookeeper in 3 steps - * Step 1: Update PROPERTYSTORE to change the old segment metadata status to DONE + * Step 1: Update PROPERTYSTORE to change the old segment metadata status to DONE/ COMMITTING * Step 2: Update PROPERTYSTORE to create the new segment metadata with status IN_PROGRESS * Step 3: Update IDEALSTATES to include new segment in CONSUMING state, and change old segment to ONLINE state. * diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java index 516ce4c07d93..c62826cb5fe3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java @@ -80,11 +80,11 @@ SegmentCompletionProtocol.Response segmentConsumed(String instanceId, StreamPart * The FSM verifies whether the server is eligible to commit based on its previous * state and the reported offset, and transitions to a committing state if appropriate. * - * @param instanceId The ID of the server instance attempting to commit. - * @param offset The offset being committed by the server. + * @param reqParams The request parameters containing server instance ID, offset, and other + * segment completion protocol information. 
* @return A response indicating the next action for the server (e.g., CONTINUE or FAILED). */ - SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, StreamPartitionMsgOffset offset); + SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params reqParams); /** * Handles the event where a server indicates it has stopped consuming. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 5bb3f861d7b0..3dbd20974538 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -210,7 +210,7 @@ public SegmentCompletionProtocol.Response segmentCommitStart( SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED; try { fsm = lookupOrCreateFsm(segmentName, SegmentCompletionProtocol.MSG_TYPE_COMMIT); - response = fsm.segmentCommitStart(instanceId, offset); + response = fsm.segmentCommitStart(reqParams); } catch (Exception e) { LOGGER.error("Caught exception in segmentCommitStart for segment {}", segmentNameStr, e); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index e3e17a6f4d2f..c1462ec5b9a5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -639,10 +639,22 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon Lock segmentLock = getSegmentLock(segmentName); segmentLock.lock(); try { - // Download segment from deep store if CRC 
changes or forced to download; - // otherwise, copy backup directory back to the original index directory. - // And then continue to load the segment from the index directory. - boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata); + /* + Determines if a segment should be downloaded from deep storage based on: + 1. A forced download flag. + 2. The segment status being marked as "DONE" in ZK metadata and a CRC mismatch + between ZK metadata and local metadata CRC. + - The "DONE" status confirms that the COMMIT_END_METADATA call succeeded + and the segment is available in deep storage or with a peer before discarding + the local copy. + + Otherwise: + - Copy the backup directory back to the original index directory. + - Continue loading the segment from the index directory. + */ + boolean shouldDownload = + forceDownload || (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC( + zkMetadata, localMetadata)); if (shouldDownload) { // Create backup directory to handle failure of segment reloading. 
createBackup(indexDir); @@ -705,6 +717,11 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon _logger.info("Reloaded segment: {}", segmentName); } + private boolean isSegmentStatusCompleted(SegmentZKMetadata zkMetadata) { + return zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE + || zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.UPLOADED; + } + private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata segmentZKMetadata, String currentSegmentTier, SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig, Schema schema) throws Exception { @@ -777,7 +794,7 @@ protected File downloadSegment(SegmentZKMetadata zkMetadata) } } - private File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) + protected File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) throws Exception { String segmentName = zkMetadata.getSegmentName(); String downloadUrl = zkMetadata.getDownloadUrl(); @@ -827,7 +844,7 @@ private File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) } } - private File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata) + protected File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata) throws Exception { String segmentName = zkMetadata.getSegmentName(); Preconditions.checkState(_peerDownloadScheme != null, "Peer download is not enabled for table: %s", @@ -987,9 +1004,19 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading tryInitSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig); SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null : segmentDirectory.getSegmentMetadata(); - // If the segment doesn't exist on server or its CRC has changed, then we - // need to fall back to download the segment from deep store to load it. - if (segmentMetadata == null || !hasSameCRC(zkMetadata, segmentMetadata)) { + /* + If: + 1. 
The segment doesn't exist on the server, or + 2. The segment status is marked as "DONE" in ZK metadata but there's a CRC mismatch + between the ZK metadata and the local metadata CRC. + - The "DONE" status confirms the COMMIT_END_METADATA call succeeded, + and the segment is available either in deep storage or with a peer + before discarding the local copy. + + Then: + We need to fall back to downloading the segment from deep storage to load it. + */ + if (segmentMetadata == null || (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, segmentMetadata))) { if (segmentMetadata == null) { _logger.info("Segment: {} does not exist", segmentName); } else if (!hasSameCRC(zkMetadata, segmentMetadata)) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PauselessSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PauselessSegmentCommitter.java new file mode 100644 index 000000000000..3cbafa15dc2c --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PauselessSegmentCommitter.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.data.manager.realtime; + +import java.io.File; +import javax.annotation.Nullable; +import org.apache.pinot.common.protocols.SegmentCompletionProtocol; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.slf4j.Logger; + + +public class PauselessSegmentCommitter extends SplitSegmentCommitter { + public PauselessSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler, + SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader, + @Nullable String peerDownloadScheme) { + super(segmentLogger, protocolHandler, params, segmentUploader, peerDownloadScheme); + } + + /** + * Commits a built segment without executing the segmentCommitStart step. This method assumes that + * segmentCommitStart has already been executed prior to building the segment. + * + * The commit process follows these steps: + * 1. Uploads the segment tar file to the designated storage location + * 2. Updates the parameters with the new segment location + * 3. 
Executes the segment commit end protocol with associated metadata + * + * @param segmentBuildDescriptor Contains the built segment information including the tar file + * and associated metadata files + * @return A SegmentCompletionProtocol.Response object indicating the commit status: + * - Returns the successful commit response if all steps complete successfully + * - Returns RESP_FAILED if either the upload fails or the commit end protocol fails + * + * @see SegmentCompletionProtocol + * @see RealtimeSegmentDataManager.SegmentBuildDescriptor + */ + @Override + public SegmentCompletionProtocol.Response commit( + RealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) { + File segmentTarFile = segmentBuildDescriptor.getSegmentTarFile(); + + String segmentLocation = uploadSegment(segmentTarFile, _segmentUploader, _params); + if (segmentLocation == null) { + return SegmentCompletionProtocol.RESP_FAILED; + } + _params.withSegmentLocation(segmentLocation); + + SegmentCompletionProtocol.Response commitEndResponse = + _protocolHandler.segmentCommitEndWithMetadata(_params, segmentBuildDescriptor.getMetadataFiles()); + + if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { + _segmentLogger.warn("CommitEnd failed with response {}", commitEndResponse.toJsonString()); + return SegmentCompletionProtocol.RESP_FAILED; + } + return commitEndResponse; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 380b358a84ed..dbb8a6b9da49 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -50,6 +50,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol; 
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; @@ -845,6 +846,22 @@ public void run() { // CONSUMING -> ONLINE state transition. segmentLock.lockInterruptibly(); try { + // For tables with pauseless consumption enabled we want to start the commit protocol that + // 1. Updates the endOffset in the ZK metadata for the committing segment + // 2. Creates ZK metadata for the new consuming segment + // 3. Updates the IdealState for committing and new consuming segment to ONLINE and CONSUMING + // respectively. + // Refer to the PR for the new commit protocol: https://github.com/apache/pinot/pull/14741 + if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) { + if (!startSegmentCommit()) { + // If for any reason commit failed, we don't want to be in COMMITTING state when we hold. + // Change the state to HOLDING before looping around. + _state = State.HOLDING; + _segmentLogger.info("Could not commit segment: {}. 
Retrying after hold", _segmentNameStr); + hold(); + break; + } + } long buildTimeSeconds = response.getBuildTimeSeconds(); buildSegmentForCommit(buildTimeSeconds * 1000L); if (_segmentBuildDescriptor == null) { @@ -907,6 +924,22 @@ public void run() { } } + private boolean startSegmentCommit() { + SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); + params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString()) + .withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason); + if (_isOffHeap) { + params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes()); + } + SegmentCompletionProtocol.Response segmentCommitStartResponse = _protocolHandler.segmentCommitStart(params); + if (!segmentCommitStartResponse.getStatus() + .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) { + _segmentLogger.warn("CommitStart failed with response {}", segmentCommitStartResponse.toJsonString()); + return false; + } + return true; + } + @VisibleForTesting protected StreamPartitionMsgOffset extractOffset(SegmentCompletionProtocol.Response response) { return _streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 7cb1a7a5bd93..9126bea9e3cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -22,16 +22,19 @@ import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import 
java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.function.BooleanSupplier; import java.util.function.Supplier; @@ -51,6 +54,7 @@ import org.apache.pinot.core.data.manager.BaseTableDataManager; import org.apache.pinot.core.data.manager.DuoSegmentDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.util.PeerServerSegmentFinder; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager; @@ -72,6 +76,8 @@ import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; @@ -119,6 +125,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager { public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); + public static final long DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(10); // 10 minutes + public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep interval + private static final String SEGMENT_DOWNLOAD_TIMEOUT_MINUTES = "segmentDownloadTimeoutMinutes"; + // TODO: Change it to BooleanSupplier private final Supplier _isServerReadyToServeQueries; @@ -461,7 +471,15 @@ protected void doAddOnlineSegment(String segmentName) 
((RealtimeSegmentDataManager) segmentDataManager).goOnlineFromConsuming(zkMetadata); onConsumingToOnline(segmentName); } else { - replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig); + // For pauseless ingestion, the segment is marked ONLINE before it's built and before the COMMIT_END_METADATA + // call completes. + // The server should replace the segment only after the CRC is set by COMMIT_END_METADATA and the segment is + // marked DONE. + // This ensures the segment's download URL is available before discarding the locally built copy, preventing + // data loss if COMMIT_END_METADATA fails. + if (zkMetadata.getStatus() == Status.DONE) { + replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig); + } } } } @@ -544,6 +562,82 @@ private void doAddConsumingSegment(String segmentName) _logger.info("Added new CONSUMING segment: {}", segmentName); } + @Override + public File downloadSegment(SegmentZKMetadata zkMetadata) + throws Exception { + Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS, + "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName()); + + // Case: The commit protocol has completed, and the segment is ready to be downloaded either + // from deep storage or from a peer (if peer-to-peer download is enabled). + if (zkMetadata.getStatus() == Status.DONE) { + return super.downloadSegment(zkMetadata); + } + + // The segment status is COMMITTING, indicating that the segment commit process is incomplete. + // Attempting a waited download within the configured time limit. + long downloadTimeoutMilliseconds = + getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType)); + final long startTime = System.currentTimeMillis(); + List onlineServerURIs; + while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) { + // ZK Metadata may change during segment download process; fetch it on every retry. 
+ zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName()); + + if (zkMetadata.getDownloadUrl() != null) { + // The downloadSegment() will throw an exception in case there are some genuine issues. + // We don't want to retry in those scenarios and will throw an exception + return downloadSegmentFromDeepStore(zkMetadata); + } + + if (_peerDownloadScheme != null) { + _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName()); + try { + onlineServerURIs = new ArrayList<>(); + PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(), + _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme, + onlineServerURIs); + if (!onlineServerURIs.isEmpty()) { + return downloadSegmentFromPeers(zkMetadata); + } + } catch (Exception e) { + _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e); + } + } + + long timeElapsed = System.currentTimeMillis() - startTime; + long timeRemaining = downloadTimeoutMilliseconds - timeElapsed; + + if (timeRemaining <= 0) { + break; + } + + _logger.info("Sleeping for 30 seconds as the segment url is missing. Time remaining: {} minutes", + Math.round(timeRemaining / 60000.0)); + + // Sleep for the shorter of our normal interval or remaining time + Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining)); + } + + // If we exit the loop without returning, throw an exception + throw new TimeoutException( + "Failed to download segment after " + TimeUnit.MILLISECONDS.toMinutes(downloadTimeoutMilliseconds) + + " minutes of retrying. 
Segment: " + zkMetadata.getSegmentName()); + } + + private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) { + return Optional.ofNullable(tableConfig).map(TableConfig::getIngestionConfig) + .map(IngestionConfig::getStreamIngestionConfig).map(StreamIngestionConfig::getStreamConfigMaps) + .filter(maps -> !maps.isEmpty()).map(maps -> maps.get(0)).map(map -> map.get(SEGMENT_DOWNLOAD_TIMEOUT_MINUTES)) + .map(timeoutStr -> { + try { + return TimeUnit.MINUTES.toMillis(Long.parseLong(timeoutStr)); + } catch (NumberFormatException e) { + return DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS; + } + }).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS); + } + /** * Sets the default time value in the schema as the segment creation time if it is invalid. Time column is used to * manage the segments, so its values have to be within the valid range. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 4224019ab0e1..8a637b739508 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -21,6 +21,7 @@ import java.net.URISyntaxException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; @@ -79,6 +80,10 @@ public SegmentCommitter createSegmentCommitter(SegmentCompletionProtocol.Request _protocolHandler.getAuthProvider(), _tableConfig.getTableName()); } + if 
(PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) { + return new PauselessSegmentCommitter(_logger, _protocolHandler, params, segmentUploader, + peerSegmentDownloadScheme); + } return new SplitSegmentCommitter(_logger, _protocolHandler, params, segmentUploader, peerSegmentDownloadScheme); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java index 1e4ebfe1f856..19aea112486e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java @@ -35,11 +35,11 @@ * If that succeeds, swap in-memory segment with the one built. */ public class SplitSegmentCommitter implements SegmentCommitter { - private final SegmentCompletionProtocol.Request.Params _params; - private final ServerSegmentCompletionProtocolHandler _protocolHandler; - private final SegmentUploader _segmentUploader; - private final String _peerDownloadScheme; - private final Logger _segmentLogger; + protected final SegmentCompletionProtocol.Request.Params _params; + protected final ServerSegmentCompletionProtocolHandler _protocolHandler; + protected final SegmentUploader _segmentUploader; + protected final String _peerDownloadScheme; + protected final Logger _segmentLogger; public SplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler, SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java index 7f26d759352d..07181ea373e6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java @@ -76,7 +76,7 @@ public static List getPeerServerURIs(HelixManager helixManager, String tabl return onlineServerURIs; } - private static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName, + public static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName, String tableNameWithType, String segmentName, String downloadScheme, List onlineServerURIs) throws Exception { ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionIntegrationTest.java new file mode 100644 index 000000000000..4e9fcac0abdc --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionIntegrationTest.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class PauselessRealtimeIngestionIntegrationTest extends BaseClusterIntegrationTest { + + private static final int NUM_REALTIME_SEGMENTS = 48; + private static final Logger LOGGER = LoggerFactory.getLogger(PauselessRealtimeIngestionIntegrationTest.class); + private List _avroFiles; + + protected void overrideControllerConf(Map properties) { + properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true); + 
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true); + properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless", + "org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM"); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + // Set segment store uri to the one used by controller as data dir (i.e. deep store) + try { + LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", _controllerConfig.getDataDir(), + new URI(_controllerConfig.getDataDir()).getScheme()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + serverConf.setProperty("pinot.server.instance.segment.store.uri", "file:" + _controllerConfig.getDataDir()); + serverConf.setProperty("pinot.server.instance." + HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE, + "true"); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + // Start a customized controller with more frequent realtime segment validation + startController(); + startBroker(); + startServer(); + + _avroFiles = unpackAvroData(_tempDir); + startKafka(); + pushAvroIntoKafka(_avroFiles); + + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0)); + // Replace stream config from indexing config to ingestion config + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs()))); + ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true); + tableConfig.getIndexingConfig().setStreamConfigs(null); + tableConfig.setIngestionConfig(ingestionConfig); + addTableConfig(tableConfig); + + waitForAllDocsLoaded(600_000L); + } + + @Test(description = 
"Ensure that all the segments are ingested, built and uploaded when pauseless consumption is " + + "enabled") + public void testSegmentAssignment() + throws Exception { + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + verifyIdealState(tableNameWithType, NUM_REALTIME_SEGMENTS); + assertTrue(PauselessConsumptionUtils.isPauselessEnabled(getRealtimeTableConfig())); + TestUtils.waitForCondition((aVoid) -> { + List segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType); + return assertNoSegmentInProhibitedStatus(segmentZKMetadataList, + CommonConstants.Segment.Realtime.Status.COMMITTING); + }, 1000, 100000, "Some segments have status COMMITTING"); + TestUtils.waitForCondition((aVoid) -> { + List segmentZKMetadataList = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType); + return assertUrlPresent(segmentZKMetadataList); + }, 1000, 100000, "Some segments still have missing url"); + } + + @AfterClass + public void tearDown() + throws IOException { + LOGGER.info("Tearing down..."); + dropRealtimeTable(getTableName()); + stopServer(); + stopBroker(); + stopController(); + stopKafka(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + } + + private void verifyIdealState(String tableName, int numSegmentsExpected) { + IdealState idealState = HelixHelper.getTableIdealState(_helixManager, tableName); + Map> segmentAssignment = idealState.getRecord().getMapFields(); + assertEquals(segmentAssignment.size(), numSegmentsExpected); + } + + private boolean assertUrlPresent(List segmentZKMetadataList) { + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE + && segmentZKMetadata.getDownloadUrl() == null) { + System.out.println("URl not found for segment: " + segmentZKMetadata.getSegmentName()); + return false; + } + } + return true; + } + + private boolean assertNoSegmentInProhibitedStatus(List 
segmentZKMetadataList, + CommonConstants.Segment.Realtime.Status prohibitedStatus) { + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + if (segmentZKMetadata.getStatus() == prohibitedStatus) { + return false; + } + } + return true; + } + + @Override + protected Map getStreamConfigs() { + Map streamConfigMap = getStreamConfigMap(); + streamConfigMap.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless"); + return streamConfigMap; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index 5b216ca9d2e2..33bdc9c3ce96 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -40,6 +40,9 @@ public class StreamIngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Whether to track offsets of the filtered stream messages during consumption.") private boolean _trackFilteredMessageOffsets = false; + @JsonPropertyDescription("Whether pauseless consumption is enabled for the table") + private boolean _pauselessConsumptionEnabled = false; + @JsonCreator public StreamIngestionConfig(@JsonProperty("streamConfigMaps") List> streamConfigMaps) { _streamConfigMaps = streamConfigMaps; @@ -64,4 +67,12 @@ public void setTrackFilteredMessageOffsets(boolean trackFilteredMessageOffsets) public boolean isTrackFilteredMessageOffsets() { return _trackFilteredMessageOffsets; } + + public boolean isPauselessConsumptionEnabled() { + return _pauselessConsumptionEnabled; + } + + public void setPauselessConsumptionEnabled(boolean pauselessConsumptionEnabled) { + _pauselessConsumptionEnabled = pauselessConsumptionEnabled; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 8e27bbccef35..e17b6f565735 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1082,6 +1082,8 @@ public static class Segment { public static class Realtime { public enum Status { IN_PROGRESS, // The segment is still consuming data + COMMITTING, // This state will only be utilised by pauseless ingestion when the segment has been consumed but + // is yet to be built and uploaded by the server. DONE, // The segment has finished consumption and has been committed to the segment store UPLOADED; // The segment is uploaded by an external party