From 1a6ffc1aeca7b6f67fd437408ae153c62a7fd24f Mon Sep 17 00:00:00 2001 From: Bolek Ziobrowski <26925920+bziobrowski@users.noreply.github.com> Date: Wed, 8 Jan 2025 17:17:54 +0100 Subject: [PATCH] Fix memory leak in tests. --- .../FixedByteSingleValueMultiColWriter.java | 2 +- ...tableSegmentEntriesAboveThresholdTest.java | 83 +++++---- .../mutable/MutableSegmentImplTestUtils.java | 18 +- .../segment/creator/DictionariesTest.java | 161 ++++++++++-------- .../DictionaryOptimizerCardinalityTest.java | 46 ++--- .../impl/SegmentColumnarIndexCreatorTest.java | 10 +- .../creator/CLPForwardIndexCreatorV2Test.java | 89 +++++----- .../index/loader/SegmentPreProcessorTest.java | 3 +- 8 files changed, 234 insertions(+), 178 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteSingleValueMultiColWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteSingleValueMultiColWriter.java index cafae34189bb..bc86b468f8c8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteSingleValueMultiColWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteSingleValueMultiColWriter.java @@ -59,7 +59,7 @@ public FixedByteSingleValueMultiColWriter(PinotDataBuffer dataBuffer, int cols, } _rowSizeInBytes = rowSizeInBytes; _dataBuffer = dataBuffer; - // For passed in PinotDataBuffer, the caller is responsible of closing the PinotDataBuffer. + // For passed in PinotDataBuffer, the caller is responsible for closing the PinotDataBuffer. _shouldCloseDataBuffer = false; } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java index 1eaaab657d21..97ad899a9c2a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule; import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin; @@ -46,7 +47,7 @@ import org.testng.annotations.Test; -public class MutableSegmentEntriesAboveThresholdTest { +public class MutableSegmentEntriesAboveThresholdTest implements PinotBuffersAfterClassCheckRule { private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), MutableSegmentEntriesAboveThresholdTest.class.getSimpleName()); private static final String AVRO_FILE = "data/test_data-mv.avro"; @@ -140,15 +141,19 @@ public void testNoLimitBreached() throws Exception { File avroFile = getAvroFile(); MutableSegmentImpl mutableSegment = getMutableSegment(avroFile); - StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); - try (RecordReader recordReader = RecordReaderFactory - .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { - GenericRow reuse = new GenericRow(); - while (recordReader.hasNext()) { - mutableSegment.index(recordReader.next(reuse), defaultMetadata); + try { + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); + try (RecordReader recordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { + GenericRow reuse = new GenericRow(); + while (recordReader.hasNext()) { + mutableSegment.index(recordReader.next(reuse), defaultMetadata); + } } + assert mutableSegment.canAddMore(); + } finally { + mutableSegment.destroy(); } - assert mutableSegment.canAddMore(); } @Test @@ -156,39 +161,43 @@ public void testLimitBreached() throws Exception { File avroFile = getAvroFile(); MutableSegmentImpl mutableSegment = getMutableSegment(avroFile); - - Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap"); - indexContainerMapField.setAccessible(true); - Map colVsIndexContainer = (Map) indexContainerMapField.get(mutableSegment); - - for (Map.Entry entry : colVsIndexContainer.entrySet()) { - Object indexContainer = entry.getValue(); - Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes"); - mutableIndexesField.setAccessible(true); - Map indexTypeVsMutableIndex = - (Map) mutableIndexesField.get(indexContainer); - - MutableForwardIndex mutableForwardIndex = null; - for (IndexType indexType : indexTypeVsMutableIndex.keySet()) { - if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) { - mutableForwardIndex = (MutableForwardIndex) indexTypeVsMutableIndex.get(indexType); + try { + + Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap"); + indexContainerMapField.setAccessible(true); + Map colVsIndexContainer = (Map) indexContainerMapField.get(mutableSegment); + + for (Map.Entry entry : colVsIndexContainer.entrySet()) { + Object indexContainer = entry.getValue(); + Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes"); + mutableIndexesField.setAccessible(true); + Map indexTypeVsMutableIndex = + (Map) mutableIndexesField.get(indexContainer); + + MutableForwardIndex mutableForwardIndex = null; + for (IndexType indexType : indexTypeVsMutableIndex.keySet()) { + if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) { + mutableForwardIndex = (MutableForwardIndex) indexTypeVsMutableIndex.get(indexType); + } } - } - assert mutableForwardIndex != null; + assert mutableForwardIndex != null; - indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(), - new FakeMutableForwardIndex(mutableForwardIndex)); - } - StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); - try (RecordReader recordReader = RecordReaderFactory - .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { - GenericRow reuse = new GenericRow(); - while (recordReader.hasNext()) { - mutableSegment.index(recordReader.next(reuse), defaultMetadata); + indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(), + new FakeMutableForwardIndex(mutableForwardIndex)); + } + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); + try (RecordReader recordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { + GenericRow reuse = new GenericRow(); + while (recordReader.hasNext()) { + mutableSegment.index(recordReader.next(reuse), defaultMetadata); + } } - } - assert !mutableSegment.canAddMore(); + assert !mutableSegment.canAddMore(); + } finally { + mutableSegment.destroy(); + } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java index b23f203ec7a1..af2ec5dd2e0b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java @@ -109,16 +109,24 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set entry : jsonIndexConfigs.entrySet()) { segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), entry.getValue()); } + RealtimeSegmentConfig realtimeSegmentConfig = segmentConfBuilder.build(); return new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java index a678be426984..8514571826e7 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java @@ -35,6 +35,7 @@ import org.apache.avro.util.Utf8; import org.apache.commons.io.FileUtils; import org.apache.pinot.plugin.inputformat.avro.AvroUtils; +import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory; import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator; @@ -73,7 +74,7 @@ import org.testng.annotations.Test; -public class DictionariesTest { +public class DictionariesTest implements PinotBuffersAfterMethodCheckRule { private static final String AVRO_DATA = "data/test_sample_data.avro"; private static final File INDEX_DIR = new File(DictionariesTest.class.toString()); private static final Map> UNIQUE_ENTRIES = new HashMap<>(); @@ -136,64 +137,71 @@ public void test1() throws Exception { ImmutableSegment heapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.heap); ImmutableSegment mmapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.mmap); + try { - Schema schema = heapSegment.getSegmentMetadata().getSchema(); - for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { - // Skip virtual columns - if (fieldSpec.isVirtualColumn()) { - continue; - } + Schema schema = heapSegment.getSegmentMetadata().getSchema(); + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + // Skip virtual columns + if (fieldSpec.isVirtualColumn()) { + continue; + } - String columnName = fieldSpec.getName(); - Dictionary heapDictionary = heapSegment.getDictionary(columnName); - Dictionary mmapDictionary = mmapSegment.getDictionary(columnName); - - switch (fieldSpec.getDataType()) { - case INT: - Assert.assertTrue(heapDictionary instanceof IntDictionary); - Assert.assertTrue(mmapDictionary instanceof IntDictionary); - int firstInt = heapDictionary.getIntValue(0); - Assert.assertEquals(heapDictionary.indexOf(firstInt), heapDictionary.indexOf(String.valueOf(firstInt))); - Assert.assertEquals(mmapDictionary.indexOf(firstInt), mmapDictionary.indexOf(String.valueOf(firstInt))); - break; - case LONG: - Assert.assertTrue(heapDictionary instanceof LongDictionary); - Assert.assertTrue(mmapDictionary instanceof LongDictionary); - long firstLong = heapDictionary.getLongValue(0); - Assert.assertEquals(heapDictionary.indexOf(firstLong), heapDictionary.indexOf(String.valueOf(firstLong))); - Assert.assertEquals(mmapDictionary.indexOf(firstLong), mmapDictionary.indexOf(String.valueOf(firstLong))); - break; - case FLOAT: - Assert.assertTrue(heapDictionary instanceof FloatDictionary); - Assert.assertTrue(mmapDictionary instanceof FloatDictionary); - float firstFloat = heapDictionary.getFloatValue(0); - Assert.assertEquals(heapDictionary.indexOf(firstFloat), heapDictionary.indexOf(String.valueOf(firstFloat))); - Assert.assertEquals(mmapDictionary.indexOf(firstFloat), mmapDictionary.indexOf(String.valueOf(firstFloat))); - break; - case DOUBLE: - Assert.assertTrue(heapDictionary instanceof DoubleDictionary); - Assert.assertTrue(mmapDictionary instanceof DoubleDictionary); - double firstDouble = heapDictionary.getDoubleValue(0); - Assert.assertEquals(heapDictionary.indexOf(firstDouble), heapDictionary.indexOf(String.valueOf(firstDouble))); - Assert.assertEquals(mmapDictionary.indexOf(firstDouble), mmapDictionary.indexOf(String.valueOf(firstDouble))); - break; - case BIG_DECIMAL: - Assert.assertTrue(heapDictionary instanceof BigDecimalDictionary); - Assert.assertTrue(mmapDictionary instanceof BigDecimalDictionary); - break; - case STRING: - Assert.assertTrue(heapDictionary instanceof StringDictionary); - Assert.assertTrue(mmapDictionary instanceof StringDictionary); - break; - default: - Assert.fail(); - break; - } + String columnName = fieldSpec.getName(); + Dictionary heapDictionary = heapSegment.getDictionary(columnName); + Dictionary mmapDictionary = mmapSegment.getDictionary(columnName); + + switch (fieldSpec.getDataType()) { + case INT: + Assert.assertTrue(heapDictionary instanceof IntDictionary); + Assert.assertTrue(mmapDictionary instanceof IntDictionary); + int firstInt = heapDictionary.getIntValue(0); + Assert.assertEquals(heapDictionary.indexOf(firstInt), heapDictionary.indexOf(String.valueOf(firstInt))); + Assert.assertEquals(mmapDictionary.indexOf(firstInt), mmapDictionary.indexOf(String.valueOf(firstInt))); + break; + case LONG: + Assert.assertTrue(heapDictionary instanceof LongDictionary); + Assert.assertTrue(mmapDictionary instanceof LongDictionary); + long firstLong = heapDictionary.getLongValue(0); + Assert.assertEquals(heapDictionary.indexOf(firstLong), heapDictionary.indexOf(String.valueOf(firstLong))); + Assert.assertEquals(mmapDictionary.indexOf(firstLong), mmapDictionary.indexOf(String.valueOf(firstLong))); + break; + case FLOAT: + Assert.assertTrue(heapDictionary instanceof FloatDictionary); + Assert.assertTrue(mmapDictionary instanceof FloatDictionary); + float firstFloat = heapDictionary.getFloatValue(0); + Assert.assertEquals(heapDictionary.indexOf(firstFloat), heapDictionary.indexOf(String.valueOf(firstFloat))); + Assert.assertEquals(mmapDictionary.indexOf(firstFloat), mmapDictionary.indexOf(String.valueOf(firstFloat))); + break; + case DOUBLE: + Assert.assertTrue(heapDictionary instanceof DoubleDictionary); + Assert.assertTrue(mmapDictionary instanceof DoubleDictionary); + double firstDouble = heapDictionary.getDoubleValue(0); + Assert.assertEquals(heapDictionary.indexOf(firstDouble), + heapDictionary.indexOf(String.valueOf(firstDouble))); + Assert.assertEquals(mmapDictionary.indexOf(firstDouble), + mmapDictionary.indexOf(String.valueOf(firstDouble))); + break; + case BIG_DECIMAL: + Assert.assertTrue(heapDictionary instanceof BigDecimalDictionary); + Assert.assertTrue(mmapDictionary instanceof BigDecimalDictionary); + break; + case STRING: + Assert.assertTrue(heapDictionary instanceof StringDictionary); + Assert.assertTrue(mmapDictionary instanceof StringDictionary); + break; + default: + Assert.fail(); + break; + } - Assert.assertEquals(mmapDictionary.length(), heapDictionary.length()); - for (int i = 0; i < heapDictionary.length(); i++) { - Assert.assertEquals(mmapDictionary.get(i), heapDictionary.get(i)); + Assert.assertEquals(mmapDictionary.length(), heapDictionary.length()); + for (int i = 0; i < heapDictionary.length(); i++) { + Assert.assertEquals(mmapDictionary.get(i), heapDictionary.get(i)); + } } + } finally { + heapSegment.destroy(); + mmapSegment.destroy(); } } @@ -202,27 +210,32 @@ public void test2() throws Exception { ImmutableSegment heapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.heap); ImmutableSegment mmapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.mmap); - - Schema schema = heapSegment.getSegmentMetadata().getSchema(); - for (String columnName : schema.getPhysicalColumnNames()) { - Dictionary heapDictionary = heapSegment.getDictionary(columnName); - Dictionary mmapDictionary = mmapSegment.getDictionary(columnName); - - for (Object entry : UNIQUE_ENTRIES.get(columnName)) { - String stringValue = entry.toString(); - Assert.assertEquals(mmapDictionary.indexOf(stringValue), heapDictionary.indexOf(stringValue)); - if (!columnName.equals("pageKey")) { - Assert.assertFalse(heapDictionary.indexOf(stringValue) < 0); - Assert.assertFalse(mmapDictionary.indexOf(stringValue) < 0); - } - if (entry instanceof Integer) { - Assert.assertEquals(mmapDictionary.indexOf((int) entry), mmapDictionary.indexOf(stringValue)); - Assert.assertEquals(heapDictionary.indexOf((int) entry), heapDictionary.indexOf(stringValue)); - } else if (entry instanceof Long) { - Assert.assertEquals(mmapDictionary.indexOf((long) entry), mmapDictionary.indexOf(stringValue)); - Assert.assertEquals(heapDictionary.indexOf((long) entry), heapDictionary.indexOf(stringValue)); + try { + + Schema schema = heapSegment.getSegmentMetadata().getSchema(); + for (String columnName : schema.getPhysicalColumnNames()) { + Dictionary heapDictionary = heapSegment.getDictionary(columnName); + Dictionary mmapDictionary = mmapSegment.getDictionary(columnName); + + for (Object entry : UNIQUE_ENTRIES.get(columnName)) { + String stringValue = entry.toString(); + Assert.assertEquals(mmapDictionary.indexOf(stringValue), heapDictionary.indexOf(stringValue)); + if (!columnName.equals("pageKey")) { + Assert.assertFalse(heapDictionary.indexOf(stringValue) < 0); + Assert.assertFalse(mmapDictionary.indexOf(stringValue) < 0); + } + if (entry instanceof Integer) { + Assert.assertEquals(mmapDictionary.indexOf((int) entry), mmapDictionary.indexOf(stringValue)); + Assert.assertEquals(heapDictionary.indexOf((int) entry), heapDictionary.indexOf(stringValue)); + } else if (entry instanceof Long) { + Assert.assertEquals(mmapDictionary.indexOf((long) entry), mmapDictionary.indexOf(stringValue)); + Assert.assertEquals(heapDictionary.indexOf((long) entry), heapDictionary.indexOf(stringValue)); + } } } + } finally { + heapSegment.destroy(); + mmapSegment.destroy(); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionaryOptimizerCardinalityTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionaryOptimizerCardinalityTest.java index 85f57beae7ac..ddf9158e88e7 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionaryOptimizerCardinalityTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionaryOptimizerCardinalityTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory; import org.apache.pinot.segment.spi.ImmutableSegment; @@ -51,7 +52,7 @@ import org.testng.annotations.Test; -public class DictionaryOptimizerCardinalityTest { +public class DictionaryOptimizerCardinalityTest implements PinotBuffersAfterClassCheckRule { private static final Logger LOGGER = LoggerFactory.getLogger(DictionaryOptimizerCardinalityTest.class); private static final File INDEX_DIR = new File(DictionaryOptimizerCardinalityTest.class.toString()); @@ -64,24 +65,28 @@ public void testDictionaryForMixedCardinalitiesStringType() throws Exception { ImmutableSegment heapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.heap); + try { - Schema schema = heapSegment.getSegmentMetadata().getSchema(); - for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { - // Skip virtual columns - if (fieldSpec.isVirtualColumn()) { - continue; - } + Schema schema = heapSegment.getSegmentMetadata().getSchema(); + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + // Skip virtual columns + if (fieldSpec.isVirtualColumn()) { + continue; + } - String columnName = fieldSpec.getName(); - if ("low_cardinality_strings".equals(columnName)) { - Assert.assertTrue(heapSegment.getForwardIndex(columnName).isDictionaryEncoded(), - "Low cardinality columns should be dictionary encoded"); - } + String columnName = fieldSpec.getName(); + if ("low_cardinality_strings".equals(columnName)) { + Assert.assertTrue(heapSegment.getForwardIndex(columnName).isDictionaryEncoded(), + "Low cardinality columns should be dictionary encoded"); + } - if ("high_cardinality_strings".equals(columnName)) { - Assert.assertFalse(heapSegment.getForwardIndex(columnName).isDictionaryEncoded(), - "High cardinality columns should be raw encoded"); + if ("high_cardinality_strings".equals(columnName)) { + Assert.assertFalse(heapSegment.getForwardIndex(columnName).isDictionaryEncoded(), + "High cardinality columns should be raw encoded"); + } } + } finally { + heapSegment.destroy(); } } @@ -100,12 +105,15 @@ private void setup() ingestionConfig.setRowTimeValueCheck(false); ingestionConfig.setSegmentTimeValueCheck(false); Schema schema = - new Schema.SchemaBuilder().addSingleValueDimension("low_cardinality_strings", FieldSpec.DataType.STRING) + new Schema.SchemaBuilder() + .addSingleValueDimension("low_cardinality_strings", FieldSpec.DataType.STRING) .addSingleValueDimension("high_cardinality_strings", FieldSpec.DataType.STRING) - .addDateTimeField("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build(); + .addDateTimeField("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .build(); + List stringColumns = - schema.getDimensionFieldSpecs().stream().filter(x -> x.getDataType() == FieldSpec.DataType.STRING).collect( - Collectors.toList()); + schema.getDimensionFieldSpecs().stream().filter(x -> x.getDataType() == FieldSpec.DataType.STRING) + .collect(Collectors.toList()); List fieldConfigList = stringColumns.stream().map( x -> new FieldConfig(x.getName(), FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null, null)) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreatorTest.java index ca86b86166c1..71bd9b2621b7 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreatorTest.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; import org.apache.pinot.segment.spi.IndexSegment; @@ -47,7 +48,7 @@ import static org.testng.Assert.assertTrue; -public class SegmentColumnarIndexCreatorTest { +public class SegmentColumnarIndexCreatorTest implements PinotBuffersAfterClassCheckRule { private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentColumnarIndexCreatorTest"); private static final File CONFIG_FILE = new File(TEMP_DIR, "config"); private static final String COLUMN_NAME = "testColumn"; @@ -109,6 +110,8 @@ private static long getStartTimeInSegmentMetadata(String testDateTimeFormat, Str SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); config.setOutDir(indexDirPath); config.setSegmentName(segmentName); + IndexSegment indexSegment = null; + try { FileUtils.deleteQuietly(new File(indexDirPath)); @@ -119,10 +122,13 @@ private static long getStartTimeInSegmentMetadata(String testDateTimeFormat, Str SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(config, new GenericRowRecordReader(rows)); driver.build(); - IndexSegment indexSegment = ImmutableSegmentLoader.load(new File(indexDirPath, segmentName), ReadMode.heap); + indexSegment = ImmutableSegmentLoader.load(new File(indexDirPath, segmentName), ReadMode.heap); SegmentMetadata md = indexSegment.getSegmentMetadata(); return md.getStartTime(); } finally { + if (indexSegment != null) { + indexSegment.destroy(); + } FileUtils.deleteQuietly(new File(indexDirPath)); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java index 32732e4cad80..2c02d13dd2cc 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule; import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2; import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2; @@ -46,7 +47,7 @@ import org.testng.annotations.Test; -public class CLPForwardIndexCreatorV2Test { +public class CLPForwardIndexCreatorV2Test implements PinotBuffersAfterClassCheckRule { private static final String COLUMN_NAME = "column1"; private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), CLPForwardIndexCreatorV2Test.class.getSimpleName()); @@ -76,6 +77,7 @@ public void setUp() @AfterClass public void tearDown() throws IOException { + _memoryManager.close(); TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); } @@ -83,49 +85,51 @@ public void tearDown() public void testCLPWriter() throws IOException { // Create and ingest into a clp mutable forward indexes - CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new CLPMutableForwardIndexV2(COLUMN_NAME, _memoryManager); - int rawSizeBytes = 0; - int maxLength = 0; - for (int i = 0; i < _logMessages.size(); i++) { - String logMessage = _logMessages.get(i); - clpMutableForwardIndexV2.setString(i, logMessage); - rawSizeBytes += logMessage.length(); - maxLength = Math.max(maxLength, logMessage.length()); - } + try ( + CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new CLPMutableForwardIndexV2(COLUMN_NAME, _memoryManager)) { + int rawSizeBytes = 0; + int maxLength = 0; + for (int i = 0; i < _logMessages.size(); i++) { + String logMessage = _logMessages.get(i); + clpMutableForwardIndexV2.setString(i, logMessage); + rawSizeBytes += logMessage.length(); + maxLength = Math.max(maxLength, logMessage.length()); + } - // LZ4 compression type - long rawStringFwdIndexSizeLZ4 = createStringRawForwardIndex(ChunkCompressionType.LZ4, maxLength); - long clpFwdIndexSizeLZ4 = - createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, ChunkCompressionType.LZ4); - // For LZ4 compression: - // 1. CLP raw forward index should achieve at least 40x compression - // 2. at least 25% smaller file size compared to standard raw forward index with LZ4 compression - Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeLZ4 >= 40); - Assert.assertTrue((float) rawStringFwdIndexSizeLZ4 / clpFwdIndexSizeLZ4 >= 0.25); - - // ZSTD compression type - long rawStringFwdIndexSizeZSTD = createStringRawForwardIndex(ChunkCompressionType.ZSTANDARD, maxLength); - long clpFwdIndexSizeZSTD = - createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, ChunkCompressionType.ZSTANDARD); - // For ZSTD compression - // 1. CLP raw forward index should achieve at least 66x compression - // 2. at least 19% smaller file size compared to standard raw forward index with ZSTD compression - Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeZSTD >= 66); - Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD >= 0.19); + // LZ4 compression type + long rawStringFwdIndexSizeLZ4 = createStringRawForwardIndex(ChunkCompressionType.LZ4, maxLength); + long clpFwdIndexSizeLZ4 = + createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, ChunkCompressionType.LZ4); + // For LZ4 compression: + // 1. CLP raw forward index should achieve at least 40x compression + // 2. at least 25% smaller file size compared to standard raw forward index with LZ4 compression + Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeLZ4 >= 40); + Assert.assertTrue((float) rawStringFwdIndexSizeLZ4 / clpFwdIndexSizeLZ4 >= 0.25); + + // ZSTD compression type + long rawStringFwdIndexSizeZSTD = createStringRawForwardIndex(ChunkCompressionType.ZSTANDARD, maxLength); + long clpFwdIndexSizeZSTD = + createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, ChunkCompressionType.ZSTANDARD); + // For ZSTD compression + // 1. CLP raw forward index should achieve at least 66x compression + // 2. at least 19% smaller file size compared to standard raw forward index with ZSTD compression + Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeZSTD >= 66); + Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD >= 0.19); + } } private long createStringRawForwardIndex(ChunkCompressionType compressionType, int maxLength) throws IOException { // Create a raw string immutable forward index TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); - SingleValueVarByteRawIndexCreator index = + try (SingleValueVarByteRawIndexCreator index = new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, COLUMN_NAME, _logMessages.size(), - FieldSpec.DataType.STRING, maxLength); - for (String logMessage : _logMessages) { - index.putString(logMessage); + FieldSpec.DataType.STRING, maxLength)) { + for (String logMessage : _logMessages) { + index.putString(logMessage); + } + index.seal(); } - index.seal(); - index.close(); File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION); return indexFile.length(); @@ -138,11 +142,16 @@ private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 // Read from immutable forward index and validate the content File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION); - PinotDataBuffer pinotDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile); - CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new CLPForwardIndexReaderV2(pinotDataBuffer, _logMessages.size()); - CLPForwardIndexReaderV2.CLPReaderContext clpForwardIndexReaderV2Context = clpForwardIndexReaderV2.createContext(); - for (int i = 0; i < _logMessages.size(); i++) { - Assert.assertEquals(clpForwardIndexReaderV2.getString(i, clpForwardIndexReaderV2Context), _logMessages.get(i)); + try (PinotDataBuffer pinotDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile)) { + CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = + new CLPForwardIndexReaderV2(pinotDataBuffer, _logMessages.size()); + try (CLPForwardIndexReaderV2.CLPReaderContext clpForwardIndexReaderV2Context = + clpForwardIndexReaderV2.createContext()) { + for (int i = 0; i < _logMessages.size(); i++) { + Assert.assertEquals(clpForwardIndexReaderV2.getString(i, clpForwardIndexReaderV2Context), + _logMessages.get(i)); + } + } } return indexSize; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index 45a1afbf7480..012c2d920d20 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -35,6 +35,7 @@ import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule; import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter; @@ -80,7 +81,7 @@ import static org.testng.Assert.*; -public class SegmentPreProcessorTest { +public class SegmentPreProcessorTest implements PinotBuffersAfterClassCheckRule { private static final String RAW_TABLE_NAME = "testTable"; private static final String SEGMENT_NAME = "testSegment"; private static final File TEMP_DIR =