diff --git a/kie-server-parent/kie-server-services/kie-server-services-kafka/src/main/java/org/kie/server/services/jbpm/kafka/KafkaServerConsumer.java b/kie-server-parent/kie-server-services/kie-server-services-kafka/src/main/java/org/kie/server/services/jbpm/kafka/KafkaServerConsumer.java index f587b70e6f..c720a60f99 100644 --- a/kie-server-parent/kie-server-services/kie-server-services-kafka/src/main/java/org/kie/server/services/jbpm/kafka/KafkaServerConsumer.java +++ b/kie-server-parent/kie-server-services/kie-server-services-kafka/src/main/java/org/kie/server/services/jbpm/kafka/KafkaServerConsumer.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -112,10 +113,7 @@ private void registrationUpdated(Set topics2Register, boolean shouldInit consumer = consumerSupplier.get(); consumer.subscribe(topics2Register); logger.debug("Created kafka consumer with these topics registered {}", topics2Register); - notifyService.set( - new ThreadPoolExecutor(1, Integer.getInteger(KAFKA_EXTENSION_PREFIX + "maxNotifyThreads", 10), - 60L, - TimeUnit.SECONDS, new LinkedBlockingQueue<>())); + notifyService.set(Executors.newFixedThreadPool(Integer.getInteger(KAFKA_EXTENSION_PREFIX + "maxNotifyThreads", 10))); new Thread(this).start(); } } else {