Skip to content

Commit

Permalink
1. Removing pauseless from indexingConfig
Browse files Browse the repository at this point in the history
2. Fixing few formatting issues.
3. Fixing few comments for better readability.
4. Removing unused code: contructors, constants etc
  • Loading branch information
9aman committed Jan 7, 2025
1 parent a97847f commit 4f92b6d
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@

import java.util.Optional;
import javax.validation.constraints.NotNull;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;


public class PauselessConsumptionUtils {
public static final String PAUSELESS_CONSUMPTION_ENABLED = "pauselessConsumptionEnabled";

private PauselessConsumptionUtils() {
// Private constructor to prevent instantiation of utility class
Expand All @@ -42,15 +40,6 @@ private PauselessConsumptionUtils() {
* @throws NullPointerException if tableConfig is null
*/
public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
return checkIngestionConfig(tableConfig) || checkIndexingConfig(tableConfig);
}

private static boolean checkIndexingConfig(@NotNull TableConfig tableConfig) {
return Optional.ofNullable(tableConfig.getIndexingConfig()).map(IndexingConfig::isPauselessConsumptionEnabled)
.orElse(false);
}

private static boolean checkIngestionConfig(@NotNull TableConfig tableConfig) {
return Optional.ofNullable(tableConfig.getIngestionConfig()).map(IngestionConfig::getStreamIngestionConfig)
.map(StreamIngestionConfig::isPauselessConsumptionEnabled).orElse(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ public PauselessSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionPr
super(segmentLogger, protocolHandler, params, segmentUploader, peerDownloadScheme);
}

public PauselessSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) {
super(segmentLogger, protocolHandler, params, segmentUploader);
}

/**
* Commits a built segment without executing the segmentCommitStart step. This method assumes that
* segmentCommitStart has already been executed prior to building the segment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,7 @@ public void run() {
// 2. Creates ZK metadata for the new consuming segment
// 3. Updates the IdealState for committing and new consuming segment to ONLINE and CONSUMING
// respectively.
// See design doc for the new commit protocol:
// https://docs.google.com/document/d/1d-xttk7sXFIOqfyZvYw5W_KeGS6Ztmi8eYevBcCrT_c
// Refer to the PR for the new commit protocol: https://github.com/apache/pinot/pull/14741
if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
if (!startSegmentCommit()) {
// If for any reason commit failed, we don't want to be in COMMITTING state when we hold.
Expand Down Expand Up @@ -925,7 +924,7 @@ public void run() {
}
}

boolean startSegmentCommit() {
private boolean startSegmentCommit() {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
.withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ public File downloadSegment(SegmentZKMetadata zkMetadata)
Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining));
}

// If we exit the loop without returning, throw an exception
// If we exit the loop without returning, throw an exception
throw new TimeoutException("Failed to download segment after " + TIMEOUT_MINUTES + " minutes of retrying. Segment: "
+ zkMetadata.getSegmentName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class IndexingConfig extends BaseJsonConfig {
private boolean _aggregateMetrics;
private boolean _nullHandlingEnabled;
private boolean _columnMajorSegmentBuilderEnabled = true;
private boolean _pauselessConsumptionEnabled = false;

/**
* If `optimizeDictionary` enabled, dictionary is not created for the high-cardinality
Expand Down Expand Up @@ -460,12 +459,4 @@ public Set<String> getAllReferencedColumns() {
}
return allColumns;
}

public boolean isPauselessConsumptionEnabled() {
return _pauselessConsumptionEnabled;
}

public void setPauselessConsumptionEnabled(boolean pauselessConsumptionEnabled) {
_pauselessConsumptionEnabled = pauselessConsumptionEnabled;
}
}

0 comments on commit 4f92b6d

Please sign in to comment.