Skip to content

Commit

Permalink
tune defaults, use async executor supplier with multiplication factor (
Browse files Browse the repository at this point in the history
…#83)

* tune defaults & change interface for async executor supplier with multiplication factor

* wrap up

* nit
  • Loading branch information
Kishan Sairam Adapa authored Oct 10, 2023
1 parent 4d3f717 commit f57eb9b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.grpc.Deadline.after;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MIN_BYTES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
Expand Down Expand Up @@ -225,7 +227,7 @@ public Map<String, Object> getBaseStreamsConfig() {
baseStreamsConfig.put(producerPrefix(LINGER_MS_CONFIG), "500");
// Increase batch.size for better throughput
// default = 16384
baseStreamsConfig.put(producerPrefix(BATCH_SIZE_CONFIG), "524288");
baseStreamsConfig.put(producerPrefix(BATCH_SIZE_CONFIG), "1048576");
// Enable compression on producer for better throughput
// default - none
baseStreamsConfig.put(producerPrefix(COMPRESSION_TYPE_CONFIG), CompressionType.ZSTD.name);
Expand All @@ -235,6 +237,12 @@ public Map<String, Object> getBaseStreamsConfig() {
// ##########################
// default - earliest (kafka streams)
baseStreamsConfig.put(consumerPrefix(AUTO_OFFSET_RESET_CONFIG), "latest");
// Increase fetch max wait time for increased throughput, reduced network calls
// default - 500ms
baseStreamsConfig.put(consumerPrefix(FETCH_MAX_WAIT_MS_CONFIG), 5000);
// Increase fetch min bytes for increased throughput, reduced network calls
// default - 1 byte
baseStreamsConfig.put(consumerPrefix(FETCH_MIN_BYTES_CONFIG), "1048576");

// ##########################
// Changelog topic configurations
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.hypertrace.core.kafkastreams.framework.async;

public class Constants {
public static int DEFAULT_ASYNC_EXECUTOR_POOL_SIZE = 16;
public static int DEFAULT_ASYNC_PROCESSOR_BATCH_SIZE = 5120;
public static int DEFAULT_ASYNC_PROCESSOR_COMMIT_INTERVAL = 5000;
public static String ASYNC_EXECUTOR_POOL_SIZE_KEY = "async.executors.maxPoolSize";
public static int DEFAULT_ASYNC_EXECUTOR_POOL_MULTIPLICATION_FACTOR = 4;
public static int DEFAULT_ASYNC_PROCESSOR_BATCH_SIZE = 10240;
public static int DEFAULT_ASYNC_PROCESSOR_COMMIT_INTERVAL = 30000;
public static String ASYNC_EXECUTOR_POOL_MULTIPLICATION_FACTOR_KEY =
"async.executors.pool.multiplication.factor";
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.hypertrace.core.kafkastreams.framework.async;

import static org.hypertrace.core.kafkastreams.framework.async.Constants.ASYNC_EXECUTOR_POOL_SIZE_KEY;
import static org.hypertrace.core.kafkastreams.framework.async.Constants.DEFAULT_ASYNC_EXECUTOR_POOL_SIZE;
import static org.hypertrace.core.kafkastreams.framework.async.Constants.ASYNC_EXECUTOR_POOL_MULTIPLICATION_FACTOR_KEY;
import static org.hypertrace.core.kafkastreams.framework.async.Constants.DEFAULT_ASYNC_EXECUTOR_POOL_MULTIPLICATION_FACTOR;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;
Expand All @@ -10,17 +10,20 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;
import org.apache.kafka.streams.StreamsConfig;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;

public class ExecutorFactory {
private static Executor executor;

/** Config input should be complete kafka streams config */
public static synchronized Supplier<Executor> getExecutorSupplier(Config config) {
if (executor == null) {
int poolSize =
config.hasPath(ASYNC_EXECUTOR_POOL_SIZE_KEY)
? config.getInt(ASYNC_EXECUTOR_POOL_SIZE_KEY)
: DEFAULT_ASYNC_EXECUTOR_POOL_SIZE;
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)
* (config.hasPath(ASYNC_EXECUTOR_POOL_MULTIPLICATION_FACTOR_KEY)
? config.getInt(ASYNC_EXECUTOR_POOL_MULTIPLICATION_FACTOR_KEY)
: DEFAULT_ASYNC_EXECUTOR_POOL_MULTIPLICATION_FACTOR);

if (poolSize > 0) {
ThreadFactory threadFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ service.name = sample-kafka-streams-service

kafka.streams.config = {
application.id = testapp
num.stream.threads = 1
bootstrap.servers = "localhost:9092"
default.key.serde="org.apache.kafka.common.serialization.Serdes$StringSerde"
default.value.serde="org.apache.kafka.common.serialization.Serdes$StringSerde"
Expand Down

0 comments on commit f57eb9b

Please sign in to comment.