[Feature] Enable the capability to specify zstd and lz4 segment compression via config #14008

Merged
merged 4 commits into from
Oct 21, 2024
@@ -69,6 +69,12 @@ public class TarCompressionUtils {
private TarCompressionUtils() {
}

/**
* This generic compressed tar file extension does not bind to a particular compressor. Decompression determines the
* appropriate compressor at run-time based on the file's magic number irrespective of the file extension.
* Compression uses the default compressor automatically if this generic extension is used.
*/
public static final String TAR_COMPRESSED_FILE_EXTENSION = ".tar.compressed";
Contributor

@deemoliu deemoliu Oct 16, 2024

Mostly good. We can compress and decompress different segment formats (tar.gz, tar.zst, tar.compressed) even when they appear in the same table.

Curious: if we expose and update the value of _defaultCompressorName (line 86), how can we make sure that existing .tar.compressed files can still be decompressed after the default compressor changes?

In other words, do we have a test covering the scenario where the default compressor is changed, to make sure existing .tar.compressed segments can still be decompressed?

Contributor Author

@jackluo923 jackluo923 Oct 16, 2024

The unit test for this lives in the Apache Commons Compress library. You can give the file any extension you want, such as .tar.deemoliu, and you'd still be able to decompress the segment. Decompression does not rely on the file extension to choose the compressor.

Contributor

Sounds good. So we use the first bytes of the file to identify which decompressor to use, and the Apache library initializes the correct one for us.
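
To illustrate the point in this thread, here is a minimal sketch of content-based detection using only the JDK. It is not the Commons Compress implementation: the class and method names (`MagicNumberSniffer`, `detect`) are hypothetical, and the returned strings are assumed to mirror the library's `gz`, `lz4-framed`, and `zstd` identifiers. It compares the leading bytes against the gzip, LZ4 frame, and Zstandard frame magic numbers, so the file extension never enters the decision. Legacy LZ4 frames and Zstandard skippable frames are deliberately not handled.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, for illustration only.
class MagicNumberSniffer {
  // Magic numbers as they appear at the start of the file.
  private static final byte[] GZIP_MAGIC = {(byte) 0x1F, (byte) 0x8B};
  private static final byte[] LZ4_FRAME_MAGIC = {0x04, 0x22, 0x4D, 0x18};
  private static final byte[] ZSTD_MAGIC = {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD};

  /** Returns an assumed compressor name for the given leading bytes, or null if unrecognized. */
  static String detect(byte[] header) {
    if (startsWith(header, GZIP_MAGIC)) {
      return "gz";
    }
    if (startsWith(header, LZ4_FRAME_MAGIC)) {
      return "lz4-framed";
    }
    if (startsWith(header, ZSTD_MAGIC)) {
      return "zstd";
    }
    return null;
  }

  /** Peeks at up to four bytes of a mark-supporting stream and rewinds it afterwards. */
  static String detect(InputStream in)
      throws IOException {
    if (!in.markSupported()) {
      throw new IllegalArgumentException("Stream must support mark/reset");
    }
    byte[] header = new byte[4];
    in.mark(header.length);
    int total = 0;
    while (total < header.length) {
      int n = in.read(header, total, header.length - total);
      if (n < 0) {
        break;
      }
      total += n;
    }
    in.reset();
    return detect(Arrays.copyOf(header, total));
  }

  private static boolean startsWith(byte[] data, byte[] prefix) {
    if (data.length < prefix.length) {
      return false;
    }
    for (int i = 0; i < prefix.length; i++) {
      if (data[i] != prefix[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
      throws IOException {
    // Gzip a few bytes in memory, then detect the format from content alone.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOut = new GZIPOutputStream(buffer)) {
      gzipOut.write("segment bytes".getBytes(StandardCharsets.UTF_8));
    }
    InputStream in = new BufferedInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    System.out.println(detect(in));
  }
}
```

Because the stream is rewound after the peek, the same stream can then be handed to whichever decompressor was selected.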

public static final String TAR_GZ_FILE_EXTENSION = ".tar.gz";
public static final String TAR_LZ4_FILE_EXTENSION = ".tar.lz4";
public static final String TAR_ZST_FILE_EXTENSION = ".tar.zst";
@@ -77,6 +83,13 @@ private TarCompressionUtils() {
CompressorStreamFactory.LZ4_FRAMED, TAR_ZST_FILE_EXTENSION, CompressorStreamFactory.ZSTANDARD);
private static final CompressorStreamFactory COMPRESSOR_STREAM_FACTORY = CompressorStreamFactory.getSingleton();
private static final char ENTRY_NAME_SEPARATOR = '/';
private static String _defaultCompressorName = CompressorStreamFactory.GZIP;

public static void setDefaultCompressor(String compressorName) {
if (COMPRESSOR_NAME_BY_FILE_EXTENSIONS.containsKey(compressorName)) {
_defaultCompressorName = compressorName;
}
}
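
One thing worth checking in the hunk above: `COMPRESSOR_NAME_BY_FILE_EXTENSIONS` is keyed by file extension (the loop further down iterates `keySet()` as extensions), while `setDefaultCompressor` calls `containsKey` with a compressor name. Reading the code as shown, a name such as `zstd` would not match any key, so the default would silently stay gzip. The sketch below is a hypothetical variant, not the merged code: it validates against the map's values and reports whether the update was applied. The string constants stand in for `CompressorStreamFactory.GZIP`, `LZ4_FRAMED`, and `ZSTANDARD`, and their literal values are assumptions.

```java
import java.util.Map;

// Hypothetical sketch, not the merged TarCompressionUtils.
class DefaultCompressorSketch {
  // Assumed stand-ins for the CompressorStreamFactory name constants.
  static final String GZIP = "gz";
  static final String LZ4_FRAMED = "lz4-framed";
  static final String ZSTANDARD = "zstd";

  // Same shape as the map in the PR: keys are file extensions, values are compressor names.
  static final Map<String, String> COMPRESSOR_NAME_BY_FILE_EXTENSIONS =
      Map.of(".tar.gz", GZIP, ".tar.lz4", LZ4_FRAMED, ".tar.zst", ZSTANDARD);

  private static String _defaultCompressorName = GZIP;

  /** Accepts the name only if some supported extension maps to it; returns true if applied. */
  static boolean setDefaultCompressor(String compressorName) {
    if (COMPRESSOR_NAME_BY_FILE_EXTENSIONS.containsValue(compressorName)) {
      _defaultCompressorName = compressorName;
      return true;
    }
    return false;
  }

  static String getDefaultCompressor() {
    return _defaultCompressorName;
  }

  public static void main(String[] args) {
    // A compressor name is not a key of the extension-keyed map.
    System.out.println(COMPRESSOR_NAME_BY_FILE_EXTENSIONS.containsKey(ZSTANDARD));
    System.out.println(setDefaultCompressor(ZSTANDARD));
    System.out.println(getDefaultCompressor());
  }
}
```

Returning a boolean (or logging a warning) makes a rejected configuration value visible instead of silently keeping the old default.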

/**
* Creates a compressed tar file from the input file/directory to the output file. The output file must have
@@ -93,15 +106,29 @@ public static void createCompressedTarFile(File inputFile, File outputFile)
*/
public static void createCompressedTarFile(File[] inputFiles, File outputFile)
throws IOException {
String compressorName = null;
for (String supportedCompressorExtension : COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
if (outputFile.getName().endsWith(supportedCompressorExtension)) {
compressorName = COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
break;
if (outputFile.getName().endsWith(TAR_COMPRESSED_FILE_EXTENSION)) {
createCompressedTarFile(inputFiles, outputFile, _defaultCompressorName);
} else {
String compressorName = null;
for (String supportedCompressorExtension : COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
if (outputFile.getName().endsWith(supportedCompressorExtension)) {
Contributor

Can outputFile end with ".tar.compressed", which is not a supported compressor file extension?

compressorName = COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
createCompressedTarFile(inputFiles, outputFile, compressorName);
Contributor

We can move the common call createCompressedTarFile(inputFiles, outputFile, compressorName) to after the precondition check.

return;
}
Contributor

@deemoliu deemoliu Sep 17, 2024

Break inside the if block?

}
Preconditions.checkState(null != compressorName,
"Output file: %s does not have a supported compressed tar file extension", outputFile);
}
Preconditions.checkState(null != compressorName,
"Output file: %s does not have a supported compressed tar file extension", outputFile);
}
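
The dispatch logic in this hunk, together with the review suggestion to keep a single call site after the precondition check, can be sketched as a pure name-resolution step. This is an illustration rather than the merged implementation: `CompressorResolver` and `resolve` are hypothetical names, the compressor name strings are assumed stand-ins for the `CompressorStreamFactory` constants, and an `IllegalStateException` takes the place of `Preconditions.checkState`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the extension-to-compressor dispatch.
class CompressorResolver {
  static final String TAR_COMPRESSED_FILE_EXTENSION = ".tar.compressed";

  // Keys are file extensions, values are compressor names (values assumed).
  private static final Map<String, String> COMPRESSOR_NAME_BY_FILE_EXTENSIONS = new LinkedHashMap<>();

  static {
    COMPRESSOR_NAME_BY_FILE_EXTENSIONS.put(".tar.gz", "gz");
    COMPRESSOR_NAME_BY_FILE_EXTENSIONS.put(".tar.lz4", "lz4-framed");
    COMPRESSOR_NAME_BY_FILE_EXTENSIONS.put(".tar.zst", "zstd");
  }

  /**
   * Resolves the compressor for an output file name. The generic extension maps to the supplied
   * default; a specific extension maps to its own compressor; anything else is rejected.
   */
  static String resolve(String fileName, String defaultCompressorName) {
    if (fileName.endsWith(TAR_COMPRESSED_FILE_EXTENSION)) {
      return defaultCompressorName;
    }
    for (Map.Entry<String, String> entry : COMPRESSOR_NAME_BY_FILE_EXTENSIONS.entrySet()) {
      if (fileName.endsWith(entry.getKey())) {
        return entry.getValue();
      }
    }
    throw new IllegalStateException(
        "Output file: " + fileName + " does not have a supported compressed tar file extension");
  }

  public static void main(String[] args) {
    // The caller would resolve once and then make a single createCompressedTarFile(...) call.
    System.out.println(resolve("segment_0.tar.compressed", "zstd"));
    System.out.println(resolve("segment_0.tar.lz4", "zstd"));
  }
}
```

With resolution separated out, the three-argument `createCompressedTarFile` overload only needs to be called in one place, and the precondition failure path is the only way to leave without a compressor name.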

public static void createCompressedTarFile(File inputFile, File outputFile, String compressorName)
throws IOException {
createCompressedTarFile(new File[]{inputFile}, outputFile, compressorName);
}

public static void createCompressedTarFile(File[] inputFiles, File outputFile, String compressorName)
throws IOException {
try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath());
BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut);
OutputStream compressorOut = COMPRESSOR_STREAM_FACTORY.createCompressorOutputStream(compressorName,
@@ -793,7 +793,7 @@ private File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
failedAttempts.get());
}
} else {
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, segmentTarFile, zkMetadata.getCrypterName());
_logger.info("Downloaded tarred segment: {} from: {} to: {}, file length: {}", segmentName, downloadUrl,
segmentTarFile, segmentTarFile.length());
@@ -820,7 +820,7 @@ private File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata)
_tableNameWithType);
_logger.info("Downloading segment: {} from peers", segmentName);
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
try {
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, _peerDownloadScheme, () -> {
List<URI> peerServerURIs =
@@ -1016,7 +1016,8 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
_serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, 1L);

final long lockAcquireTimeMillis = now();
// Build a segment from in-memory rows.If buildTgz is true, then build the tar.gz file as well
// Build a segment from in-memory rows.
// If build compressed archive is true, then build the tar.compressed file as well
// TODO Use an auto-closeable object to delete temp resources.
File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + _segmentNameStr + "-" + now());

@@ -1069,7 +1070,7 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));

if (forCommit) {
File segmentTarFile = new File(dataDir, _segmentNameStr + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentTarFile = new File(dataDir, _segmentNameStr + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
try {
TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
} catch (IOException e) {
@@ -294,7 +294,8 @@ public void testReloadSegmentForceDownload()
throws Exception {
File indexDir = createSegment(SegmentVersion.v3, 5);
SegmentZKMetadata zkMetadata =
makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION), false);
makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION),
false);

// Same CRC but force to download.
BaseTableDataManager tableDataManager = createTableManager();
@@ -567,7 +568,7 @@ public void testDownloadAndDecrypt()
File tempDir = new File(TEMP_DIR, "testDownloadAndDecrypt");
String fileName = "tmp.txt";
FileUtils.write(new File(tempDir, fileName), "this is from somewhere remote");
String tarFileName = SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION;
String tarFileName = SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION;
File tempTarFile = new File(TEMP_DIR, tarFileName);
TarCompressionUtils.createCompressedTarFile(tempDir, tempTarFile);

@@ -607,7 +608,7 @@ public void testUntarAndMoveSegment()
File tempRootDir = tableDataManager.getTmpSegmentDataDir("test-untar-move");

// All input and intermediate files are put in the tempRootDir.
File tempTar = new File(tempRootDir, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File tempTar = new File(tempRootDir, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
File tempInputDir = new File(tempRootDir, "input");
FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment dir");
TarCompressionUtils.createCompressedTarFile(tempInputDir, tempTar);
@@ -687,7 +688,8 @@ private static File createSegment(SegmentVersion segmentVersion, int numRows)
private static SegmentZKMetadata createRawSegment(SegmentVersion segmentVersion, int numRows)
throws Exception {
File indexDir = createSegment(segmentVersion, numRows);
return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
return makeRawSegment(indexDir,
new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true);
}

private static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFile, boolean deleteIndexDir)
@@ -59,6 +59,7 @@
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.ServiceStatus.Status;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -161,6 +162,12 @@ public void init(PinotConfiguration serverConf)
_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));

String tarCompressionCodecName =
_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME);
if (null != tarCompressionCodecName) {
TarCompressionUtils.setDefaultCompressor(tarCompressionCodecName);
}

setupHelixSystemProperties();
_listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf);
_hostname = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
@@ -64,6 +64,8 @@ private CommonConstants() {
public static final String CONFIG_OF_EXECUTORS_FIXED_NUM_THREADS = "pinot.executors.fixed.default.numThreads";
public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1";

public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME = "pinot.tar.compression.codec.name";

/**
* The state of the consumer for a given segment
*/