diff --git a/.gitignore b/.gitignore
index 3aad54886..6a087a97c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@ build/
rpm/
rpmbuild/
*.sh
+io.*/
diff --git a/Makefile b/Makefile
index 3cf971ad6..42dbcf431 100644
--- a/Makefile
+++ b/Makefile
@@ -21,7 +21,7 @@ clean:
./gradlew clean
build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz:
- ./gradlew check releaseTarGz
+ ./gradlew check installDist releaseTarGz
.PHONY: docker_image
docker_image: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
@@ -32,3 +32,12 @@ docker_image: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
.PHONY: docker_push
docker_push:
docker push $(IMAGE_TAG)
+
+bench_prep:
+ sudo sh -c 'echo 1 >/proc/sys/kernel/perf_event_paranoid'
+ sudo sh -c 'echo 0 >/proc/sys/kernel/kptr_restrict'
+
+BENCH=io.aiven.kafka.tieredstorage.benchs.transform.TransformBench
+
+bench_run:
+ java -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -cp "benchmarks/build/install/benchmarks/*" $(BENCH)
\ No newline at end of file
diff --git a/benchmarks/README.md b/benchmarks/README.md
new file mode 100644
index 000000000..0b8424a0a
--- /dev/null
+++ b/benchmarks/README.md
@@ -0,0 +1,28 @@
+# Benchmarks
+
+## How to run
+
+> from https://www.baeldung.com/java-async-profiler
+
+Enable Kernel configs:
+
+```shell
+sudo sh -c 'echo 1 >/proc/sys/kernel/perf_event_paranoid'
+sudo sh -c 'echo 0 >/proc/sys/kernel/kptr_restrict'
+```
+
+set `LD_LIBRARY_PATH` environment variable with async-profile build directory:
+
+```shell
+export LD_LIBRARY_PATH=/opt/async-profiler-2.9-linux-x64/build/
+```
+
+```shell
+./gradlew benchmarks:installDist
+```
+
+Run benchmark:
+
+```shell
+java -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -cp "benchmarks/build/install/benchmarks/*" io.aiven.kafka.tieredstorage.benchs.transform.DetransformBench > results.txt 2>&1
+```
\ No newline at end of file
diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle
new file mode 100644
index 000000000..cb7b7f7c6
--- /dev/null
+++ b/benchmarks/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2021 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ext {
+ jmhVersion = "1.36"
+}
+
+dependencies {
+ implementation project(':core')
+ implementation group: "org.apache.kafka", name: "kafka-storage-api", version: kafkaVersion
+ implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
+
+ implementation "org.openjdk.jmh:jmh-core:$jmhVersion"
+ implementation "org.openjdk.jmh:jmh-core-benchmarks:$jmhVersion"
+ annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmhVersion"
+
+ implementation "org.slf4j:slf4j-log4j12:1.7.36"
+}
diff --git a/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/AesKeyAware.java b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/AesKeyAware.java
new file mode 100644
index 000000000..5375d9eb3
--- /dev/null
+++ b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/AesKeyAware.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.benchs.transform;
+
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.Random;
+
+public class AesKeyAware {
+ protected static int ivSize;
+ protected static SecretKeySpec secretKey;
+ protected static byte[] aad;
+
+ static void initCrypto() {
+ // These are tests, we don't need a secure source of randomness.
+ final Random random = new Random();
+
+ final byte[] dataKey = new byte[32];
+ random.nextBytes(dataKey);
+ secretKey = new SecretKeySpec(dataKey, "AES");
+
+ aad = new byte[32];
+ random.nextBytes(aad);
+
+ ivSize = encryptionCipherSupplier().getIV().length;
+ }
+
+ protected static Cipher encryptionCipherSupplier() {
+ try {
+ final Cipher encryptCipher = getCipher();
+ encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey, SecureRandom.getInstanceStrong());
+ encryptCipher.updateAAD(aad);
+ return encryptCipher;
+ } catch (final NoSuchAlgorithmException | InvalidKeyException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static Cipher decryptionCipherSupplier(final byte[] encryptedChunk) {
+ try {
+ final Cipher encryptCipher = getCipher();
+ encryptCipher.init(Cipher.DECRYPT_MODE, secretKey,
+ new GCMParameterSpec(128, encryptedChunk, 0, ivSize),
+ SecureRandom.getInstanceStrong());
+ encryptCipher.updateAAD(aad);
+ return encryptCipher;
+ } catch (final NoSuchAlgorithmException | InvalidKeyException | InvalidAlgorithmParameterException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static Cipher getCipher() {
+ try {
+ return Cipher.getInstance("AES/GCM/NoPadding");
+ } catch (final NoSuchAlgorithmException | NoSuchPaddingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/DetransformBench.java b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/DetransformBench.java
new file mode 100644
index 000000000..48aa5ef20
--- /dev/null
+++ b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/DetransformBench.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.benchs.transform;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.SequenceInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.SecureRandom;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
+import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.DecompressionChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.DecryptionChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;
+import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.TransformFinisher;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.AsyncProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@BenchmarkMode({Mode.AverageTime, Mode.SampleTime})
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class DetransformBench extends AesKeyAware {
+ static Path segmentPath;
+ @Param({"1073741824"})
+ public int contentLength; // 1GiB
+ @Param({"102400", "1048576", "5242880"})
+ public int chunkSize; // 100KiB, 1MiB, 5MiB
+ @Param({"false", "true"})
+ public boolean compression;
+ @Param({"false", "true"})
+ public boolean encryption;
+
+ byte[] uploadedData;
+ ChunkIndex chunkIndex;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ segmentPath = Files.createTempFile("segment", ".log");
+ // to fill with random bytes.
+ final SecureRandom secureRandom = new SecureRandom();
+ try (final var out = Files.newOutputStream(segmentPath)) {
+ final byte[] bytes = new byte[contentLength];
+ secureRandom.nextBytes(bytes);
+ out.write(bytes);
+ }
+ if (encryption) {
+ initCrypto();
+ }
+
+ // Transform.
+ TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
+ Files.newInputStream(segmentPath), chunkSize);
+ if (compression) {
+ transformEnum = new CompressionChunkEnumeration(transformEnum);
+ }
+ if (encryption) {
+ transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAware::encryptionCipherSupplier);
+ }
+ final var transformFinisher = new TransformFinisher(transformEnum, contentLength);
+ try (final var sis = new SequenceInputStream(transformFinisher)) {
+ uploadedData = sis.readAllBytes();
+ chunkIndex = transformFinisher.chunkIndex();
+ }
+ }
+
+ @TearDown
+ public void teardown() throws IOException {
+ Files.deleteIfExists(segmentPath);
+ }
+
+ @Benchmark
+ public byte[] test() throws IOException {
+ // Detransform.
+ DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(
+ new ByteArrayInputStream(uploadedData), List.of(chunkIndex.chunks().iterator().next()));
+ if (encryption) {
+ detransformEnum = new DecryptionChunkEnumeration(
+ detransformEnum, ivSize, AesKeyAware::decryptionCipherSupplier);
+ }
+ if (compression) {
+ detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
+ }
+ final var detransformFinisher = new DetransformFinisher(detransformEnum);
+ try (final var sis = new SequenceInputStream(detransformFinisher)) {
+ return sis.readAllBytes();
+ }
+ }
+
+ public static void main(final String[] args) throws Exception {
+ final Options opts = new OptionsBuilder()
+ .include(DetransformBench.class.getSimpleName())
+ .addProfiler(AsyncProfiler.class, "output=flamegraph")
+ .build();
+ new Runner(opts).run();
+ }
+}
diff --git a/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/TransformBench.java b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/TransformBench.java
new file mode 100644
index 000000000..be41770fc
--- /dev/null
+++ b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchs/transform/TransformBench.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.benchs.transform;
+
+import java.io.IOException;
+import java.io.SequenceInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.SecureRandom;
+import java.util.concurrent.TimeUnit;
+
+import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
+import io.aiven.kafka.tieredstorage.transform.TransformFinisher;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.AsyncProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@BenchmarkMode({Mode.AverageTime, Mode.SampleTime})
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TransformBench extends AesKeyAware {
+ static Path segmentPath;
+ @Param({"10485760", "104857600", "1073741824"})
+ public int contentLength; // 10MiB, 100MiB, 1GiB
+ @Param({"102400", "1048576", "5242880"})
+ public int chunkSize; // 100KiB, 1MiB, 5MiB
+ @Param({"false", "true"})
+ public boolean compression;
+ @Param({"false", "true"})
+ public boolean encryption;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ segmentPath = Files.createTempFile("segment", ".log");
+ // to fill with random bytes.
+ final SecureRandom secureRandom = new SecureRandom();
+ try (final var out = Files.newOutputStream(segmentPath)) {
+ final byte[] bytes = new byte[contentLength];
+ secureRandom.nextBytes(bytes);
+ out.write(bytes);
+ }
+ if (encryption) {
+ initCrypto();
+ }
+ }
+
+ @TearDown
+ public void teardown() throws IOException {
+ Files.deleteIfExists(segmentPath);
+ }
+
+ @Benchmark
+ public byte[] test() throws IOException {
+ // Transform.
+ TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
+ Files.newInputStream(segmentPath), chunkSize);
+ if (compression) {
+ transformEnum = new CompressionChunkEnumeration(transformEnum);
+ }
+ if (encryption) {
+ transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAware::encryptionCipherSupplier);
+ }
+ final var transformFinisher = new TransformFinisher(transformEnum, contentLength);
+ try (final var sis = new SequenceInputStream(transformFinisher)) {
+ return sis.readAllBytes();
+ }
+ }
+
+ public static void main(final String[] args) throws Exception {
+ final Options opts = new OptionsBuilder()
+ .include(TransformBench.class.getSimpleName())
+ .addProfiler(AsyncProfiler.class, "output=flamegraph")
+ .build();
+ new Runner(opts).run();
+ }
+}
diff --git a/benchmarks/src/main/resources/log4j.properties b/benchmarks/src/main/resources/log4j.properties
new file mode 100644
index 000000000..3a64848a3
--- /dev/null
+++ b/benchmarks/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+#
+# Copyright 2023 Aiven Oy
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=WARN, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7556bb6db..076ad29e6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -22,9 +22,13 @@
+
+
+
+
diff --git a/settings.gradle b/settings.gradle
index a901fae08..6e01783a9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,3 +20,4 @@ include 'storage'
include 'storage:core'
include 'storage:filesystem'
include 'storage:s3'
+include 'benchmarks'