Skip to content

Commit

Permalink
Fix memory leak in tests.
Browse files (browse the repository at this point in the history)
  • Loading branch information
bziobrowski committed Jan 8, 2025
1 parent 81e922a commit 1a6ffc1
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public FixedByteSingleValueMultiColWriter(PinotDataBuffer dataBuffer, int cols,
}
_rowSizeInBytes = rowSizeInBytes;
_dataBuffer = dataBuffer;
// For passed in PinotDataBuffer, the caller is responsible of closing the PinotDataBuffer.
// For passed in PinotDataBuffer, the caller is responsible for closing the PinotDataBuffer.
_shouldCloseDataBuffer = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.PinotBuffersAfterClassCheckRule;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
Expand All @@ -46,7 +47,7 @@
import org.testng.annotations.Test;


public class MutableSegmentEntriesAboveThresholdTest {
public class MutableSegmentEntriesAboveThresholdTest implements PinotBuffersAfterClassCheckRule {
private static final File TEMP_DIR =
new File(FileUtils.getTempDirectory(), MutableSegmentEntriesAboveThresholdTest.class.getSimpleName());
private static final String AVRO_FILE = "data/test_data-mv.avro";
Expand Down Expand Up @@ -140,55 +141,63 @@ public void testNoLimitBreached()
throws Exception {
File avroFile = getAvroFile();
MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
try (RecordReader recordReader = RecordReaderFactory
.getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
mutableSegment.index(recordReader.next(reuse), defaultMetadata);
try {
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
try (RecordReader recordReader = RecordReaderFactory
.getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
mutableSegment.index(recordReader.next(reuse), defaultMetadata);
}
}
assert mutableSegment.canAddMore();
} finally {
mutableSegment.destroy();
}
assert mutableSegment.canAddMore();
}

@Test
public void testLimitBreached()
throws Exception {
File avroFile = getAvroFile();
MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);

Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap");
indexContainerMapField.setAccessible(true);
Map<String, Object> colVsIndexContainer = (Map<String, Object>) indexContainerMapField.get(mutableSegment);

for (Map.Entry<String, Object> entry : colVsIndexContainer.entrySet()) {
Object indexContainer = entry.getValue();
Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes");
mutableIndexesField.setAccessible(true);
Map<IndexType, MutableIndex> indexTypeVsMutableIndex =
(Map<IndexType, MutableIndex>) mutableIndexesField.get(indexContainer);

MutableForwardIndex mutableForwardIndex = null;
for (IndexType indexType : indexTypeVsMutableIndex.keySet()) {
if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) {
mutableForwardIndex = (MutableForwardIndex) indexTypeVsMutableIndex.get(indexType);
try {

Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap");
indexContainerMapField.setAccessible(true);
Map<String, Object> colVsIndexContainer = (Map<String, Object>) indexContainerMapField.get(mutableSegment);

for (Map.Entry<String, Object> entry : colVsIndexContainer.entrySet()) {
Object indexContainer = entry.getValue();
Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes");
mutableIndexesField.setAccessible(true);
Map<IndexType, MutableIndex> indexTypeVsMutableIndex =
(Map<IndexType, MutableIndex>) mutableIndexesField.get(indexContainer);

MutableForwardIndex mutableForwardIndex = null;
for (IndexType indexType : indexTypeVsMutableIndex.keySet()) {
if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) {
mutableForwardIndex = (MutableForwardIndex) indexTypeVsMutableIndex.get(indexType);
}
}
}

assert mutableForwardIndex != null;
assert mutableForwardIndex != null;

indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(),
new FakeMutableForwardIndex(mutableForwardIndex));
}
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
try (RecordReader recordReader = RecordReaderFactory
.getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
mutableSegment.index(recordReader.next(reuse), defaultMetadata);
indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(),
new FakeMutableForwardIndex(mutableForwardIndex));
}
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
try (RecordReader recordReader = RecordReaderFactory
.getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
mutableSegment.index(recordReader.next(reuse), defaultMetadata);
}
}
}

assert !mutableSegment.canAddMore();
assert !mutableSegment.canAddMore();
} finally {
mutableSegment.destroy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,24 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str
String upsertOutOfOrderRecordColumn = upsertConfig == null ? null : upsertConfig.getOutOfOrderRecordColumn();
String dedupTimeColumn = dedupConfig == null ? null : dedupConfig.getDedupTimeColumn();
DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, true);

RealtimeSegmentConfig.Builder segmentConfBuilder = new RealtimeSegmentConfig.Builder()
.setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
.setStreamName(STREAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
.setTableNameWithType(TABLE_NAME_WITH_TYPE)
.setSegmentName(SEGMENT_NAME)
.setStreamName(STREAM_NAME)
.setSchema(schema)
.setTimeColumnName(timeColumnName)
.setCapacity(100000)
.setAvgNumMultiValues(2)
.setIndex(noDictionaryColumns, StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
.setIndex(varLengthDictionaryColumns, StandardIndexes.dictionary(), varLengthDictConf)
.setIndex(invertedIndexColumns, StandardIndexes.inverted(), IndexConfig.ENABLED)
.setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
.setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
.setMemoryManager(new DirectMemoryManager(SEGMENT_NAME))
.setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics)
.setNullHandlingEnabled(nullHandlingEnabled)
.setUpsertMode(upsertMode)
.setUpsertComparisonColumns(comparisonColumns)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setIngestionAggregationConfigs(aggregationConfigs)
Expand All @@ -127,9 +135,11 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setDedupTimeColumn(dedupTimeColumn)
.setConsumerDir(TEMP_DIR.getAbsolutePath() + "/" + UUID.randomUUID() + "/consumerDir");

for (Map.Entry<String, JsonIndexConfig> entry : jsonIndexConfigs.entrySet()) {
segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), entry.getValue());
}

RealtimeSegmentConfig realtimeSegmentConfig = segmentConfBuilder.build();
return new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.avro.util.Utf8;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
Expand Down Expand Up @@ -73,7 +74,7 @@
import org.testng.annotations.Test;


public class DictionariesTest {
public class DictionariesTest implements PinotBuffersAfterMethodCheckRule {
private static final String AVRO_DATA = "data/test_sample_data.avro";
private static final File INDEX_DIR = new File(DictionariesTest.class.toString());
private static final Map<String, Set<Object>> UNIQUE_ENTRIES = new HashMap<>();
Expand Down Expand Up @@ -136,64 +137,71 @@ public void test1()
throws Exception {
ImmutableSegment heapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.heap);
ImmutableSegment mmapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.mmap);
try {

Schema schema = heapSegment.getSegmentMetadata().getSchema();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
// Skip virtual columns
if (fieldSpec.isVirtualColumn()) {
continue;
}
Schema schema = heapSegment.getSegmentMetadata().getSchema();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
// Skip virtual columns
if (fieldSpec.isVirtualColumn()) {
continue;
}

String columnName = fieldSpec.getName();
Dictionary heapDictionary = heapSegment.getDictionary(columnName);
Dictionary mmapDictionary = mmapSegment.getDictionary(columnName);

switch (fieldSpec.getDataType()) {
case INT:
Assert.assertTrue(heapDictionary instanceof IntDictionary);
Assert.assertTrue(mmapDictionary instanceof IntDictionary);
int firstInt = heapDictionary.getIntValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstInt), heapDictionary.indexOf(String.valueOf(firstInt)));
Assert.assertEquals(mmapDictionary.indexOf(firstInt), mmapDictionary.indexOf(String.valueOf(firstInt)));
break;
case LONG:
Assert.assertTrue(heapDictionary instanceof LongDictionary);
Assert.assertTrue(mmapDictionary instanceof LongDictionary);
long firstLong = heapDictionary.getLongValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstLong), heapDictionary.indexOf(String.valueOf(firstLong)));
Assert.assertEquals(mmapDictionary.indexOf(firstLong), mmapDictionary.indexOf(String.valueOf(firstLong)));
break;
case FLOAT:
Assert.assertTrue(heapDictionary instanceof FloatDictionary);
Assert.assertTrue(mmapDictionary instanceof FloatDictionary);
float firstFloat = heapDictionary.getFloatValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstFloat), heapDictionary.indexOf(String.valueOf(firstFloat)));
Assert.assertEquals(mmapDictionary.indexOf(firstFloat), mmapDictionary.indexOf(String.valueOf(firstFloat)));
break;
case DOUBLE:
Assert.assertTrue(heapDictionary instanceof DoubleDictionary);
Assert.assertTrue(mmapDictionary instanceof DoubleDictionary);
double firstDouble = heapDictionary.getDoubleValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstDouble), heapDictionary.indexOf(String.valueOf(firstDouble)));
Assert.assertEquals(mmapDictionary.indexOf(firstDouble), mmapDictionary.indexOf(String.valueOf(firstDouble)));
break;
case BIG_DECIMAL:
Assert.assertTrue(heapDictionary instanceof BigDecimalDictionary);
Assert.assertTrue(mmapDictionary instanceof BigDecimalDictionary);
break;
case STRING:
Assert.assertTrue(heapDictionary instanceof StringDictionary);
Assert.assertTrue(mmapDictionary instanceof StringDictionary);
break;
default:
Assert.fail();
break;
}
String columnName = fieldSpec.getName();
Dictionary heapDictionary = heapSegment.getDictionary(columnName);
Dictionary mmapDictionary = mmapSegment.getDictionary(columnName);

switch (fieldSpec.getDataType()) {
case INT:
Assert.assertTrue(heapDictionary instanceof IntDictionary);
Assert.assertTrue(mmapDictionary instanceof IntDictionary);
int firstInt = heapDictionary.getIntValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstInt), heapDictionary.indexOf(String.valueOf(firstInt)));
Assert.assertEquals(mmapDictionary.indexOf(firstInt), mmapDictionary.indexOf(String.valueOf(firstInt)));
break;
case LONG:
Assert.assertTrue(heapDictionary instanceof LongDictionary);
Assert.assertTrue(mmapDictionary instanceof LongDictionary);
long firstLong = heapDictionary.getLongValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstLong), heapDictionary.indexOf(String.valueOf(firstLong)));
Assert.assertEquals(mmapDictionary.indexOf(firstLong), mmapDictionary.indexOf(String.valueOf(firstLong)));
break;
case FLOAT:
Assert.assertTrue(heapDictionary instanceof FloatDictionary);
Assert.assertTrue(mmapDictionary instanceof FloatDictionary);
float firstFloat = heapDictionary.getFloatValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstFloat), heapDictionary.indexOf(String.valueOf(firstFloat)));
Assert.assertEquals(mmapDictionary.indexOf(firstFloat), mmapDictionary.indexOf(String.valueOf(firstFloat)));
break;
case DOUBLE:
Assert.assertTrue(heapDictionary instanceof DoubleDictionary);
Assert.assertTrue(mmapDictionary instanceof DoubleDictionary);
double firstDouble = heapDictionary.getDoubleValue(0);
Assert.assertEquals(heapDictionary.indexOf(firstDouble),
heapDictionary.indexOf(String.valueOf(firstDouble)));
Assert.assertEquals(mmapDictionary.indexOf(firstDouble),
mmapDictionary.indexOf(String.valueOf(firstDouble)));
break;
case BIG_DECIMAL:
Assert.assertTrue(heapDictionary instanceof BigDecimalDictionary);
Assert.assertTrue(mmapDictionary instanceof BigDecimalDictionary);
break;
case STRING:
Assert.assertTrue(heapDictionary instanceof StringDictionary);
Assert.assertTrue(mmapDictionary instanceof StringDictionary);
break;
default:
Assert.fail();
break;
}

Assert.assertEquals(mmapDictionary.length(), heapDictionary.length());
for (int i = 0; i < heapDictionary.length(); i++) {
Assert.assertEquals(mmapDictionary.get(i), heapDictionary.get(i));
Assert.assertEquals(mmapDictionary.length(), heapDictionary.length());
for (int i = 0; i < heapDictionary.length(); i++) {
Assert.assertEquals(mmapDictionary.get(i), heapDictionary.get(i));
}
}
} finally {
heapSegment.destroy();
mmapSegment.destroy();
}
}

Expand All @@ -202,27 +210,32 @@ public void test2()
throws Exception {
ImmutableSegment heapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.heap);
ImmutableSegment mmapSegment = ImmutableSegmentLoader.load(_segmentDirectory, ReadMode.mmap);

Schema schema = heapSegment.getSegmentMetadata().getSchema();
for (String columnName : schema.getPhysicalColumnNames()) {
Dictionary heapDictionary = heapSegment.getDictionary(columnName);
Dictionary mmapDictionary = mmapSegment.getDictionary(columnName);

for (Object entry : UNIQUE_ENTRIES.get(columnName)) {
String stringValue = entry.toString();
Assert.assertEquals(mmapDictionary.indexOf(stringValue), heapDictionary.indexOf(stringValue));
if (!columnName.equals("pageKey")) {
Assert.assertFalse(heapDictionary.indexOf(stringValue) < 0);
Assert.assertFalse(mmapDictionary.indexOf(stringValue) < 0);
}
if (entry instanceof Integer) {
Assert.assertEquals(mmapDictionary.indexOf((int) entry), mmapDictionary.indexOf(stringValue));
Assert.assertEquals(heapDictionary.indexOf((int) entry), heapDictionary.indexOf(stringValue));
} else if (entry instanceof Long) {
Assert.assertEquals(mmapDictionary.indexOf((long) entry), mmapDictionary.indexOf(stringValue));
Assert.assertEquals(heapDictionary.indexOf((long) entry), heapDictionary.indexOf(stringValue));
try {

Schema schema = heapSegment.getSegmentMetadata().getSchema();
for (String columnName : schema.getPhysicalColumnNames()) {
Dictionary heapDictionary = heapSegment.getDictionary(columnName);
Dictionary mmapDictionary = mmapSegment.getDictionary(columnName);

for (Object entry : UNIQUE_ENTRIES.get(columnName)) {
String stringValue = entry.toString();
Assert.assertEquals(mmapDictionary.indexOf(stringValue), heapDictionary.indexOf(stringValue));
if (!columnName.equals("pageKey")) {
Assert.assertFalse(heapDictionary.indexOf(stringValue) < 0);
Assert.assertFalse(mmapDictionary.indexOf(stringValue) < 0);
}
if (entry instanceof Integer) {
Assert.assertEquals(mmapDictionary.indexOf((int) entry), mmapDictionary.indexOf(stringValue));
Assert.assertEquals(heapDictionary.indexOf((int) entry), heapDictionary.indexOf(stringValue));
} else if (entry instanceof Long) {
Assert.assertEquals(mmapDictionary.indexOf((long) entry), mmapDictionary.indexOf(stringValue));
Assert.assertEquals(heapDictionary.indexOf((long) entry), heapDictionary.indexOf(stringValue));
}
}
}
} finally {
heapSegment.destroy();
mmapSegment.destroy();
}
}

Expand Down
Loading

0 comments on commit 1a6ffc1

Please sign in to comment.