Skip to content

Commit

Permalink
Make use of move APIs instead of copy segments from staging director…
Browse files Browse the repository at this point in the history
…y to output directory in the SparkSegmentGenerationJobRunner #14746 (#14755)
  • Loading branch information
chrajeshbabu authored Jan 11, 2025
1 parent 5dd6f8a commit 10de3d0
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
package org.apache.pinot.plugin.ingestion.batch.common;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.spi.filesystem.PinotFS;
Expand Down Expand Up @@ -92,4 +95,33 @@ public static void moveLocalTarFileToRemote(File localMetadataTarFile, URI outpu
}
FileUtils.deleteQuietly(localMetadataTarFile);
}

/**
* Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
* If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
* log a warning and continue. We assume that source and destination directories are on the same filesystem,
* so that move() can be used.
*
* @param fs
* @param sourceDir
* @param destDir
* @param overwrite
* @throws IOException
* @throws URISyntaxException
*/
public static void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite)
throws IOException, URISyntaxException {
for (String sourcePath : fs.listFiles(sourceDir, true)) {
URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
String sourceFilename = SegmentGenerationUtils.getFileName(sourceFileUri);
URI destFileUri =
SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);

if (!overwrite && fs.exists(destFileUri)) {
LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);
} else {
fs.move(sourceFileUri, destFileUri, true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -280,8 +278,8 @@ public void run()

LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
outputDirURI);
moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI,
_spec.isOverwriteOutput());
SegmentGenerationJobUtils.moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(),
outputDirURI, _spec.isOverwriteOutput());
} finally {
LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
outputDirFS.delete(stagingDirURI, true);
Expand All @@ -300,35 +298,6 @@ private void createInputFileUriAndSeqIdFile(URI inputFileURI, PinotFS outputDirF
}
}

/**
* Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
* If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
* log a warning and continue. We assume that source and destination directories are on the same filesystem,
* so that move() can be used.
*
* @param fs
* @param sourceDir
* @param destDir
* @param overwrite
* @throws IOException
* @throws URISyntaxException
*/
private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite)
throws IOException, URISyntaxException {
for (String sourcePath : fs.listFiles(sourceDir, true)) {
URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
String sourceFilename = SegmentGenerationUtils.getFileName(sourceFileUri);
URI destFileUri =
SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);

if (!overwrite && fs.exists(destFileUri)) {
LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);
} else {
fs.move(sourceFileUri, destFileUri, true);
}
}
}

/**
* Can be overridden to plug in custom mapper.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ public void call(String pathAndIdx)
}
});
if (stagingDirURI != null) {
LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
outputDirURI);
outputDirFS.copyDir(stagingDirURI, outputDirURI);
LOGGER.info("Trying to move segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
outputDirURI);
SegmentGenerationJobUtils.moveFiles(outputDirFS, stagingDirURI, outputDirURI, true);
}
} finally {
if (stagingDirURI != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@ public void call(String pathAndIdx)
}
});
if (stagingDirURI != null) {
LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
LOGGER.info("Trying to move segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
outputDirURI);
outputDirFS.copyDir(stagingDirURI, outputDirURI);
SegmentGenerationJobUtils.moveFiles(outputDirFS, stagingDirURI, outputDirURI, true);
}
} finally {
if (stagingDirURI != null) {
Expand Down

0 comments on commit 10de3d0

Please sign in to comment.