From 10de3d0fc2f1bf1f6de496915add0718a9fa3c0a Mon Sep 17 00:00:00 2001 From: Rajeshbabu Chintaguntla Date: Sat, 11 Jan 2025 11:35:33 +0530 Subject: [PATCH] Make use of move APIs instead of copy segments from staging directory to output directory in the SparkSegmentGenerationJobRunner #14746 (#14755) --- .../common/SegmentGenerationJobUtils.java | 32 +++++++++++++++++ .../HadoopSegmentGenerationJobRunner.java | 35 ++----------------- .../SparkSegmentGenerationJobRunner.java | 6 ++-- .../SparkSegmentGenerationJobRunner.java | 4 +-- 4 files changed, 39 insertions(+), 38 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java index 29c68ec3ecd9..816bef6232e7 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java @@ -19,8 +19,10 @@ package org.apache.pinot.plugin.ingestion.batch.common; import java.io.File; +import java.io.IOException; import java.io.Serializable; import java.net.URI; +import java.net.URISyntaxException; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -28,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.spi.filesystem.PinotFS; @@ -92,4 +95,33 @@ public static void moveLocalTarFileToRemote(File 
localMetadataTarFile, URI outpu } FileUtils.deleteQuietly(localMetadataTarFile); } + + /** + * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir. + * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise + * log a warning and continue. We assume that source and destination directories are on the same filesystem, + * so that move() can be used. + * + * @param fs + * @param sourceDir + * @param destDir + * @param overwrite + * @throws IOException + * @throws URISyntaxException + */ + public static void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) + throws IOException, URISyntaxException { + for (String sourcePath : fs.listFiles(sourceDir, true)) { + URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir); + String sourceFilename = SegmentGenerationUtils.getFileName(sourceFileUri); + URI destFileUri = + SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename); + + if (!overwrite && fs.exists(destFileUri)) { + LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri); + } else { + fs.move(sourceFileUri, destFileUri, true); + } + } + } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index 188757bb94a8..835f518d0957 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -22,10 +22,8 @@ import 
java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; import java.io.Serializable; import java.net.URI; -import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; @@ -280,8 +278,8 @@ public void run() LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI, outputDirURI); - moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, - _spec.isOverwriteOutput()); + SegmentGenerationJobUtils.moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), + outputDirURI, _spec.isOverwriteOutput()); } finally { LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI); outputDirFS.delete(stagingDirURI, true); @@ -300,35 +298,6 @@ private void createInputFileUriAndSeqIdFile(URI inputFileURI, PinotFS outputDirF } } - /** - * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir. - * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise - * log a warning and continue. We assume that source and destination directories are on the same filesystem, - * so that move() can be used. 
- * - * @param fs - * @param sourceDir - * @param destDir - * @param overwrite - * @throws IOException - * @throws URISyntaxException - */ - private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) - throws IOException, URISyntaxException { - for (String sourcePath : fs.listFiles(sourceDir, true)) { - URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir); - String sourceFilename = SegmentGenerationUtils.getFileName(sourceFileUri); - URI destFileUri = - SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename); - - if (!overwrite && fs.exists(destFileUri)) { - LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri); - } else { - fs.move(sourceFileUri, destFileUri, true); - } - } - } - /** * Can be overridden to plug in custom mapper. */ diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index dcaf01379a18..edcd13e3a6ac 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -318,9 +318,9 @@ public void call(String pathAndIdx) } }); if (stagingDirURI != null) { - LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI, - outputDirURI); - outputDirFS.copyDir(stagingDirURI, outputDirURI); + LOGGER.info("Trying to move segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI, + outputDirURI); + 
SegmentGenerationJobUtils.moveFiles(outputDirFS, stagingDirURI, outputDirURI, true); } } finally { if (stagingDirURI != null) { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java index 4d6b9eb699cb..c3ecdb332641 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java @@ -326,9 +326,9 @@ public void call(String pathAndIdx) } }); if (stagingDirURI != null) { - LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI, + LOGGER.info("Trying to move segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI, outputDirURI); - outputDirFS.copyDir(stagingDirURI, outputDirURI); + SegmentGenerationJobUtils.moveFiles(outputDirFS, stagingDirURI, outputDirURI, true); } } finally { if (stagingDirURI != null) {