diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 5f7c138738..00486dc1e6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -148,6 +148,7 @@ public CompletableFuture> getTopic(String topicName) { log.debug("[{}] Return null for getTopic({}) since channel is closing", requestHandler.ctx.channel(), topicName); } + return CompletableFuture.completedFuture(Optional.empty()); } CompletableFuture> topicCompletableFuture = kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index 16d63565bc..da6d1e2f17 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -711,7 +711,6 @@ private void handleEntries(final CompletableFuture future, log.debug("Partition {} read entry completed in {} ns", topicPartition, MathUtils.nowInNano() - startDecodingEntriesNanos); } - log.info("Partition {} read entry completed. {} ", topicPartition, abortedTransactions); future.complete(ReadRecordsResult .get(decodeResult, abortedTransactions, highWatermark, lso, lastPosition, this)); }, context.getDecodeExecutor()).exceptionally(ex -> { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index d07e7d7555..bb5aecef0d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -145,29 +145,7 @@ void maybeTakeSnapshot(Executor executor) { } lastSnapshotTime = now; - - // take the snapshot in this thread, that is the same thread - // that executes mutations - ProducerStateManagerSnapshot snapshot = getProducerStateManagerSnapshot(); - - // write to Pulsar in another thread, and also ignore errors - executor.execute(new SafeRunnable() { - @Override - public void safeRun() { - producerStateManagerSnapshotBuffer - .write(snapshot) - .whenComplete((res, error) -> { - if (error == null) { - log.info("Snapshot for {} taken at offset {} written", - topicPartition, snapshot.getOffset()); - } else { - log.info("Error writing snapshot for {} taken at offset {}", - topicPartition, snapshot.getOffset(), error); - } - }); - } - }); - + takeSnapshot(executor); } private ProducerStateManagerSnapshot getProducerStateManagerSnapshot() {