From 9a8fa791999cb792943fdaab19b498daeb9fb0dc Mon Sep 17 00:00:00 2001 From: swaminathanmanish <126024920+swaminathanmanish@users.noreply.github.com> Date: Fri, 1 Mar 2024 16:56:59 -0800 Subject: [PATCH] Allow passing custom record reader to be inited/closed in SegmentProcessorFramework (#12529) --- .../SegmentProcessorFrameworkTest.java | 17 ++++++++----- .../data/readers/RecordReaderFileConfig.java | 25 +++++++++++-------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java index 0ec9261d92ab..c2c4c51789ee 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java @@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; @@ -194,9 +195,11 @@ public void testRecordReaderFileConfigInit() throws Exception { FileUtils.forceMkdir(workingDir); ClassLoader classLoader = getClass().getClassLoader(); URL resource = classLoader.getResource("data/dimBaseballTeams.csv"); - RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV, - new File(resource.toURI()), + RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.CSV, new File(resource.toURI()), null, null); + RecordReaderFileConfig recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, + new File(resource.toURI()), 
+ null, null, recordReader); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable"). setTimeColumnName("time").build(); @@ -208,13 +211,15 @@ public void testRecordReaderFileConfigInit() throws Exception { SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build(); - SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader), - Collections.emptyList(), null); + SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, + ImmutableList.of(recordReaderFileConfig), Collections.emptyList(), null); List<File> outputSegments = framework.process(); assertEquals(outputSegments.size(), 1); ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap); SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); assertEquals(segmentMetadata.getTotalDocs(), 52); + // Verify reader is closed + assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(), true); } @Test @@ -686,7 +691,7 @@ public void testConfigurableMapperOutputSize() ClassLoader classLoader = getClass().getClassLoader(); URL resource = classLoader.getResource("data/dimBaseballTeams.csv"); RecordReaderFileConfig recordReaderFileConfig = - new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null); + new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build(); Schema schema = @@ -738,7 +743,7 @@ public void testConfigurableMapperOutputSize() // output size threshold configured). 
expectedTotalDocsCount = 52; - recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null); + recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null, null); segmentConfig = new SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix") .setSegmentNamePostfix("testPostfix").build(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java index 51e4ed0cfb17..e7566cb0ff5b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java @@ -24,10 +24,8 @@ /** - * Wraps RecordReader info to instantiate a reader. Users can either pass in the - * RecordReader instance directly or the info required to initialize the RecordReader, so that the - * RecordReader can be initialized just when its about to be used, which avoids early/eager - * initialization/memory allocation. + * Placeholder for all RecordReader configs. Manages the lifecycle of a RecordReader by initing/closing within the + * Segment creation framework. */ public class RecordReaderFileConfig { public final FileFormat _fileFormat; @@ -44,20 +42,22 @@ public class RecordReaderFileConfig { // Pass in the info needed to initialize the reader public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, Set<String> fieldsToRead, - @Nullable RecordReaderConfig recordReaderConfig) { + @Nullable RecordReaderConfig recordReaderConfig, @Nullable RecordReader recordReader) { _fileFormat = fileFormat; _dataFile = dataFile; _fieldsToRead = fieldsToRead; _recordReaderConfig = recordReaderConfig; - _recordReader = null; - // This is not a delegate RecordReader i.e. 
RecordReaderFileConfig owns the RecordReader, so it should be closed - // by RecordReaderFileConfig as well. + // Users can pass in custom readers + _recordReader = recordReader; + // RecordReaderFileConfig owns the lifecycle of RecordReader, to be inited and closed. _isDelegateReader = false; _isRecordReaderInitialized = false; _isRecordReaderClosed = false; } - // Pass in the reader instance directly + // Keeping this for backwards compatibility. We want the lifecycle of the reader to be managed internally + // (inited/closed) by SegmentProcessorFramework. + @Deprecated public RecordReaderFileConfig(RecordReader recordReader) { _recordReader = recordReader; _fileFormat = null; @@ -76,7 +76,12 @@ public RecordReaderFileConfig(RecordReader recordReader) { public RecordReader getRecordReader() throws Exception { if (!_isRecordReaderInitialized) { - _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, _dataFile, _fieldsToRead, _recordReaderConfig); + if (_recordReader == null) { + // Record reader instance to be created and inited + _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, _dataFile, _fieldsToRead, _recordReaderConfig); + } else { + _recordReader.init(_dataFile, _fieldsToRead, _recordReaderConfig); + } _isRecordReaderInitialized = true; } return _recordReader;