Skip to content

Commit

Permalink
Added SegmentGeneratorConfig as an optional param to the SegmentPurger (
Browse files Browse the repository at this point in the history
  • Loading branch information
rajagopr authored Jan 28, 2025
1 parent 1edffab commit 8f003bd
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
*/
package org.apache.pinot.core.minion;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorCustomConfigs;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
Expand All @@ -50,12 +53,14 @@ public class SegmentPurger {
private final Schema _schema;
private final RecordPurger _recordPurger;
private final RecordModifier _recordModifier;

private final SegmentGeneratorCustomConfigs _segmentGeneratorCustomConfigs;
private SegmentGeneratorConfig _segmentGeneratorConfig;
private int _numRecordsPurged;
private int _numRecordsModified;

public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, Schema schema,
@Nullable RecordPurger recordPurger, @Nullable RecordModifier recordModifier) {
@Nullable RecordPurger recordPurger, @Nullable RecordModifier recordModifier,
@Nullable SegmentGeneratorCustomConfigs segmentGeneratorCustomConfigs) {
Preconditions.checkArgument(recordPurger != null || recordModifier != null,
"At least one of record purger and modifier should be non-null");
_indexDir = indexDir;
Expand All @@ -64,6 +69,7 @@ public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, Sc
_schema = schema;
_recordPurger = recordPurger;
_recordModifier = recordModifier;
_segmentGeneratorCustomConfigs = segmentGeneratorCustomConfigs;
}

public File purgeSegment()
Expand All @@ -84,34 +90,45 @@ public File purgeSegment()
return null;
}

SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
config.setOutDir(_workingDir.getPath());
config.setSegmentName(segmentName);
initSegmentGeneratorConfig(segmentName);

// Keep index creation time the same as original segment because both segments use the same raw data.
// This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
// identify if the new pushed segment has newer data than the existing one.
config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));
_segmentGeneratorConfig.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));

// The time column type info is not stored in the segment metadata.
// Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT).
if (segmentMetadata.getTimeInterval() != null) {
config.setTimeColumnName(_tableConfig.getValidationConfig().getTimeColumnName());
config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
_segmentGeneratorConfig.setTimeColumnName(_tableConfig.getValidationConfig().getTimeColumnName());
_segmentGeneratorConfig.setStartTime(Long.toString(segmentMetadata.getStartTime()));
_segmentGeneratorConfig.setEndTime(Long.toString(segmentMetadata.getEndTime()));
_segmentGeneratorConfig.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
}

SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
purgeRecordReader.rewind();
driver.init(config, purgeRecordReader);
driver.init(_segmentGeneratorConfig, purgeRecordReader);
driver.build();
}

LOGGER.info("Finish purging table: {}, segment: {}, purged {} records, modified {} records", tableNameWithType,
segmentName, _numRecordsPurged, _numRecordsModified);

return new File(_workingDir, segmentName);
return new File(_workingDir, _segmentGeneratorConfig.getSegmentName());
}

@VisibleForTesting
void initSegmentGeneratorConfig(String segmentName) {
_segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
_segmentGeneratorConfig.setOutDir(_workingDir.getPath());

if (_segmentGeneratorCustomConfigs != null && StringUtils.isNotEmpty(
_segmentGeneratorCustomConfigs.getSegmentName())) {
_segmentGeneratorConfig.setSegmentName(_segmentGeneratorCustomConfigs.getSegmentName());
} else {
_segmentGeneratorConfig.setSegmentName(segmentName);
}
}

public RecordPurger getRecordPurger() {
Expand All @@ -130,6 +147,10 @@ public int getNumRecordsModified() {
return _numRecordsModified;
}

public SegmentGeneratorConfig getSegmentGeneratorConfig() {
return _segmentGeneratorConfig;
}

private class PurgeRecordReader implements RecordReader {
final PinotSegmentRecordReader _pinotSegmentRecordReader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorCustomConfigs;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
Expand Down Expand Up @@ -126,7 +127,8 @@ public void testPurgeSegment()
};

SegmentPurger segmentPurger =
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, recordModifier);
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, recordModifier,
null);
File purgedIndexDir = segmentPurger.purgeSegment();

// Check the purge/modify counter in segment purger
Expand Down Expand Up @@ -173,6 +175,27 @@ public void testPurgeSegment()
}
}

@Test
public void testSegmentPurgerWithCustomSegmentGeneratorConfig() {
SegmentPurger.RecordPurger recordPurger = row -> row.getValue(D1).equals(0);

SegmentPurger segmentPurger =
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, null, null);
segmentPurger.initSegmentGeneratorConfig("currentSegmentName");
assertEquals(segmentPurger.getSegmentGeneratorConfig().getSegmentName(), "currentSegmentName");

String newSegmentName = "myTable_segment_001";
SegmentGeneratorCustomConfigs segmentGeneratorCustomConfigs = new SegmentGeneratorCustomConfigs();
segmentGeneratorCustomConfigs.setSegmentName(newSegmentName);

// test with custom segment generator configs
SegmentPurger segmentPurger2 =
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, null,
segmentGeneratorCustomConfigs);
segmentPurger2.initSegmentGeneratorConfig("currentSegmentName");
assertEquals(segmentPurger2.getSegmentGeneratorConfig().getSegmentName(), newSegmentName);
}

@AfterClass
public void tearDown()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File

_eventObserver.notifyProgress(pinotTaskConfig, "Purging segment: " + indexDir);
SegmentPurger segmentPurger =
new SegmentPurger(indexDir, workingDir, tableConfig, schema, recordPurger, recordModifier);
new SegmentPurger(indexDir, workingDir, tableConfig, schema, recordPurger, recordModifier, null);
long purgeTaskStartTimeNs = MX_BEAN.getCurrentThreadCpuTime();
File purgedSegmentFile = segmentPurger.purgeSegment();
long purgeTaskEndTimeNs = MX_BEAN.getCurrentThreadCpuTime();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.spi.creator;

/**
* Class holds customizable configs for segment generation.
*/
public class SegmentGeneratorCustomConfigs {
private String _segmentName;

public void setSegmentName(String segmentName) {
_segmentName = segmentName;
}

public String getSegmentName() {
return _segmentName;
}
}

0 comments on commit 8f003bd

Please sign in to comment.