From 49d3dc7d05d1c944e09e9697aa8c75922fe04f7d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 12 Dec 2022 22:36:58 +0800 Subject: [PATCH] [Transaction] Support producer state manager --- .../pulsar/handlers/kop/DelayedFetch.java | 2 +- .../handlers/kop/KafkaProtocolHandler.java | 52 +- .../kop/KafkaServiceConfiguration.java | 20 + .../kop/KafkaTopicConsumerManager.java | 31 +- .../handlers/kop/KafkaTopicLookupService.java | 19 +- .../coordinator/group/GroupCoordinator.java | 2 +- .../transaction/TransactionConfig.java | 3 + .../transaction/TransactionCoordinator.java | 17 +- .../handlers/kop/storage/AbortedTxn.java | 47 ++ .../handlers/kop/storage/CompletedTxn.java | 28 ++ ...oryProducerStateManagerSnapshotBuffer.java | 40 ++ .../handlers/kop/storage/PartitionLog.java | 446 +++++++++++++----- .../kop/storage/PartitionLogManager.java | 61 ++- .../kop/storage/ProducerAppendInfo.java | 13 +- .../kop/storage/ProducerStateEntry.java | 7 +- .../kop/storage/ProducerStateManager.java | 137 +++--- .../storage/ProducerStateManagerSnapshot.java | 33 ++ .../ProducerStateManagerSnapshotBuffer.java | 43 ++ ...picProducerStateManagerSnapshotBuffer.java | 335 +++++++++++++ .../handlers/kop/storage/ReplicaManager.java | 78 +-- .../handlers/kop/storage/TxnMetadata.java | 32 ++ .../handlers/kop/utils/MetadataUtils.java | 68 ++- .../kop/format/EncodePerformanceTest.java | 9 +- .../kop/format/EntryFormatterTest.java | 6 +- .../handlers/kop/utils/MetadataUtilsTest.java | 14 +- .../pulsar/handlers/kop/TransactionTest.java | 382 ++++++++++++++- .../TransactionWithOAuthBearerAuthTest.java | 17 + .../TransactionCoordinatorTest.java | 5 +- .../kop/storage/PartitionLogTest.java | 6 +- .../kop/storage/ProducerStateManagerTest.java | 6 +- 30 files changed, 1663 insertions(+), 296 deletions(-) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AbortedTxn.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/CompletedTxn.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/MemoryProducerStateManagerSnapshotBuffer.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshot.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshotBuffer.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/TxnMetadata.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java index 11ebb1dcf6..a79f41568b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java @@ -109,7 +109,7 @@ public boolean tryComplete() { for (Map.Entry entry : readRecordsResult.entrySet()) { TopicPartition tp = entry.getKey(); PartitionLog.ReadRecordsResult result = entry.getValue(); - PartitionLog partitionLog = replicaManager.getPartitionLog(tp, context.getNamespacePrefix()); + PartitionLog partitionLog = result.partitionLog(); PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition(context.getTopicManager()); if (currLastPosition.compareTo(PositionImpl.EARLIEST) == 0) { 
HAS_ERROR_UPDATER.set(this, true); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 43e23f1335..8afa017ceb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -30,6 +30,8 @@ import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryChannelInitializer; import io.streamnative.pulsar.handlers.kop.stats.PrometheusMetricsProvider; import io.streamnative.pulsar.handlers.kop.stats.StatsLogger; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; +import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager; import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; @@ -45,8 +47,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedScheduler; @@ -87,6 +91,8 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag private DelayedOperationPurgatory producePurgatory; private DelayedOperationPurgatory fetchPurgatory; private LookupClient lookupClient; + + private KafkaTopicLookupService kafkaTopicLookupService; @VisibleForTesting @Getter private Map> channelInitializerMap; @@ -110,6 +116,8 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag private MigrationManager migrationManager; private ReplicaManager replicaManager; + private ScheduledFuture txnProducerStateSnapshotsTimeHandle; + private final Map groupCoordinatorsByTenant = new ConcurrentHashMap<>(); private final Map transactionCoordinatorByTenant = new ConcurrentHashMap<>(); @@ -282,6 +290,16 @@ private void invalidatePartitionLog(TopicName topicName) { schemaRegistryManager = new SchemaRegistryManager(kafkaConfig, brokerService.getPulsar(), brokerService.getAuthenticationService()); migrationManager = new MigrationManager(kafkaConfig, brokerService.getPulsar()); + + if (kafkaConfig.isKafkaTransactionCoordinatorEnabled() + && kafkaConfig.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds() > 0) { + txnProducerStateSnapshotsTimeHandle = service.getPulsar().getExecutor().scheduleWithFixedDelay(() -> { + getReplicaManager().takeProducerStateSnapshots(); + }, + kafkaConfig.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds(), + kafkaConfig.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds(), + TimeUnit.SECONDS); + } } private TransactionCoordinator createAndBootTransactionCoordinator(String tenant) { @@ -411,6 +429,20 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi lookupClient); } + class ProducerStateManagerSnapshotProvider implements Function { + @Override + public ProducerStateManagerSnapshotBuffer apply(String tenant) { + if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { + return new MemoryProducerStateManagerSnapshotBuffer(); + } + return getTransactionCoordinator(tenant) + 
.getProducerStateManagerSnapshotBuffer(); + } + } + + private Function getProducerStateManagerSnapshotBufferByTenant = + new ProducerStateManagerSnapshotProvider(); + // this is called after initialize, and with kafkaConfig, brokerService all set. @Override public Map> newChannelInitializers() { @@ -426,13 +458,18 @@ public Map> newChannelIniti .timeoutTimer(SystemTimer.builder().executorName("fetch").build()) .build(); + kafkaTopicLookupService = new KafkaTopicLookupService(brokerService); + replicaManager = new ReplicaManager( kafkaConfig, requestStats, Time.SYSTEM, brokerService.getEntryFilterProvider().getBrokerEntryFilters(), producePurgatory, - fetchPurgatory); + fetchPurgatory, + kafkaTopicLookupService, + getProducerStateManagerSnapshotBufferByTenant + ); try { ImmutableMap.Builder> builder = @@ -462,6 +499,19 @@ public Map> newChannelIniti @Override public void close() { + if (txnProducerStateSnapshotsTimeHandle != null) { + txnProducerStateSnapshotsTimeHandle.cancel(false); + } + + if (offsetTopicClient != null) { + offsetTopicClient.close(); + } + if (txnTopicClient != null) { + txnTopicClient.close(); + } + if (adminManager != null) { + adminManager.shutdown(); + } if (producePurgatory != null) { producePurgatory.shutdown(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 5a431199da..4ebda90fb6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -55,6 +55,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { private static final int OffsetsMessageTTL = 3 * 24 * 3600; // txn configuration public static final int DefaultTxnLogTopicNumPartitions = 50; + public static final int DefaultTxnProducerStateLogTopicNumPartitions = 8; public static final int DefaultTxnCoordinatorSchedulerNum = 1; public static final int DefaultTxnStateManagerSchedulerNum = 1; public static final long DefaultAbortTimedOutTransactionsIntervalMs = TimeUnit.SECONDS.toMillis(10); @@ -407,6 +408,13 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private boolean kafkaTransactionProducerIdsStoredOnPulsar = false; + @FieldContext( + category = CATEGORY_KOP, + required = true, + doc = "The namespace used for storing Kafka Producer Id" + ) + private String kafkaTransactionProducerIdsNamespace = "__kafka_producerid"; + @FieldContext( category = CATEGORY_KOP_TRANSACTION, doc = "Flag to enable transaction coordinator" @@ -419,6 +427,18 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private int kafkaTxnLogTopicNumPartitions = DefaultTxnLogTopicNumPartitions; + @FieldContext( + category = CATEGORY_KOP_TRANSACTION, + doc = "Number of partitions for the transaction producer state topic" + ) + private int kafkaTxnProducerStateTopicNumPartitions = DefaultTxnProducerStateLogTopicNumPartitions; + + @FieldContext( + category = CATEGORY_KOP_TRANSACTION, + doc = "Interval for taking snapshots of the status of pending transactions" + ) + private int kafkaTxnProducerStateTopicSnapshotIntervalSeconds = 60; + @FieldContext( category = CATEGORY_KOP_TRANSACTION, doc = "The interval in milliseconds at which to rollback transactions that have timed out." 
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java index a09f1e24c5..9b81124b21 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java @@ -48,7 +48,6 @@ public class KafkaTopicConsumerManager implements Closeable { private final PersistentTopic topic; - private final KafkaRequestHandler requestHandler; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -67,13 +66,21 @@ public class KafkaTopicConsumerManager implements Closeable { private final boolean skipMessagesWithoutIndex; + private final String description; + KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) { + this(requestHandler.ctx.channel() + "", + requestHandler.isSkipMessagesWithoutIndex(), + topic); + } + + public KafkaTopicConsumerManager(String description, boolean skipMessagesWithoutIndex, PersistentTopic topic) { this.topic = topic; this.cursors = new ConcurrentHashMap<>(); this.createdCursors = new ConcurrentHashMap<>(); this.lastAccessTimes = new ConcurrentHashMap<>(); - this.requestHandler = requestHandler; - this.skipMessagesWithoutIndex = requestHandler.isSkipMessagesWithoutIndex(); + this.description = description; + this.skipMessagesWithoutIndex = skipMessagesWithoutIndex; } // delete expired cursors, so backlog can be cleared. @@ -96,7 +103,7 @@ void deleteOneExpiredCursor(long offset) { if (cursorFuture != null) { if (log.isDebugEnabled()) { log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}", - requestHandler.ctx.channel(), offset, cursors.size()); + description, offset, cursors.size()); } // TODO: Should we just cancel this future? @@ -118,14 +125,14 @@ public void deleteOneCursorAsync(ManagedCursor cursor, String reason) { public void deleteCursorComplete(Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.", - requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason); + description, cursor.getName(), topic.getName(), reason); } } @Override public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.", - requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason, exception); + description, cursor.getName(), topic.getName(), reason, exception); } }, null); createdCursors.remove(cursor.getName()); @@ -148,7 +155,7 @@ public CompletableFuture> removeCursorFuture(long offs if (log.isDebugEnabled()) { log.debug("[{}] Get cursor for offset: {} in cache. 
cache size: {}", - requestHandler.ctx.channel(), offset, cursors.size()); + description, offset, cursors.size()); } return cursorFuture; } @@ -182,7 +189,7 @@ public void add(long offset, Pair pair) { if (log.isDebugEnabled()) { log.debug("[{}] Add cursor back {} for offset: {}", - requestHandler.ctx.channel(), pair.getLeft().getName(), offset); + description, pair.getLeft().getName(), offset); } } @@ -194,7 +201,7 @@ public void close() { } if (log.isDebugEnabled()) { log.debug("[{}] Close TCM for topic {}.", - requestHandler.ctx.channel(), topic.getName()); + description, topic.getName()); } final List>> cursorFuturesToClose = new ArrayList<>(); cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture)); @@ -231,7 +238,7 @@ private CompletableFuture> asyncGetCursorByOffset(long if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) { log.error("[{}] Async get cursor for offset {} for topic {} failed, " + "because current managedLedger has been closed", - requestHandler.ctx.channel(), offset, topic.getName()); + description, offset, topic.getName()); CompletableFuture> future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Current managedLedger for " + topic.getName() + " has been closed.")); @@ -250,7 +257,7 @@ private CompletableFuture> asyncGetCursorByOffset(long final PositionImpl previous = ((ManagedLedgerImpl) ledger).getPreviousPosition((PositionImpl) position); if (log.isDebugEnabled()) { log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}", - requestHandler.ctx.channel(), cursorName, offset, position, previous); + description, cursorName, offset, position, previous); } try { final ManagedCursor newCursor = ledger.newNonDurableCursor(previous, cursorName); @@ -259,7 +266,7 @@ private CompletableFuture> asyncGetCursorByOffset(long return Pair.of(newCursor, offset); } catch (ManagedLedgerException e) { log.error("[{}] Error new cursor for topic {} at offset {} - {}. 
will cause fetch data error.", - requestHandler.ctx.channel(), topic.getName(), offset, previous, e); + description, topic.getName(), offset, previous, e); return null; } }); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java index 8cb90fe401..22831608b7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.kop; -import io.netty.channel.Channel; import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.NonNull; @@ -37,7 +36,7 @@ public KafkaTopicLookupService(BrokerService brokerService) { } // A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance - public CompletableFuture> getTopic(String topicName, Channel channel) { + public CompletableFuture> getTopic(String topicName, Object requestor) { CompletableFuture> topicCompletableFuture = new CompletableFuture<>(); brokerService.getTopicIfExists(topicName).whenComplete((t2, throwable) -> { TopicName topicNameObject = TopicName.get(topicName); @@ -47,7 +46,7 @@ public CompletableFuture> getTopic(String topicName, C if (topicNameObject.getPartitionIndex() == 0) { log.warn("Get partition-0 error [{}].", throwable.getMessage()); } else { - handleGetTopicException(topicName, topicCompletableFuture, throwable, channel); + handleGetTopicException(topicName, topicCompletableFuture, throwable, requestor); return; } } @@ -60,11 +59,11 @@ public CompletableFuture> getTopic(String topicName, C String nonPartitionedTopicName = topicNameObject.getPartitionedTopicName(); if (log.isDebugEnabled()) { log.debug("[{}]Try to get non-partitioned topic for name {}", - channel, nonPartitionedTopicName); + requestor, nonPartitionedTopicName); } brokerService.getTopicIfExists(nonPartitionedTopicName).whenComplete((nonPartitionedTopic, ex) -> { if (ex != null) { - handleGetTopicException(nonPartitionedTopicName, topicCompletableFuture, ex, channel); + handleGetTopicException(nonPartitionedTopicName, topicCompletableFuture, ex, requestor); // Failed to getTopic from current broker, remove non-partitioned topic cache, // which added in getTopicBroker. 
                    KopBrokerLookupManager.removeTopicManagerCache(nonPartitionedTopicName);
@@ -75,14 +74,14 @@ public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName, C
                     topicCompletableFuture.complete(Optional.of(persistentTopic));
                 } else {
                     log.error("[{}]Get empty non-partitioned topic for name {}",
-                            channel, nonPartitionedTopicName);
+                            requestor, nonPartitionedTopicName);
                     KopBrokerLookupManager.removeTopicManagerCache(nonPartitionedTopicName);
                     topicCompletableFuture.complete(Optional.empty());
                 }
             });
             return;
         }
-        log.error("[{}]Get empty topic for name {}", channel, topicName);
+        log.error("[{}]Get empty topic for name {}", requestor, topicName);
         KopBrokerLookupManager.removeTopicManagerCache(topicName);
         topicCompletableFuture.complete(Optional.empty());
     });
@@ -93,15 +92,15 @@ private void handleGetTopicException(@NonNull final String topicName,
                                          @NonNull final CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture,
                                          @NonNull final Throwable ex,
-                                         @NonNull final Channel channel) {
+                                         @NonNull final Object requestor) {
         // The ServiceUnitNotReadyException is retryable, so we should print a warning log instead of error log
         if (ex instanceof BrokerServiceException.ServiceUnitNotReadyException) {
             log.warn("[{}] Failed to getTopic {}: {}",
-                    channel, topicName, ex.getMessage());
+                    requestor, topicName, ex.getMessage());
             topicCompletableFuture.complete(Optional.empty());
         } else {
             log.error("[{}] Failed to getTopic {}. exception:",
-                    channel, topicName, ex);
+                    requestor, topicName, ex);
             topicCompletableFuture.completeExceptionally(ex);
         }
     }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java
index b5c7edba03..f47c380059 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java
@@ -82,7 +82,7 @@ public static GroupCoordinator of(
         Time time
     ) {
         ScheduledExecutorService coordinatorExecutor = OrderedScheduler.newSchedulerBuilder()
-            .name("group-coordinator-executor")
+            .name("group-coordinator-executor-" + tenant)
             .numThreads(1)
             .build();
 
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java
index 90e745539a..29927490f2 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java
@@ -26,6 +26,7 @@ public class TransactionConfig {
 
     public static final String DefaultTransactionMetadataTopicName = "public/default/__transaction_state";
+    public static final String DefaultProducerStateSnapshotTopicName = "public/default/__transaction_producer_state";
     public static final String DefaultProducerIdTopicName = "public/default/__transaction_producerid_generator";
     public static final long DefaultTransactionsMaxTimeoutMs = TimeUnit.MINUTES.toMillis(15);
     public static final long DefaultTransactionalIdExpirationMs = TimeUnit.DAYS.toMillis(7);
@@ -42,6 +43,8 @@ public class TransactionConfig {
     @Default
     private String transactionMetadataTopicName = DefaultTransactionMetadataTopicName;
     @Default
+    private String transactionProducerStateSnapshotTopicName = DefaultProducerStateSnapshotTopicName;
+    
@Default private long transactionMaxTimeoutMs = DefaultTransactionsMaxTimeoutMs; @Default private long transactionalIdExpirationMs = DefaultTransactionalIdExpirationMs; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index 9bb5546687..c223dd41e5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -27,6 +27,8 @@ import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata.TxnTransitMetadata; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.CoordinatorEpochAndTxnMetadata; import io.streamnative.pulsar.handlers.kop.scala.Either; +import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer; +import io.streamnative.pulsar.handlers.kop.storage.PulsarTopicProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; import java.util.HashSet; @@ -38,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -69,6 +72,9 @@ public class TransactionCoordinator { private final TransactionStateManager txnManager; private final TransactionMarkerChannelManager transactionMarkerChannelManager; + @Getter + private ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer; + private final ScheduledExecutorService scheduler; private final Time time; @@ -106,7 +112,9 @@ protected TransactionCoordinator(TransactionConfig transactionConfig, TransactionStateManager txnManager, Time time, String namespacePrefixForMetadata, - String namespacePrefixForUserTopics) { + String namespacePrefixForUserTopics, + Function + producerStateManagerSnapshotBufferFactory) { this.namespacePrefixForMetadata = namespacePrefixForMetadata; this.namespacePrefixForUserTopics = namespacePrefixForUserTopics; this.transactionConfig = transactionConfig; @@ -115,6 +123,7 @@ protected TransactionCoordinator(TransactionConfig transactionConfig, this.transactionMarkerChannelManager = transactionMarkerChannelManager; this.scheduler = scheduler; this.time = time; + this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBufferFactory.apply(transactionConfig); } public static TransactionCoordinator of(String tenant, @@ -146,7 +155,10 @@ public static TransactionCoordinator of(String tenant, transactionStateManager, time, namespacePrefixForMetadata, - namespacePrefixForUserTopics); + namespacePrefixForUserTopics, + (config) -> new PulsarTopicProducerStateManagerSnapshotBuffer( + config.getTransactionProducerStateSnapshotTopicName(), txnTopicClient) + ); } /** @@ -899,6 +911,7 @@ public void shutdown() { producerIdManager.shutdown(); txnManager.shutdown(); transactionMarkerChannelManager.close(); + producerStateManagerSnapshotBuffer.shutdown(); scheduler.shutdown(); // TODO shutdown txn log.info("Shutdown transaction coordinator complete."); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AbortedTxn.java 
b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AbortedTxn.java
new file mode 100644
index 0000000000..fe1e58995d
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AbortedTxn.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.storage;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+/**
+ * AbortedTxn is used to cache the aborted index.
+ */
+@Data
+@Accessors(fluent = true)
+@AllArgsConstructor
+public final class AbortedTxn {
+
+    private static final int VersionOffset = 0;
+    private static final int VersionSize = 2;
+    private static final int ProducerIdOffset = VersionOffset + VersionSize;
+    private static final int ProducerIdSize = 8;
+    private static final int FirstOffsetOffset = ProducerIdOffset + ProducerIdSize;
+    private static final int FirstOffsetSize = 8;
+    private static final int LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize;
+    private static final int LastOffsetSize = 8;
+    private static final int LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize;
+    private static final int LastStableOffsetSize = 8;
+    private static final int TotalSize = LastStableOffsetOffset + LastStableOffsetSize;
+
+    private static final Short CurrentVersion = 0;
+
+    private final long producerId;
+    private final long firstOffset;
+    private final long lastOffset;
+    private final long lastStableOffset;
+
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/CompletedTxn.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/CompletedTxn.java
new file mode 100644
index 0000000000..6a616da3f4
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/CompletedTxn.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.storage;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+@Data
+@Accessors(fluent = true)
+@AllArgsConstructor
+public final class CompletedTxn {
+    private long producerId;
+    private long firstOffset;
+    private long lastOffset;
+    private boolean isAborted;
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/MemoryProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/MemoryProducerStateManagerSnapshotBuffer.java
new file mode 100644
index 0000000000..7a893b5fa8
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/MemoryProducerStateManagerSnapshotBuffer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MemoryProducerStateManagerSnapshotBuffer implements ProducerStateManagerSnapshotBuffer {
+    private Map<String, ProducerStateManagerSnapshot> latestSnapshots = new ConcurrentHashMap<>();
+
+    @Override
+    public CompletableFuture<Void> write(ProducerStateManagerSnapshot snapshot) {
+        return CompletableFuture.runAsync(() -> {
+            latestSnapshots.compute(snapshot.getTopicPartition(), (tp, current) -> {
+                if (current == null || current.getOffset() <= snapshot.getOffset()) {
+                    return snapshot;
+                } else {
+                    return current;
+                }
+            });
+        });
+    }
+
+    @Override
+    public CompletableFuture<ProducerStateManagerSnapshot> readLatestSnapshot(String topicPartition) {
+        return CompletableFuture.supplyAsync(() -> latestSnapshots.get(topicPartition));
+    }
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
index e36bfbf6e5..ab3dfe37b5 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
@@ -20,6 +20,7 @@
 import io.netty.util.Recycler;
 import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
 import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
+import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
 import io.streamnative.pulsar.handlers.kop.KafkaTopicManager;
 import io.streamnative.pulsar.handlers.kop.MessageFetchContext;
 import io.streamnative.pulsar.handlers.kop.MessagePublishContext;
@@ -48,6 +49,7 @@
 import java.util.function.Consumer;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.Getter;
 import lombok.ToString;
 import lombok.experimental.Accessors;
 import lombok.extern.slf4j.Slf4j;
@@ -67,7 +69,11 @@
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import 
org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; @@ -80,6 +86,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; /** * Analyze result. @@ -109,10 +116,13 @@ public class PartitionLog { private final TopicPartition topicPartition; private final String fullPartitionName; private final AtomicReference> entryFormatter = new AtomicReference<>(); + @Getter private final ProducerStateManager producerStateManager; private final List entryFilters; private final boolean preciseTopicPublishRateLimitingEnable; + private final KafkaTopicLookupService kafkaTopicLookupService; + public PartitionLog(KafkaServiceConfiguration kafkaConfig, RequestStats requestStats, @@ -120,15 +130,28 @@ public PartitionLog(KafkaServiceConfiguration kafkaConfig, TopicPartition topicPartition, String fullPartitionName, List entryFilters, - ProducerStateManager producerStateManager) { + KafkaTopicLookupService kafkaTopicLookupService, + ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer) { this.kafkaConfig = kafkaConfig; this.entryFilters = entryFilters; this.requestStats = requestStats; this.time = time; this.topicPartition = topicPartition; this.fullPartitionName = fullPartitionName; - this.producerStateManager = producerStateManager; this.preciseTopicPublishRateLimitingEnable = kafkaConfig.isPreciseTopicPublishRateLimiterEnable(); + this.kafkaTopicLookupService = kafkaTopicLookupService; + this.producerStateManager = new ProducerStateManager(fullPartitionName, producerStateManagerSnapshotBuffer); + } + + public CompletableFuture recoverTransactions() { + if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { + return producerStateManager + .recover(this) + .thenApply(___ -> this); + } else { + return CompletableFuture + .completedFuture(this); + } } private CompletableFuture getEntryFormatter( @@ -159,12 +182,12 @@ private CompletableFuture getEntryFormatter( }); result.exceptionally(ex -> { - // this error will happen in a separate thread, and during the execution of - // accumulateAndGet - // the only thing we can do is to clear the cache - log.error("Cannot create the EntryFormatter for {}", fullPartitionName, ex); - entryFormatter.set(null); - return null; + // this error will happen in a separate thread, and during the execution of + // accumulateAndGet + // the only thing we can do is to clear the cache + log.error("Cannot create the EntryFormatter for {}", fullPartitionName, ex); + entryFormatter.set(null); + return null; }); return result; }); @@ -224,6 +247,8 @@ protected ReadRecordsResult newObject(Handle handle) { private Position lastPosition; private Errors errors; + private PartitionLog partitionLog; + private ReadRecordsResult(Recycler.Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } @@ -236,14 +261,16 @@ public static ReadRecordsResult get(DecodeResult decodeResult, List abortedTransactions, long highWatermark, long lastStableOffset, - Position lastPosition) { + Position 
lastPosition, + PartitionLog partitionLog) { return ReadRecordsResult.get( decodeResult, abortedTransactions, highWatermark, lastStableOffset, lastPosition, - null); + null, + partitionLog); } public static ReadRecordsResult get(DecodeResult decodeResult, @@ -251,7 +278,8 @@ public static ReadRecordsResult get(DecodeResult decodeResult, long highWatermark, long lastStableOffset, Position lastPosition, - Errors errors) { + Errors errors, + PartitionLog partitionLog) { ReadRecordsResult readRecordsResult = RECYCLER.get(); readRecordsResult.decodeResult = decodeResult; readRecordsResult.abortedTransactions = abortedTransactions; @@ -259,31 +287,35 @@ public static ReadRecordsResult get(DecodeResult decodeResult, readRecordsResult.lastStableOffset = lastStableOffset; readRecordsResult.lastPosition = lastPosition; readRecordsResult.errors = errors; + readRecordsResult.partitionLog = partitionLog; return readRecordsResult; } public static ReadRecordsResult empty(long highWatermark, - long lastStableOffset, - Position lastPosition) { + long lastStableOffset, + Position lastPosition, + PartitionLog partitionLog) { return ReadRecordsResult.get( DecodeResult.get(MemoryRecords.EMPTY), Collections.emptyList(), highWatermark, lastStableOffset, - lastPosition); + lastPosition, + partitionLog); } - public static ReadRecordsResult error(Errors errors) { - return ReadRecordsResult.error(PositionImpl.EARLIEST, errors); + public static ReadRecordsResult error(Errors errors, PartitionLog partitionLog) { + return ReadRecordsResult.error(PositionImpl.EARLIEST, errors, partitionLog); } - public static ReadRecordsResult error(Position position, Errors errors) { + public static ReadRecordsResult error(Position position, Errors errors, PartitionLog partitionLog) { return ReadRecordsResult.get(null, null, -1, -1, position, - errors); + errors, + partitionLog); } public FetchResponse.PartitionData toPartitionData() { @@ -320,6 +352,7 @@ public void recycle() { this.lastStableOffset = -1; this.highWatermark = -1; this.abortedTransactions = null; + this.partitionLog = null; if (this.decodeResult != null) { this.decodeResult.recycle(); this.decodeResult = null; @@ -500,11 +533,11 @@ public CompletableFuture readRecords(final FetchRequest.Parti log.debug("Fetch for {}: no tcm for topic {} return NOT_LEADER_FOR_PARTITION.", topicPartition, fullPartitionName); } - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); return; } if (checkOffsetOutOfRange(tcm, offset, topicPartition, startPrepareMetadataNanos)) { - future.complete(ReadRecordsResult.error(Errors.OFFSET_OUT_OF_RANGE)); + future.complete(ReadRecordsResult.error(Errors.OFFSET_OUT_OF_RANGE, this)); return; } @@ -519,16 +552,16 @@ public CompletableFuture readRecords(final FetchRequest.Parti log.warn("KafkaTopicConsumerManager is closed, remove TCM of {}", fullPartitionName); registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); context.getSharedState().getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(fullPartitionName); - future.complete(ReadRecordsResult.error(Errors.NONE)); + future.complete(ReadRecordsResult.error(Errors.NONE, this)); return; } cursorFuture.thenAccept((cursorLongPair) -> { if (cursorLongPair == null) { log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. 
" - + "Fetch for topic return error.", offset, topicPartition); + + "Fetch for topic return error.", offset, topicPartition); registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); return; } final ManagedCursor cursor = cursorLongPair.getLeft(); @@ -545,37 +578,39 @@ public CompletableFuture readRecords(final FetchRequest.Parti ReadRecordsResult.empty( highWaterMark, firstUndecidedOffset, - tcm.getManagedLedger().getLastConfirmedEntry() + tcm.getManagedLedger().getLastConfirmedEntry(), this ) ); return; } } - readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes, topicManager) - .whenComplete((entries, throwable) -> { - if (throwable != null) { - tcm.deleteOneCursorAsync(cursorLongPair.getLeft(), - "cursor.readEntry fail. deleteCursor"); - if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException - || throwable instanceof ManagedLedgerException.ManagedLedgerFencedException) { - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); - return; - } - log.error("Read entry error on {}", partitionData, throwable); - future.complete(ReadRecordsResult.error(Errors.UNKNOWN_SERVER_ERROR)); - return; - } - long readSize = entries.stream().mapToLong(Entry::getLength).sum(); - limitBytes.addAndGet(-1 * readSize); - // Add new offset back to TCM after entries are read successfully - tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get())); - handleEntries(future, entries, partitionData, tcm, cursor, readCommitted, context); - }); + readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes, + fullPartitionName -> { + topicManager.invalidateCacheForFencedManagerLedgerOnTopic(fullPartitionName); + }).whenComplete((entries, throwable) -> { + if (throwable != null) { + tcm.deleteOneCursorAsync(cursorLongPair.getLeft(), + "cursor.readEntry fail. 
deleteCursor"); + if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException + || throwable instanceof ManagedLedgerException.ManagedLedgerFencedException) { + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); + return; + } + log.error("Read entry error on {}", partitionData, throwable); + future.complete(ReadRecordsResult.error(Errors.UNKNOWN_SERVER_ERROR, this)); + return; + } + long readSize = entries.stream().mapToLong(Entry::getLength).sum(); + limitBytes.addAndGet(-1 * readSize); + // Add new offset back to TCM after entries are read successfully + tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get())); + handleEntries(future, entries, partitionData, tcm, cursor, readCommitted, context); + }); }).exceptionally(ex -> { registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); context.getSharedState() .getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(fullPartitionName); - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); return null; }); }); @@ -601,7 +636,9 @@ private boolean checkOffsetOutOfRange(KafkaTopicConsumerManager tcm, log.error("Received request for offset {} for partition {}, " + "but we only have entries less than {}.", offset, topicPartition, logEndOffset); - registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); + if (startPrepareMetadataNanos > 0) { + registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); + } return true; } return false; @@ -629,7 +666,8 @@ private void handleEntries(final CompletableFuture future, entries.size(), committedEntries.size()); } if (committedEntries.isEmpty()) { - future.complete(ReadRecordsResult.error(tcm.getManagedLedger().getLastConfirmedEntry(), Errors.NONE)); + future.complete(ReadRecordsResult.error(tcm.getManagedLedger().getLastConfirmedEntry(), Errors.NONE, + this)); return; } @@ -644,40 +682,41 @@ private void handleEntries(final CompletableFuture future, getEntryFormatter(context.getTopicManager().getTopic(fullPartitionName)); entryFormatterHandle.whenComplete((entryFormatter, ee) -> { - if (ee != null) { - future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR)); - return; - } - groupNameFuture.whenCompleteAsync((groupName, ex) -> { - if (ex != null) { - log.error("Get groupId failed.", ex); - groupName = ""; + if (ee != null) { + future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR, this)); + return; } - final long startDecodingEntriesNanos = MathUtils.nowInNano(); + groupNameFuture.whenCompleteAsync((groupName, ex) -> { + if (ex != null) { + log.error("Get groupId failed.", ex); + groupName = ""; + } + final long startDecodingEntriesNanos = MathUtils.nowInNano(); - // Get the last entry position for delayed fetch. 
- Position lastPosition = this.getLastPositionFromEntries(committedEntries); - final DecodeResult decodeResult = entryFormatter.decode(committedEntries, magic); - requestStats.getFetchDecodeStats().registerSuccessfulEvent( - MathUtils.elapsedNanos(startDecodingEntriesNanos), TimeUnit.NANOSECONDS); - - // collect consumer metrics - decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats); - List abortedTransactions = null; - if (readCommitted) { - abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset); - } - if (log.isDebugEnabled()) { - log.debug("Partition {} read entry completed in {} ns", - topicPartition, MathUtils.nowInNano() - startDecodingEntriesNanos); - } + // Get the last entry position for delayed fetch. + Position lastPosition = this.getLastPositionFromEntries(committedEntries); + final DecodeResult decodeResult = entryFormatter.decode(committedEntries, magic); + requestStats.getFetchDecodeStats().registerSuccessfulEvent( + MathUtils.elapsedNanos(startDecodingEntriesNanos), TimeUnit.NANOSECONDS); - future.complete(ReadRecordsResult.get(decodeResult, abortedTransactions, highWatermark, lso, lastPosition)); - }, context.getDecodeExecutor()).exceptionally(ex -> { - log.error("Partition {} read entry exceptionally. ", topicPartition, ex); - future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR)); - return null; - }); + // collect consumer metrics + decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats); + List abortedTransactions = null; + if (readCommitted) { + abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset); + } + if (log.isDebugEnabled()) { + log.debug("Partition {} read entry completed in {} ns", + topicPartition, MathUtils.nowInNano() - startDecodingEntriesNanos); + } + + future.complete(ReadRecordsResult + .get(decodeResult, abortedTransactions, highWatermark, lso, lastPosition, this)); + }, context.getDecodeExecutor()).exceptionally(ex -> { + log.error("Partition {} read entry exceptionally. ", topicPartition, ex); + future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR, this)); + return null; + }); }); @@ -735,7 +774,7 @@ private CompletableFuture> readEntries(final ManagedCursor cursor, final AtomicLong cursorOffset, final int maxReadEntriesNum, final long adjustedMaxBytes, - final KafkaTopicManager topicManager) { + final Consumer invalidateCacheOnTopic) { final OpStatsLogger messageReadStats = requestStats.getMessageReadStats(); // read readeEntryNum size entry. 
final long startReadingMessagesNanos = MathUtils.nowInNano(); @@ -792,7 +831,7 @@ public void readEntriesComplete(List entries, Object ctx) { public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { log.error("Error read entry for topic: {}", fullPartitionName); if (exception instanceof ManagedLedgerException.ManagedLedgerFencedException) { - topicManager.invalidateCacheForFencedManagerLedgerOnTopic(fullPartitionName); + invalidateCacheOnTopic.accept(fullPartitionName); } messageReadStats.registerFailedEvent( MathUtils.elapsedNanos(startReadingMessagesNanos), TimeUnit.NANOSECONDS); @@ -857,43 +896,30 @@ private void publishMessages(final Optional persistentTopicOpt, publishMessage(persistentTopic, byteBuf, appendInfo) .whenComplete((offset, e) -> { - appendRecordsContext.getCompleteSendOperationForThrottling().accept(byteBuf.readableBytes()); - - if (e == null) { - requestStats.getMessagePublishStats().registerSuccessfulEvent( - time.nanoseconds() - beforePublish, TimeUnit.NANOSECONDS); - final long lastOffset = offset + numMessages - 1; - - AnalyzeResult analyzeResult = analyzeAndValidateProducerState( - encodeResult.getRecords(), Optional.of(offset), AppendOrigin.Client); - analyzeResult.updatedProducers().forEach((pid, producerAppendInfo) -> { - if (log.isDebugEnabled()) { - log.debug("Append pid: [{}], appendInfo: [{}], lastOffset: [{}]", - pid, producerAppendInfo, lastOffset); + appendRecordsContext.getCompleteSendOperationForThrottling().accept(byteBuf.readableBytes()); + + if (e == null) { + requestStats.getMessagePublishStats().registerSuccessfulEvent( + time.nanoseconds() - beforePublish, TimeUnit.NANOSECONDS); + final long lastOffset = offset + numMessages - 1; + + AnalyzeResult analyzeResult = analyzeAndValidateProducerState( + encodeResult.getRecords(), Optional.of(offset), AppendOrigin.Client); + updateProducerStateManager(lastOffset, analyzeResult); + + appendFuture.complete(offset); + } else { + log.error("publishMessages for topic partition: {} failed when write.", fullPartitionName, e); + requestStats.getMessagePublishStats().registerFailedEvent( + time.nanoseconds() - beforePublish, TimeUnit.NANOSECONDS); + appendFuture.completeExceptionally(e); } - producerStateManager.update(producerAppendInfo); - }); - analyzeResult.completedTxns().forEach(completedTxn -> { - // update to real last offset - completedTxn.lastOffset(lastOffset - 1); - long lastStableOffset = producerStateManager.lastStableOffset(completedTxn); - producerStateManager.updateTxnIndex(completedTxn, lastStableOffset); - producerStateManager.completeTxn(completedTxn); + encodeResult.recycle(); }); - - appendFuture.complete(offset); - } else { - log.error("publishMessages for topic partition: {} failed when write.", fullPartitionName, e); - requestStats.getMessagePublishStats().registerFailedEvent( - time.nanoseconds() - beforePublish, TimeUnit.NANOSECONDS); - appendFuture.completeExceptionally(e); - } - encodeResult.recycle(); - }); } private void checkAndRecordPublishQuota(Topic topic, int msgSize, int numMessages, - AppendRecordsContext appendRecordsContext) { + AppendRecordsContext appendRecordsContext) { final boolean isPublishRateExceeded; if (preciseTopicPublishRateLimitingEnable) { boolean isPreciseTopicPublishRateExceeded = @@ -1043,4 +1069,200 @@ private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info return MemoryRecords.readableRecords(validByteBuffer); } } + + public CompletableFuture recoverTxEntries( + long offset, + ProducerStateManager 
producerStateManager) { + if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { + // no need to scan the topic, because transactions are disabled + return CompletableFuture.completedFuture(Long.valueOf(0)); + } + return kafkaTopicLookupService + .getTopic(fullPartitionName, this).thenCompose(topic -> { + if (!topic.isPresent()) { + log.info("Topic {} not owned by this broker, cannot recover now", fullPartitionName); + return FutureUtil.failedFuture(new NotLeaderOrFollowerException()); + } + final CompletableFuture future = new CompletableFuture<>(); + + // The future that is returned by getTopicConsumerManager is always completed normally + KafkaTopicConsumerManager tcm = new KafkaTopicConsumerManager("recover-tx", + true, topic.get()); + future.whenComplete((___, error) -> { + // release resources in any case + try { + tcm.close(); + } catch (Exception err) { + log.error("Cannot safely close the temporary KafkaTopicConsumerManager for {}", + fullPartitionName, err); + } + }); + + if (checkOffsetOutOfRange(tcm, offset, topicPartition, -1)) { + future.completeExceptionally(new OffsetOutOfRangeException("")); + return future; + } + + if (log.isDebugEnabled()) { + log.debug("recoverTxEntries for {}: remove tcm to get cursor for fetch offset: {} .", + topicPartition, offset); + } + + final CompletableFuture> cursorFuture = tcm.removeCursorFuture(offset); + + if (cursorFuture == null) { + // tcm is closed, just return a NONE error because the channel may be still active + log.warn("KafkaTopicConsumerManager is closed, remove TCM of {}", fullPartitionName); + future.completeExceptionally(new NotLeaderOrFollowerException()); + return future; + } + cursorFuture.thenAccept((cursorLongPair) -> { + + if (cursorLongPair == null) { + log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. " + + "Fetch for topic return error.", offset, topicPartition); + future.completeExceptionally(new NotLeaderOrFollowerException()); + return; + } + final ManagedCursor cursor = cursorLongPair.getLeft(); + final AtomicLong cursorOffset = new AtomicLong(cursorLongPair.getRight()); + + AtomicLong entryCounter = new AtomicLong(); + readNextEntriesForRecovery(cursor, cursorOffset, tcm, topic.get(), entryCounter, future); + + }).exceptionally(ex -> { + future.completeExceptionally(new NotLeaderOrFollowerException()); + return null; + }); + return future; + }); + } + + + private void readNextEntriesForRecovery(ManagedCursor cursor, AtomicLong cursorOffset, + KafkaTopicConsumerManager tcm, + PersistentTopic topic, + AtomicLong entryCounter, + CompletableFuture future) { + log.info("readNextEntriesForRecovery {} cursorOffset {}", fullPartitionName, cursorOffset); + int maxReadEntriesNum = 2; + long adjustedMaxBytes = Long.MAX_VALUE; + readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes, + (partitionName) -> {}) + .whenComplete((entries, throwable) -> { + if (throwable != null) { + tcm.deleteOneCursorAsync(cursor, + "cursor.readEntry fail. 
deleteCursor"); + if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException + || throwable instanceof ManagedLedgerException.ManagedLedgerFencedException) { + future.completeExceptionally(new NotLeaderOrFollowerException()); + return; + } + log.error("Read entry error on {}", fullPartitionName, throwable); + future.completeExceptionally(new UnknownServerException(throwable)); + return; + } + + // Add new offset back to TCM after entries are read successfully + tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get())); + + if (entries.isEmpty()) { + log.info("No more entries to recover for {}", fullPartitionName); + future.complete(entryCounter.get()); + return; + } + + CompletableFuture decodedEntries = new CompletableFuture<>(); + decodeEntriesForRecovery(decodedEntries, entries, topic); + + decodedEntries.thenAccept((decodeResult) -> { + try { + + MemoryRecords records = decodeResult.getRecords(); + Optional firstOffset = Optional + .ofNullable(records.firstBatch()) + .map(batch -> batch.baseOffset()); + + long[] lastOffSetHolder = {-1L}; + records.batches().forEach(batch -> { + batch.forEach(record -> { + if (lastOffSetHolder[0] < record.offset()) { + lastOffSetHolder[0] = record.offset(); + } + entryCounter.incrementAndGet(); + }); + }); + long lastOffset = lastOffSetHolder[0]; + + if (log.isDebugEnabled()) { + log.debug("Read some entries while recovering {} firstOffSet {} lastOffset {}", + fullPartitionName, + firstOffset.orElse(null), lastOffset); + } + + AnalyzeResult analyzeResult = analyzeAndValidateProducerState(records, + firstOffset, AppendOrigin.Log); + + updateProducerStateManager(lastOffset, analyzeResult); + if (log.isDebugEnabled()) { + log.debug("Completed recovery of batch {} {}", analyzeResult, fullPartitionName); + } + + readNextEntriesForRecovery(cursor, cursorOffset, tcm, topic, entryCounter, future); + + } finally { + decodeResult.recycle(); + } + }).exceptionally(error -> { + log.error("Bad error while recovering {}", fullPartitionName, error); + future.completeExceptionally(error); + return null; + }); + }); + } + + private void updateProducerStateManager(long lastOffset, AnalyzeResult analyzeResult) { + analyzeResult.updatedProducers().forEach((pid, producerAppendInfo) -> { + if (log.isDebugEnabled()) { + log.debug("Append pid: [{}], appendInfo: [{}], lastOffset: [{}]", + pid, producerAppendInfo, lastOffset); + } + producerStateManager.update(producerAppendInfo); + }); + analyzeResult.completedTxns().forEach(completedTxn -> { + // update to real last offset + completedTxn.lastOffset(lastOffset - 1); + long lastStableOffset = producerStateManager.lastStableOffset(completedTxn); + producerStateManager.updateTxnIndex(completedTxn, lastStableOffset); + producerStateManager.completeTxn(completedTxn); + }); + producerStateManager.updateMapEndOffset(lastOffset); + } + + private void decodeEntriesForRecovery(final CompletableFuture future, + final List entries, PersistentTopic topic) { + + if (log.isDebugEnabled()) { + log.debug("Read {} entries", entries.size()); + } + final byte magic = RecordBatch.CURRENT_MAGIC_VALUE; + final CompletableFuture entryFormatterHandle = + getEntryFormatter(CompletableFuture.completedFuture(Optional.of(topic))); + entryFormatterHandle.whenComplete((entryFormatter, ee) -> { + if (ee != null) { + future.completeExceptionally(new KafkaStorageException()); + return; + } + final long startDecodingEntriesNanos = MathUtils.nowInNano(); + try { + DecodeResult decodeResult = entryFormatter.decode(entries, magic); + 
requestStats.getFetchDecodeStats().registerSuccessfulEvent( + MathUtils.elapsedNanos(startDecodingEntriesNanos), TimeUnit.NANOSECONDS); + future.complete(decodeResult); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + } + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java index 1cfd5c0e39..d7097bc105 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java @@ -15,53 +15,100 @@ import com.google.common.collect.Maps; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; import io.streamnative.pulsar.handlers.kop.RequestStats; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; /** * Manage {@link PartitionLog}. */ @AllArgsConstructor +@Slf4j public class PartitionLogManager { private final KafkaServiceConfiguration kafkaConfig; private final RequestStats requestStats; - private final Map logMap; + private final Map> logMap; private final Time time; private final List entryFilters; + private final KafkaTopicLookupService kafkaTopicLookupService; + + private final Function producerStateManagerSnapshotBuffer; + public PartitionLogManager(KafkaServiceConfiguration kafkaConfig, RequestStats requestStats, final List entryFilters, - Time time) { + Time time, + KafkaTopicLookupService kafkaTopicLookupService, + Function + producerStateManagerSnapshotBuffer) { this.kafkaConfig = kafkaConfig; this.requestStats = requestStats; this.logMap = Maps.newConcurrentMap(); this.entryFilters = entryFilters; this.time = time; + this.kafkaTopicLookupService = kafkaTopicLookupService; + this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer; } - public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix) { + public CompletableFuture getLog(TopicPartition topicPartition, String namespacePrefix) { String kopTopic = KopTopic.toString(topicPartition, namespacePrefix); - + String tenant = TopicName.get(kopTopic).getTenant(); + ProducerStateManagerSnapshotBuffer prodPerTenant = producerStateManagerSnapshotBuffer.apply(tenant); return logMap.computeIfAbsent(kopTopic, key -> { - return new PartitionLog(kafkaConfig, requestStats, time, topicPartition, kopTopic, entryFilters, - new ProducerStateManager(kopTopic)); + CompletableFuture result = new PartitionLog(kafkaConfig, requestStats, + time, topicPartition, kopTopic, entryFilters, + kafkaTopicLookupService, + prodPerTenant) + .recoverTransactions(); + + result.exceptionally(error -> { + // in case of failure we have to remove the CompletableFuture from the map + log.error("Recovery of {} failed", key, error); + logMap.remove(key, result); + return null; + }); + + return result; }); } - public PartitionLog removeLog(String topicName) { + public 
CompletableFuture removeLog(String topicName) { + log.info("removeLog {}", topicName); return logMap.remove(topicName); } public int size() { return logMap.size(); } + + public CompletableFuture takeProducerStateSnapshots() { + List> handles = new ArrayList<>(); + logMap.values().forEach(log -> { + if (log.isDone() && !log.isCompletedExceptionally()) { + PartitionLog partitionLog = log.getNow(null); + if (partitionLog != null) { + handles.add(partitionLog + .getProducerStateManager() + .takeSnapshot() + .thenApply(___ -> null)); + } + } + }); + return FutureUtil.waitForAll(handles); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java index 0629d99196..2f10af9f77 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java @@ -40,7 +40,7 @@ public class ProducerAppendInfo { private final String topicPartition; // The id of the producer appending to the log - private final Long producerId; + private final long producerId; // The current entry associated with the producer id which contains metadata for a fixed number of // the most recent appends made by the producer. Validation of the first incoming append will @@ -69,8 +69,9 @@ public ProducerAppendInfo(String topicPartition, initUpdatedEntry(); } - private void checkProducerEpoch(Short producerEpoch) { - if (producerEpoch < updatedEntry.producerEpoch()) { + private void checkProducerEpoch(short producerEpoch) { + if (updatedEntry.producerEpoch() != null + && producerEpoch < updatedEntry.producerEpoch()) { String message = String.format("Producer's epoch in %s is %s, which is smaller than the last seen " + "epoch %s", topicPartition, producerEpoch, currentEntry.producerEpoch()); throw new IllegalArgumentException(message); @@ -126,9 +127,9 @@ public void updateCurrentTxnFirstOffset(Boolean isTransactional, long firstOffse public Optional appendEndTxnMarker( EndTransactionMarker endTxnMarker, - Short producerEpoch, - Long offset, - Long timestamp) { + short producerEpoch, + long offset, + long timestamp) { checkProducerEpoch(producerEpoch); // Only emit the `CompletedTxn` for non-empty transactions. 
A transaction marker diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java index b45b8630c3..735ce9987d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java @@ -30,7 +30,7 @@ @AllArgsConstructor public class ProducerStateEntry { - private Long producerId; + private long producerId; private Short producerEpoch; private Integer coordinatorEpoch; private Long lastTimestamp; @@ -38,7 +38,8 @@ public class ProducerStateEntry { public boolean maybeUpdateProducerEpoch(Short producerEpoch) { - if (!this.producerEpoch.equals(producerEpoch)) { + if (this.producerEpoch == null + || !this.producerEpoch.equals(producerEpoch)) { this.producerEpoch = producerEpoch; return true; } else { @@ -52,7 +53,7 @@ public void update(ProducerStateEntry nextEntry) { this.lastTimestamp(nextEntry.lastTimestamp); } - public static ProducerStateEntry empty(Long producerId){ + public static ProducerStateEntry empty(long producerId){ return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, Optional.empty()); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index 08ab544038..6d41cbd29a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -14,89 +14,25 @@ package io.streamnative.pulsar.handlers.kop.storage; import com.google.common.collect.Maps; -import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.experimental.Accessors; +import java.util.concurrent.CompletableFuture; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.FetchResponse; -/** - * AbortedTxn is used cache the aborted index. 
- */ -@Data -@Accessors(fluent = true) -@AllArgsConstructor -class AbortedTxn { - - private static final int VersionOffset = 0; - private static final int VersionSize = 2; - private static final int ProducerIdOffset = VersionOffset + VersionSize; - private static final int ProducerIdSize = 8; - private static final int FirstOffsetOffset = ProducerIdOffset + ProducerIdSize; - private static final int FirstOffsetSize = 8; - private static final int LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize; - private static final int LastOffsetSize = 8; - private static final int LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize; - private static final int LastStableOffsetSize = 8; - private static final int TotalSize = LastStableOffsetOffset + LastStableOffsetSize; - - private static final Short CurrentVersion = 0; - - private final Long producerId; - private final Long firstOffset; - private final Long lastOffset; - private final Long lastStableOffset; - - protected ByteBuffer toByteBuffer() { - ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TotalSize); - buffer.putShort(CurrentVersion); - buffer.putLong(producerId); - buffer.putLong(firstOffset); - buffer.putLong(lastOffset); - buffer.putLong(lastStableOffset); - buffer.flip(); - return buffer; - } -} - -@Data -@Accessors(fluent = true) -@AllArgsConstructor -class CompletedTxn { - private Long producerId; - private Long firstOffset; - private Long lastOffset; - private Boolean isAborted; -} - -@Data -@Accessors(fluent = true) -@EqualsAndHashCode -class TxnMetadata { - private final long producerId; - private final long firstOffset; - private long lastOffset; - - public TxnMetadata(long producerId, long firstOffset) { - this.producerId = producerId; - this.firstOffset = firstOffset; - } -} - /** * Producer state manager. 
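* It tracks, per producer id, the latest append metadata, the ongoing transactions indexed by their first offset, and the aborted-transaction index that is used to answer read_committed fetches.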
*/ @Slf4j public class ProducerStateManager { + @Getter private final String topicPartition; private final Map producers = Maps.newConcurrentMap(); @@ -104,8 +40,65 @@ public class ProducerStateManager { private final TreeMap ongoingTxns = Maps.newTreeMap(); private final List abortedIndexList = new ArrayList<>(); - public ProducerStateManager(String topicPartition) { + private final ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer; + + private volatile long mapEndOffset = -1; + + public ProducerStateManager(String topicPartition, + ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer) { this.topicPartition = topicPartition; + this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer; + } + + public CompletableFuture recover(PartitionLog partitionLog) { + return producerStateManagerSnapshotBuffer + .readLatestSnapshot(topicPartition) + .thenCompose(snapshot -> applySnapshotAndRecover(snapshot, partitionLog)); + } + + private CompletableFuture applySnapshotAndRecover(ProducerStateManagerSnapshot snapshot, + PartitionLog partitionLog) { + this.abortedIndexList.clear(); + this.producers.clear(); + this.ongoingTxns.clear(); + long offSetPosition = 0; + if (snapshot != null) { + this.abortedIndexList.addAll(snapshot.getAbortedIndexList()); + this.producers.putAll(snapshot.getProducers()); + this.ongoingTxns.putAll(snapshot.getOngoingTxns()); + offSetPosition = snapshot.getOffset(); + log.info("Recover topic {} from offset {}", topicPartition, offSetPosition); + } else { + log.info("No snapshot found for topic {}, recovering from the beginning", topicPartition); + } + long startRecovery = System.currentTimeMillis(); + // recover from log + return partitionLog + .recoverTxEntries(offSetPosition, this) + .thenCompose(numEntries -> { + log.info("Recovery of {} finished. 
Scanned {} entries, time {} ms, new mapEndOffset {}", + topicPartition, + numEntries, + System.currentTimeMillis() - startRecovery, + mapEndOffset); + return takeSnapshot() + .thenApply(____ -> (Void) null); + }); + } + + public CompletableFuture takeSnapshot() { + log.info("Taking snapshot for {} mapEndOffset is {}", topicPartition, mapEndOffset); + ProducerStateManagerSnapshot snapshot = new ProducerStateManagerSnapshot(topicPartition, + mapEndOffset, + new HashMap<>(producers), + new TreeMap<>(ongoingTxns), + new ArrayList<>(abortedIndexList)); + return producerStateManagerSnapshotBuffer + .write(snapshot) + .thenApply(___ -> { + log.info("Snapshot for {} taken", topicPartition); + return snapshot; + }); } public ProducerAppendInfo prepareUpdate(Long producerId, PartitionLog.AppendOrigin origin) { @@ -120,7 +113,7 @@ public ProducerAppendInfo prepareUpdate(Long producerId, PartitionLog.AppendOrig */ public long lastStableOffset(CompletedTxn completedTxn) { for (TxnMetadata txnMetadata : ongoingTxns.values()) { - if (!completedTxn.producerId().equals(txnMetadata.producerId())) { + if (completedTxn.producerId() != txnMetadata.producerId()) { return txnMetadata.firstOffset(); } } @@ -173,6 +166,10 @@ public void update(ProducerAppendInfo appendInfo) { } } + public void updateMapEndOffset(long mapEndOffset) { + this.mapEndOffset = mapEndOffset; + } + public void updateTxnIndex(CompletedTxn completedTxn, long lastStableOffset) { if (completedTxn.isAborted()) { abortedIndexList.add(new AbortedTxn(completedTxn.producerId(), completedTxn.firstOffset(), diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshot.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshot.java new file mode 100644 index 0000000000..4c492af9e6 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshot.java @@ -0,0 +1,33 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.pulsar.handlers.kop.storage; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public final class ProducerStateManagerSnapshot { + private final String topicPartition; + private final long offset; + private final Map producers; + + // ongoing transactions sorted by the first offset of the transaction + private final TreeMap ongoingTxns; + private final List abortedIndexList; + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshotBuffer.java new file mode 100644 index 0000000000..622561cf93 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerSnapshotBuffer.java @@ -0,0 +1,43 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.storage; + +import java.util.concurrent.CompletableFuture; + +/** + * Stores snapshots of the state of ProducerStateManagers. + * One ProducerStateManagerSnapshotBuffer handles all the topics for a Tenant. + */ +public interface ProducerStateManagerSnapshotBuffer { + + /** + * Writes a snapshot to the storage. + * @param snapshot + * @return a handle to the operation + */ + CompletableFuture write(ProducerStateManagerSnapshot snapshot); + + /** + * Reads the latest available snapshot for a given partition. + * @param topicPartition + * @return + */ + CompletableFuture readLatestSnapshot(String topicPartition); + + /** + * Shutdown and release resources. + */ + default void shutdown() {} + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java new file mode 100644 index 0000000000..941614ca1e --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -0,0 +1,335 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.pulsar.handlers.kop.storage; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.streamnative.pulsar.handlers.kop.SystemTopicClient; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class PulsarTopicProducerStateManagerSnapshotBuffer implements ProducerStateManagerSnapshotBuffer { + + private Map latestSnapshots = new ConcurrentHashMap<>(); + private final String topic; + private final SystemTopicClient pulsarClient; + private CompletableFuture> reader; + private CompletableFuture currentReadHandle; + + private synchronized CompletableFuture> ensureReaderHandle() { + if (reader == null) { + reader = pulsarClient.newReaderBuilder() + .topic(topic) + .startMessageId(MessageId.earliest) + .readCompacted(true) + .createAsync(); + } + return reader; + } + + private CompletableFuture readNextMessageIfAvailable(Reader reader) { + return reader + .hasMessageAvailableAsync() + .thenCompose(hasMessageAvailable -> { + if (hasMessageAvailable == null + || !hasMessageAvailable) { + return CompletableFuture.completedFuture(null); + } else { + CompletableFuture> opMessage = reader.readNextAsync(); + return opMessage.thenCompose(msg -> { + processMessage(msg); + return readNextMessageIfAvailable(reader); + }); + } + }); + } + + + private synchronized CompletableFuture ensureLatestData(boolean beforeWrite) { + if (currentReadHandle != null) { + if (beforeWrite) { + // we are inside a write loop, so + // we must ensure that we start to read now + // otherwise the write would use non up-to-date data + // so let's finish the current loop + if (log.isDebugEnabled()) { + log.debug("A read was already pending, starting a new one in order to ensure consistency"); + } + return currentReadHandle + .thenCompose(___ -> ensureLatestData(false)); + } + // if there is an ongoing read operation then complete it + return currentReadHandle; + } + // please note that the read operation is async, + // and it is not execute inside this synchronized block + CompletableFuture> readerHandle = ensureReaderHandle(); + final CompletableFuture newReadHandle = + readerHandle.thenCompose(this::readNextMessageIfAvailable); + currentReadHandle = newReadHandle; + return newReadHandle.thenApply((__) -> { + endReadLoop(newReadHandle); + return null; + }); + } + + private synchronized void endReadLoop(CompletableFuture handle) { + if (handle == currentReadHandle) { + currentReadHandle = null; + } + } + + @Override + public CompletableFuture write(ProducerStateManagerSnapshot snapshot) { + ByteBuffer serialized = serialize(snapshot); + if (serialized == null) { + // cannot serialise, skip + return CompletableFuture.completedFuture(null); + } + 
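+ // Writer protocol, in short: open a short-lived producer, bring the local cache up to date, + // refuse to overwrite a snapshot newer than ours (partition ownership has moved), then publish + // the snapshot keyed by partition so that topic compaction retains only the latest one.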
CompletableFuture> producerHandle = pulsarClient.newProducerBuilder() + .enableBatching(false) + .topic(topic) + .blockIfQueueFull(true) + .createAsync(); + return producerHandle.thenCompose(opProducer -> { + // nobody can write now to the topic + // wait for local cache to be up-to-date + CompletableFuture dummy = ensureLatestData(true) + .thenCompose((___) -> { + ProducerStateManagerSnapshot latest = latestSnapshots.get(snapshot.getTopicPartition()); + if (latest != null && latest.getOffset() > snapshot.getOffset()) { + log.error("Topic ownership changed for {}. Found a snapshot at {} " + + "while trying to write the snapshot at {}", snapshot.getTopicPartition(), + latest.getOffset(), snapshot.getOffset()); + return FutureUtil.failedFuture(new NotLeaderOrFollowerException("No more owner of " + + "ProducerState for topic " + topic)); + } + return opProducer + .newMessage() + .key(snapshot.getTopicPartition()) // leverage compaction + .value(serialized) + .sendAsync() + .thenApply((msgId) -> { + if (log.isDebugEnabled()) { + log.debug("{} written {} as {}", this, snapshot, msgId); + } + latestSnapshots.put(snapshot.getTopicPartition(), snapshot); + return null; + }); + }); + // ensure that we release the exclusive producer in any case + return dummy.whenComplete((___, err) -> { + opProducer.closeAsync().whenComplete((____, errorClose) -> { + if (errorClose != null) { + log.error("Error closing producer for {}", topic, errorClose); + } + }); + }); + }); + } + + private static ByteBuffer serialize(ProducerStateManagerSnapshot snapshot) { + + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); + try (DataOutputStream dataOutputStream = + new DataOutputStream(new ByteBufOutputStream(byteBuf));) { + + dataOutputStream.writeUTF(snapshot.getTopicPartition()); + dataOutputStream.writeLong(snapshot.getOffset()); + + dataOutputStream.writeInt(snapshot.getProducers().size()); + for (Map.Entry entry : snapshot.getProducers().entrySet()) { + ProducerStateEntry producer = entry.getValue(); + dataOutputStream.writeLong(producer.producerId()); + if (producer.producerEpoch() != null) { + dataOutputStream.writeInt(producer.producerEpoch()); + } else { + dataOutputStream.writeInt(-1); + } + if (producer.coordinatorEpoch() != null) { + dataOutputStream.writeInt(producer.coordinatorEpoch()); + } else { + dataOutputStream.writeInt(-1); + } + if (producer.lastTimestamp() != null) { + dataOutputStream.writeLong(producer.lastTimestamp()); + } else { + dataOutputStream.writeLong(-1L); + } + if (producer.currentTxnFirstOffset().isPresent()) { + dataOutputStream.writeLong(producer.currentTxnFirstOffset().get()); + } else { + dataOutputStream.writeLong(-1); + } + } + + dataOutputStream.writeInt(snapshot.getOngoingTxns().size()); + for (Map.Entry entry : snapshot.getOngoingTxns().entrySet()) { + TxnMetadata tx = entry.getValue(); + dataOutputStream.writeLong(tx.producerId()); + dataOutputStream.writeLong(tx.firstOffset()); + dataOutputStream.writeLong(tx.lastOffset()); + } + + dataOutputStream.writeInt(snapshot.getAbortedIndexList().size()); + for (AbortedTxn tx : snapshot.getAbortedIndexList()) { + dataOutputStream.writeLong(tx.producerId()); + dataOutputStream.writeLong(tx.firstOffset()); + dataOutputStream.writeLong(tx.lastOffset()); + dataOutputStream.writeLong(tx.lastStableOffset()); + } + + dataOutputStream.flush(); + + return byteBuf.nioBuffer(); + + } catch (IOException err) { + log.error("Cannot serialise snapshot {}", snapshot, err); + return null; + } + } + + private static ProducerStateManagerSnapshot 
deserialize(ByteBuffer buffer) { + + try (DataInputStream dataInputStream = + new DataInputStream(new ByteBufInputStream(Unpooled.wrappedBuffer(buffer)));) { + String topicPartition = dataInputStream.readUTF(); + long offset = dataInputStream.readLong(); + + int numProducers = dataInputStream.readInt(); + Map producers = new HashMap<>(); + for (int i = 0; i < numProducers; i++) { + long producerId = dataInputStream.readLong(); + Integer producerEpoch = dataInputStream.readInt(); + if (producerEpoch == -1) { + producerEpoch = null; + } + Integer coordinatorEpoch = dataInputStream.readInt(); + if (coordinatorEpoch == -1) { + coordinatorEpoch = null; + } + Long lastTimestamp = dataInputStream.readLong(); + if (lastTimestamp == -1) { + lastTimestamp = null; + } + Long currentTxFirstOffset = dataInputStream.readLong(); + if (currentTxFirstOffset == -1) { + currentTxFirstOffset = null; + } + ProducerStateEntry entry = ProducerStateEntry.empty(producerId) + .producerEpoch(producerEpoch != null ? producerEpoch.shortValue() : null) + .coordinatorEpoch(coordinatorEpoch) + .lastTimestamp(lastTimestamp) + .currentTxnFirstOffset(Optional.ofNullable(currentTxFirstOffset)); + producers.put(producerId, entry); + } + + int numOngoingTxns = dataInputStream.readInt(); + TreeMap ongoingTxns = new TreeMap<>(); + for (int i = 0; i < numOngoingTxns; i++) { + long producerId = dataInputStream.readLong(); + long firstOffset = dataInputStream.readLong(); + long lastOffset = dataInputStream.readLong(); + ongoingTxns.put(firstOffset, new TxnMetadata(producerId, firstOffset) + .lastOffset(lastOffset)); + } + + int numAbortedIndexList = dataInputStream.readInt(); + List abortedTxnList = new ArrayList<>(); + for (int i = 0; i < numAbortedIndexList; i++) { + long producerId = dataInputStream.readLong(); + long firstOffset = dataInputStream.readLong(); + long lastOffset = dataInputStream.readLong(); + long lastStableOffset = dataInputStream.readLong(); + abortedTxnList.add(new AbortedTxn(producerId, firstOffset, lastOffset, lastStableOffset)); + } + + return new ProducerStateManagerSnapshot(topicPartition, offset, + producers, ongoingTxns, abortedTxnList); + + } catch (IOException err) { + log.error("Cannot deserialize snapshot", err); + return null; + } + } + + private void processMessage(Message msg) { + ProducerStateManagerSnapshot deserialize = deserialize(msg.getValue()); + if (deserialize != null) { + String key = msg.hasKey() ? 
msg.getKey() : null; + if (Objects.equals(key, deserialize.getTopicPartition())) { + if (log.isDebugEnabled()) { + log.debug("Found snapshot for {} : {}", deserialize.getTopicPartition(), deserialize); + } + latestSnapshots.put(deserialize.getTopicPartition(), deserialize); + } else { + log.error("Found erroneous snapshot with key {} but for topic {}: {}", + key, deserialize.getTopicPartition(), deserialize); + } + } + } + + @Override + public CompletableFuture readLatestSnapshot(String topicPartition) { + log.info("Reading latest snapshot for {}", topicPartition); + return ensureLatestData(false).thenApply(__ -> { + ProducerStateManagerSnapshot result = latestSnapshots.get(topicPartition); + log.info("Latest snapshot for {} is {}", topicPartition, result); + return result; + }); + } + + public PulsarTopicProducerStateManagerSnapshotBuffer(String topicName, SystemTopicClient pulsarClient) { + this.topic = topicName; + this.pulsarClient = pulsarClient; + } + + + @Override + public synchronized void shutdown() { + if (reader != null) { + reader.whenComplete((r, e) -> { + if (r != null) { + r.closeAsync().whenComplete((___, err) -> { + if (err != null) { + log.error("Error closing reader for {}", topic, err); + } + }); + } + }); + } + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index aad7dbd41b..f98887ddd5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -17,6 +17,7 @@ import io.streamnative.pulsar.handlers.kop.DelayedFetch; import io.streamnative.pulsar.handlers.kop.DelayedProduceAndFetch; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; import io.streamnative.pulsar.handlers.kop.MessageFetchContext; import io.streamnative.pulsar.handlers.kop.RequestStats; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -54,6 +56,7 @@ public class ReplicaManager { private final PartitionLogManager logManager; private final DelayedOperationPurgatory producePurgatory; private final DelayedOperationPurgatory fetchPurgatory; + private final String metadataNamespace; public ReplicaManager(KafkaServiceConfiguration kafkaConfig, @@ -61,19 +64,22 @@ public ReplicaManager(KafkaServiceConfiguration kafkaConfig, Time time, List entryFilters, DelayedOperationPurgatory producePurgatory, - DelayedOperationPurgatory fetchPurgatory) { - this.logManager = new PartitionLogManager(kafkaConfig, requestStats, entryFilters, time); + DelayedOperationPurgatory fetchPurgatory, + KafkaTopicLookupService kafkaTopicLookupService, + Function producerStateManagerSnapshotBuffer) { + this.logManager = new PartitionLogManager(kafkaConfig, requestStats, entryFilters, + time, kafkaTopicLookupService, producerStateManagerSnapshotBuffer); this.producePurgatory = producePurgatory; this.fetchPurgatory = fetchPurgatory; this.metadataNamespace = kafkaConfig.getKafkaMetadataNamespace(); } - public PartitionLog getPartitionLog(TopicPartition topicPartition, 
String namespacePrefix) { + public CompletableFuture getPartitionLog(TopicPartition topicPartition, String namespacePrefix) { return logManager.getLog(topicPartition, namespacePrefix); } public void removePartitionLog(String topicName) { - PartitionLog partitionLog = logManager.removeLog(topicName); + CompletableFuture partitionLog = logManager.removeLog(topicName); if (log.isDebugEnabled() && partitionLog != null) { log.debug("PartitionLog: {} has been removed.", partitionLog); } @@ -134,18 +140,19 @@ public CompletableFuture> new PendingProduceCallback(topicPartitionNum, responseMap, completableFuture, entriesPerPartition); BiConsumer addPartitionResponse = (topicPartition, response) -> { - responseMap.put(topicPartition, response); - // reset topicPartitionNum - int restTopicPartitionNum = topicPartitionNum.decrementAndGet(); - if (restTopicPartitionNum < 0) { - return; - } - if (restTopicPartitionNum == 0) { - // If all tasks are sent, cancel the timer tasks to avoid full gc or oom - producePurgatory.checkAndComplete(new DelayedOperationKey.TopicPartitionOperationKey(topicPartition)); - complete.run(); - } - }; + responseMap.put(topicPartition, response); + // reset topicPartitionNum + int restTopicPartitionNum = topicPartitionNum.decrementAndGet(); + if (restTopicPartitionNum < 0) { + return; + } + if (restTopicPartitionNum == 0) { + // If all tasks are sent, cancel the timer tasks to avoid full gc or oom + producePurgatory.checkAndComplete( + new DelayedOperationKey.TopicPartitionOperationKey(topicPartition)); + complete.run(); + } + }; entriesPerPartition.forEach((topicPartition, memoryRecords) -> { String fullPartitionName = KopTopic.toString(topicPartition, namespacePrefix); // reject appending to internal topics if it is not allowed if (KopTopic.isInternalTopic(fullPartitionName, metadataNamespace)) { addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse( Errors.forException(new InvalidTopicException( String.format("Cannot append to internal topic %s", topicPartition.topic()))))); } else { - PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix); - partitionLog.appendRecords(memoryRecords, origin, appendRecordsContext) - .thenAccept(offset -> addPartitionResponse.accept(topicPartition, - new ProduceResponse.PartitionResponse(Errors.NONE, offset, -1L, -1L))) - .exceptionally(ex -> { - addPartitionResponse.accept(topicPartition, - new ProduceResponse.PartitionResponse(Errors.forException(ex.getCause()))); - return null; - }); + getPartitionLog(topicPartition, namespacePrefix).thenAccept(partitionLog -> { + partitionLog.appendRecords(memoryRecords, origin, appendRecordsContext) + .thenAccept(offset -> addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.NONE, offset, -1L, -1L))) + .exceptionally(ex -> { + addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.forException(ex.getCause()))); + return null; + }); + }); } }); // delay produce @@ -253,12 +261,17 @@ public CompletableFuture> re }; readPartitionInfo.forEach((tp, fetchInfo) -> { getPartitionLog(tp, context.getNamespacePrefix()) - .readRecords(fetchInfo, readCommitted, - limitBytes, maxReadEntriesNum, context) - .thenAccept(readResult -> { - result.put(tp, readResult); - complete.run(); + .thenCompose(partitionLog -> { + return partitionLog + .readRecords(fetchInfo, readCommitted, + limitBytes, maxReadEntriesNum, context + ) + .thenAccept(readResult -> { + result.put(tp, readResult); + complete.run(); + }); }); + }); return resultFuture; } @@ -270,4 +283,9 @@ public void 
tryCompleteDelayedFetch(DelayedOperationKey key) { } } + + public CompletableFuture takeProducerStateSnapshots() { + return logManager.takeProducerStateSnapshots(); + } + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/TxnMetadata.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/TxnMetadata.java new file mode 100644 index 0000000000..b512d9139b --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/TxnMetadata.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.storage; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; + +@Data +@Accessors(fluent = true) +@EqualsAndHashCode +public final class TxnMetadata { + private final long producerId; + private final long firstOffset; + private long lastOffset; + + public TxnMetadata(long producerId, long firstOffset) { + this.producerId = producerId; + this.firstOffset = firstOffset; + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java index f1dcd8c06b..934bdc5965 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java @@ -40,7 +40,7 @@ public class MetadataUtils { public static String constructOffsetsTopicBaseName(String tenant, KafkaServiceConfiguration conf) { return tenant + "/" + conf.getKafkaMetadataNamespace() - + "/" + Topic.GROUP_METADATA_TOPIC_NAME; + + "/" + Topic.GROUP_METADATA_TOPIC_NAME; } public static String constructTxnLogTopicBaseName(String tenant, KafkaServiceConfiguration conf) { @@ -48,6 +48,11 @@ public static String constructTxnLogTopicBaseName(String tenant, KafkaServiceCon + "/" + Topic.TRANSACTION_STATE_TOPIC_NAME; } + public static String constructTxProducerStateTopicBaseName(String tenant, KafkaServiceConfiguration conf) { + return tenant + "/" + conf.getKafkaMetadataNamespace() + + "/__transaction_producer_state"; + } + public static String constructTxnProducerIdTopicBaseName(String tenant, KafkaServiceConfiguration conf) { return tenant + "/" + conf.getKafkaMetadataNamespace() + "/__transaction_producerid_generator"; @@ -62,6 +67,10 @@ public static String constructMetadataNamespace(String tenant, KafkaServiceConfi return tenant + "/" + conf.getKafkaMetadataNamespace(); } + public static String constructProducerIdTopicNamespace(String tenant, KafkaServiceConfiguration conf) { + return tenant + "/" + conf.getKafkaTransactionProducerIdsNamespace(); + } + public static String constructUserTopicsNamespace(String tenant, KafkaServiceConfiguration conf) { return tenant + "/" + conf.getKafkaNamespace(); } @@ -73,7 +82,7 @@ public static void createOffsetMetadataIfMissing(String tenant, PulsarAdmin puls KopTopic kopTopic = new 
KopTopic(constructOffsetsTopicBaseName(tenant, conf), constructMetadataNamespace(tenant, conf)); createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, kopTopic, - conf.getOffsetsTopicNumPartitions(), false); + conf.getOffsetsTopicNumPartitions(), true, false); } public static void createTxnMetadataIfMissing(String tenant, @@ -84,11 +93,17 @@ public static void createTxnMetadataIfMissing(String tenant, KopTopic kopTopic = new KopTopic(constructTxnLogTopicBaseName(tenant, conf), constructMetadataNamespace(tenant, conf)); createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, kopTopic, - conf.getKafkaTxnLogTopicNumPartitions(), false); + conf.getKafkaTxnLogTopicNumPartitions(), true, false); + KopTopic kopTopicProducerState = new KopTopic(constructTxProducerStateTopicBaseName(tenant, conf), + constructMetadataNamespace(tenant, conf)); + createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, + kopTopicProducerState, conf.getKafkaTxnProducerStateTopicNumPartitions(), true, false); if (conf.isKafkaTransactionProducerIdsStoredOnPulsar()) { KopTopic producerIdKopTopic = new KopTopic(constructTxnProducerIdTopicBaseName(tenant, conf), - constructMetadataNamespace(tenant, conf)); - createTopicIfNotExist(pulsarAdmin, producerIdKopTopic.getFullName(), 1); + constructProducerIdTopicNamespace(tenant, conf)); + createKafkaMetadataIfMissing(tenant, conf.getKafkaTransactionProducerIdsNamespace(), + pulsarAdmin, clusterData, conf, producerIdKopTopic, + conf.getKafkaTxnLogTopicNumPartitions(), false, true); } } @@ -113,8 +128,9 @@ private static void createKafkaMetadataIfMissing(String tenant, KafkaServiceConfiguration conf, KopTopic kopTopic, int partitionNum, + boolean partitioned, boolean infiniteRetention) - throws PulsarAdminException { + throws PulsarAdminException { if (!conf.isKafkaManageSystemNamespaces()) { log.info("Skipping initialization of topic {} for tenant {}", kopTopic.getFullName(), tenant); return; @@ -159,7 +175,7 @@ private static void createKafkaMetadataIfMissing(String tenant, namespaceExists = true; // Check if the offsets topic exists and create it if not - createTopicIfNotExist(pulsarAdmin, kopTopic.getFullName(), partitionNum); + createTopicIfNotExist(conf, pulsarAdmin, kopTopic.getFullName(), partitionNum, partitioned); offsetsTopicExists = true; } catch (PulsarAdminException e) { if (e instanceof ConflictException) { @@ -168,7 +184,7 @@ private static void createKafkaMetadataIfMissing(String tenant, } log.error("Failed to successfully initialize Kafka Metadata {}", - kafkaMetadataNamespace, e); + kafkaMetadataNamespace, e); throw e; } finally { log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}," @@ -311,18 +327,30 @@ public static void createKafkaNamespaceIfMissing(PulsarAdmin pulsarAdmin, } } - private static void createTopicIfNotExist(final PulsarAdmin admin, + private static void createTopicIfNotExist(final KafkaServiceConfiguration conf, + final PulsarAdmin admin, final String topic, - final int numPartitions) throws PulsarAdminException { - try { - admin.topics().createPartitionedTopic(topic, numPartitions); - } catch (PulsarAdminException.ConflictException e) { - log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage()); - } - try { - // Ensure all partitions are created - admin.topics().createMissedPartitions(topic); - } catch (PulsarAdminException 
ignored) { + final int numPartitions, + final boolean partitioned) throws PulsarAdminException { + if (partitioned) { + log.info("Creating partitioned topic {} (with {} partitions) if it does not exist", topic, numPartitions); + try { + admin.topics().createPartitionedTopic(topic, numPartitions); + } catch (PulsarAdminException.ConflictException e) { + log.info("Topic {} is being created concurrently by another process: {}", topic, e.getMessage()); + } + try { + // Ensure all partitions are created + admin.topics().createMissedPartitions(topic); + } catch (PulsarAdminException ignored) { + } + } else { + log.info("Creating non-partitioned topic {} if it does not exist", topic); + try { + admin.topics().createNonPartitionedTopic(topic); + } catch (PulsarAdminException.ConflictException e) { + log.info("Topic {} is being created concurrently by another process: {}", topic, e.getMessage()); + } + } } @@ -334,6 +362,6 @@ public static void createSchemaRegistryMetadataIfMissing(String tenant, KopTopic kopTopic = new KopTopic(constructSchemaRegistryTopicName(tenant, conf), constructMetadataNamespace(tenant, conf)); createKafkaMetadataIfMissing(tenant, conf.getKopSchemaRegistryNamespace(), pulsarAdmin, clusterData, - conf, kopTopic, 1, true); + conf, kopTopic, 1, false, true); } } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java index 3126872d0b..b43cf3e7a8 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java @@ -13,9 +13,12 @@ */ package io.streamnative.pulsar.handlers.kop.format; +import static org.mockito.Mockito.mock; + import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; -import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManager; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Optional; @@ -29,7 +32,6 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; - /** * The performance test for {@link EntryFormatter#encode(EncodeRequest)}. 
*/ @@ -48,7 +50,8 @@ public class EncodePerformanceTest { new TopicPartition("test", 1), "test", null, - new ProducerStateManager("test")); + mock(KafkaTopicLookupService.class), + new MemoryProducerStateManagerSnapshotBuffer()); public static void main(String[] args) { pulsarServiceConfiguration.setEntryFormat("pulsar"); diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java index 41690a38ef..923027687d 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java @@ -19,8 +19,9 @@ import static org.mockito.Mockito.mock; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; -import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManager; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -79,7 +80,8 @@ public class EntryFormatterTest { new TopicPartition("test", 1), "test", null, - new ProducerStateManager("test")); + mock(KafkaTopicLookupService.class), + new MemoryProducerStateManagerSnapshotBuffer()); private void init() { pulsarServiceConfiguration.setEntryFormat("pulsar"); diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java index 442f94034c..1ed3f7607c 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java @@ -66,6 +66,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { .constructOffsetsTopicBaseName(conf.getKafkaMetadataTenant(), conf), namespacePrefix); final KopTopic txnTopic = new KopTopic(MetadataUtils .constructTxnLogTopicBaseName(conf.getKafkaMetadataTenant(), conf), namespacePrefix); + final KopTopic txnProducerStateTopic = new KopTopic(MetadataUtils + .constructTxProducerStateTopicBaseName(conf.getKafkaMetadataTenant(), conf), namespacePrefix); List emptyList = Lists.newArrayList(); @@ -83,6 +85,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { Topics mockTopics = mock(Topics.class); doReturn(offsetTopicMetadata).when(mockTopics).getPartitionedTopicMetadata(eq(offsetsTopic.getFullName())); doReturn(offsetTopicMetadata).when(mockTopics).getPartitionedTopicMetadata(eq(txnTopic.getFullName())); + doReturn(offsetTopicMetadata).when(mockTopics) + .getPartitionedTopicMetadata(eq(txnProducerStateTopic.getFullName())); PulsarAdmin mockPulsarAdmin = mock(PulsarAdmin.class); @@ -125,6 +129,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions())); verify(mockTopics, times(1)).createPartitionedTopic( eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions())); + verify(mockTopics, times(1)).createPartitionedTopic( + eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions())); // check user topics namespace doesn't set the policy verify(mockNamespaces, 
times(1)).createNamespace(eq(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace()), any(Set.class)); @@ -172,11 +178,16 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { for (int i = 0; i < conf.getKafkaTxnLogTopicNumPartitions() - 2; i++) { incompletePartitionList.add(txnTopic.getPartitionName(i)); } + for (int i = 0; i < conf.getKafkaTxnProducerStateTopicNumPartitions() - 2; i++) { + incompletePartitionList.add(txnProducerStateTopic.getPartitionName(i)); + } doReturn(new PartitionedTopicMetadata(8)).when(mockTopics) .getPartitionedTopicMetadata(eq(offsetsTopic.getFullName())); doReturn(new PartitionedTopicMetadata(8)).when(mockTopics) .getPartitionedTopicMetadata(eq(txnTopic.getFullName())); + doReturn(new PartitionedTopicMetadata(8)).when(mockTopics) + .getPartitionedTopicMetadata(eq(txnProducerStateTopic.getFullName())); doReturn(incompletePartitionList).when(mockTopics).getList(eq(conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace())); @@ -184,10 +195,11 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), mockPulsarAdmin, clusterData, conf); verify(mockTenants, times(1)).updateTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class)); - verify(mockNamespaces, times(2)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant() + verify(mockNamespaces, times(3)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace()), any(Set.class)); verify(mockTopics, times(1)).createMissedPartitions(contains(offsetsTopic.getOriginalName())); verify(mockTopics, times(1)).createMissedPartitions(contains(txnTopic.getOriginalName())); + verify(mockTopics, times(1)).createMissedPartitions(contains(txnProducerStateTopic.getOriginalName())); } @Test(timeOut = 30000) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java index b7e1c55c08..945f73bf3b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java @@ -19,6 +19,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; +import static org.testng.Assert.fail; import com.google.common.collect.ImmutableMap; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState; @@ -53,6 +54,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -74,6 +76,12 @@ protected void setup() throws Exception { this.conf.setKafkaTxnLogTopicNumPartitions(50); this.conf.setKafkaTransactionCoordinatorEnabled(true); this.conf.setBrokerDeduplicationEnabled(true); + + // enable tx expiration, but producers have + // a very long TRANSACTION_TIMEOUT_CONFIG + // so they won't expire by default + this.conf.setKafkaTransactionalIdExpirationMs(5000); + this.conf.setKafkaTransactionalIdExpirationEnable(true); super.internalSetup(); log.info("success internal setup"); } @@ -141,7 +149,7 @@ public void testMultiCommits() throws Exception { }); } - public void 
basicProduceAndConsumeTest(String topicName, + private void basicProduceAndConsumeTest(String topicName, String transactionalId, String isolation, boolean isBatch) throws Exception { @@ -183,7 +191,19 @@ public void basicProduceAndConsumeTest(String topicName, } } - consumeTxnMessage(topicName, totalTxnCount * messageCountPerTxn, lastMessage, isolation); + final int expected; + switch (isolation) { + case "read_committed": + expected = totalTxnCount * messageCountPerTxn / 2; + break; + case "read_uncommitted": + expected = totalTxnCount * messageCountPerTxn; + break; + default: + expected = -1; + fail(); + } + consumeTxnMessage(topicName, expected, lastMessage, isolation); } private void consumeTxnMessage(String topicName, @@ -203,7 +223,7 @@ private void consumeTxnMessage(String topicName, boolean readFinish = false; for (ConsumerRecord record : consumerRecords) { if (isolation.equals("read_committed")) { - assertFalse(record.value().contains("abort msg txnIndex")); + assertFalse(record.value().contains("abort")); } log.info("Fetch for receive record offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value()); @@ -220,12 +240,7 @@ private void consumeTxnMessage(String topicName, } } log.info("Fetch for receive message finish. isolation: {}, receive count: {}", isolation, receiveCount.get()); - - if (isolation.equals("read_committed")) { - Assert.assertEquals(receiveCount.get(), totalMessageCount / 2); - } else { - Assert.assertEquals(receiveCount.get(), totalMessageCount); - } + Assert.assertEquals(receiveCount.get(), totalMessageCount); log.info("Fetch for finish consume messages. isolation: {}", isolation); } @@ -289,7 +304,7 @@ public void txnOffsetTest(String topic, int messageCnt, boolean isCommit) throws if (records.isEmpty()) { msgCnt.decrementAndGet(); } else { - Assert.fail("The transaction was committed, the consumer shouldn't receive any more messages."); + fail("The transaction was committed, the consumer shouldn't receive any more messages."); } } else { for (ConsumerRecord record : records) { @@ -301,6 +316,341 @@ public void txnOffsetTest(String topic, int messageCnt, boolean isCommit) throws } } + @DataProvider(name = "basicRecoveryTestAfterTopicUnloadNumTransactions") + protected static Object[][] basicRecoveryTestAfterTopicUnloadNumTransactions() { + // numTransactionsBetweenSnapshots + return new Object[][]{ + {0}, + {3}, + {5} + }; + } + + @Test(timeOut = 1000 * 20, dataProvider = "basicRecoveryTestAfterTopicUnloadNumTransactions") + public void basicRecoveryTestAfterTopicUnload(int numTransactionsBetweenSnapshots) throws Exception { + + String topicName = "basicRecoveryTestAfterTopicUnload_" + numTransactionsBetweenSnapshots; + String transactionalId = "myProducer_" + numTransactionsBetweenSnapshots; + String isolation = "read_committed"; + boolean isBatch = false; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + int totalTxnCount = 10; + int messageCountPerTxn = 20; + + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + + String lastMessage = ""; + for (int txnIndex = 0; txnIndex < totalTxnCount; txnIndex++) { + producer.beginTransaction(); + + String contentBase; + if (txnIndex % 2 != 0) { + contentBase = "commit msg txnIndex %s messageIndex %s"; + } else { + contentBase = "abort msg txnIndex %s messageIndex %s"; + } + + for (int messageIndex = 0; messageIndex < 
messageCountPerTxn; messageIndex++) { + String msgContent = String.format(contentBase, txnIndex, messageIndex); + log.info("send txn message {}", msgContent); + lastMessage = msgContent; + if (isBatch) { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)); + } else { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)).get(); + } + } + producer.flush(); + + if (numTransactionsBetweenSnapshots > 0 + && (txnIndex % numTransactionsBetweenSnapshots) == 0) { + // force take snapshot + protocolHandler + .getReplicaManager() + .takeProducerStateSnapshots() + .get(); + } + + if (txnIndex % 2 != 0) { + producer.commitTransaction(); + } else { + producer.abortTransaction(); + } + } + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + + final int expected = totalTxnCount * messageCountPerTxn / 2; + consumeTxnMessage(topicName, expected, lastMessage, isolation); + } + + + @Test(timeOut = 1000 * 20, dataProvider = "basicRecoveryTestAfterTopicUnloadNumTransactions") + public void basicTestWithTopicUnload(int numTransactionsBetweenUnloads) throws Exception { + + String topicName = "basicTestWithTopicUnload_" + numTransactionsBetweenUnloads; + String transactionalId = "myProducer_" + numTransactionsBetweenUnloads; + String isolation = "read_committed"; + boolean isBatch = false; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + int totalTxnCount = 10; + int messageCountPerTxn = 20; + + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + + String lastMessage = ""; + for (int txnIndex = 0; txnIndex < totalTxnCount; txnIndex++) { + producer.beginTransaction(); + + String contentBase; + if (txnIndex % 2 != 0) { + contentBase = "commit msg txnIndex %s messageIndex %s"; + } else { + contentBase = "abort msg txnIndex %s messageIndex %s"; + } + + for (int messageIndex = 0; messageIndex < messageCountPerTxn; messageIndex++) { + String msgContent = String.format(contentBase, txnIndex, messageIndex); + log.info("send txn message {}", msgContent); + lastMessage = msgContent; + if (isBatch) { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)); + } else { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)).get(); + } + } + producer.flush(); + + if (numTransactionsBetweenUnloads > 0 + && (txnIndex % numTransactionsBetweenUnloads) == 0) { + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + } + + if (txnIndex % 2 != 0) { + producer.commitTransaction(); + } else { + producer.abortTransaction(); + } + } + + + final int expected = totalTxnCount * messageCountPerTxn / 2; + consumeTxnMessage(topicName, expected, lastMessage, isolation); + } + + @DataProvider(name = "takeSnapshotBeforeRecovery") + protected static Object[][] takeSnapshotBeforeRecovery() { + // takeSnapshotBeforeRecovery + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(timeOut = 1000 * 20, dataProvider = "takeSnapshotBeforeRecovery") + public void basicRecoveryAbortedTransaction(boolean takeSnapshotBeforeRecovery) throws Exception { + + String topicName = "basicRecoveryAbortedTransaction_" + takeSnapshotBeforeRecovery; + String transactionalId = "myProducer"; + String isolation = "read_committed"; + + String namespace = 
TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + + producer.beginTransaction(); + + String firstMessage = "aborted msg 1"; + + producer.send(new ProducerRecord<>(topicName, 0, firstMessage)).get(); + producer.flush(); + // force take snapshot + protocolHandler + .getReplicaManager() + .takeProducerStateSnapshots() + .get(); + + // recovery will re-process the topic from this point onwards + String secondMessage = "aborted msg 2"; + producer.send(new ProducerRecord<>(topicName, 0, secondMessage)).get(); + + producer.abortTransaction(); + + producer.beginTransaction(); + String lastMessage = "committed msg"; + producer.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer.commitTransaction(); + + if (takeSnapshotBeforeRecovery) { + protocolHandler + .getReplicaManager() + .takeProducerStateSnapshots() + .get(); + } + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + + consumeTxnMessage(topicName, 2, lastMessage, isolation); + } + + @Test(timeOut = 1000 * 20, dataProvider = "takeSnapshotBeforeRecovery") + public void basicRecoveryAbortedTransactionDueToProducerFenced(boolean takeSnapshotBeforeRecovery) + throws Exception { + + String topicName = "basicRecoveryAbortedTransactionDueToProducerFenced_" + takeSnapshotBeforeRecovery; + String transactionalId = "myProducer"; + String isolation = "read_committed"; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + + producer.beginTransaction(); + + String firstMessage = "aborted msg 1"; + + producer.send(new ProducerRecord<>(topicName, 0, firstMessage)).get(); + producer.flush(); + // force take snapshot + protocolHandler + .getReplicaManager() + .takeProducerStateSnapshots() + .get(); + + // recovery will re-process the topic from this point onwards + String secondMessage = "aborted msg 2"; + producer.send(new ProducerRecord<>(topicName, 0, secondMessage)).get(); + + + @Cleanup + KafkaProducer producer2 = buildTransactionProducer(transactionalId); + producer2.initTransactions(); + + // the transaction is automatically aborted, because the first instance of the + // producer has been fenced + expectThrows(ProducerFencedException.class, () -> { + producer.commitTransaction(); + }); + + + producer2.beginTransaction(); + String lastMessage = "committed msg"; + producer2.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer2.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer2.commitTransaction(); + + if (takeSnapshotBeforeRecovery) { + // force take snapshot + protocolHandler + .getReplicaManager() + .takeProducerStateSnapshots() + .get(); + } + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + + consumeTxnMessage(topicName, 2, lastMessage, isolation); + } + + + @Test(timeOut = 1000 * 20, dataProvider = "takeSnapshotBeforeRecovery") + public void basicRecoveryAbortedTransactionDueToProducerTimedOut(boolean takeSnapshotBeforeRecovery) + 
+ + + @Test(timeOut = 1000 * 20, dataProvider = "takeSnapshotBeforeRecovery") + public void basicRecoveryAbortedTransactionDueToProducerTimedOut(boolean takeSnapshotBeforeRecovery) + throws Exception { + + String topicName = "basicRecoveryAbortedTransactionDueToProducerTimedOut_" + takeSnapshotBeforeRecovery; + String transactionalId = "myProducer"; + String isolation = "read_committed"; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId, 1000); + + producer.initTransactions(); + + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + + producer.beginTransaction(); + + String firstMessage = "aborted msg 1"; + + producer.send(new ProducerRecord<>(topicName, 0, firstMessage)).get(); + producer.flush(); + // force a snapshot to be taken + protocolHandler + .getReplicaManager() + .takeProducerStateSnapshots() + .get(); + + // recovery will re-process the topic from this point onwards + String secondMessage = "aborted msg 2"; + producer.send(new ProducerRecord<>(topicName, 0, secondMessage)).get(); + + Thread.sleep(conf.getKafkaTransactionalIdExpirationMs() + 5000); + + // the transaction is automatically aborted because the producer timed out + expectThrows(ProducerFencedException.class, () -> { + producer.commitTransaction(); + }); + + @Cleanup + KafkaProducer<Integer, String> producer2 = buildTransactionProducer(transactionalId, 1000); + producer2.initTransactions(); + producer2.beginTransaction(); + String lastMessage = "committed msg"; + producer2.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer2.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer2.commitTransaction(); + + if (takeSnapshotBeforeRecovery) { + // force a snapshot to be taken + protocolHandler + .getReplicaManager() + .takeProducerStateSnapshots() + .get(); + } + + // unload the namespace; this forces a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + + consumeTxnMessage(topicName, 2, lastMessage, isolation); + } + private List<String> prepareData(String sourceTopicName, String messageContent, int messageCount) throws ExecutionException, InterruptedException { @@ -333,7 +683,7 @@ private void waitForTxnMarkerWriteComplete(Map } private KafkaProducer<Integer, String> buildTransactionProducer(String transactionalId) { + return buildTransactionProducer(transactionalId, -1); + } + + private KafkaProducer<Integer, String> buildTransactionProducer(String transactionalId, int txTimeout) { Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerAdder()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10); producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + if (txTimeout > 0) { + producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout); + } else { + // very long timeout (10 minutes), so tests never hit it by accident + producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000); + } addCustomizeProps(producerProps); return new KafkaProducer<>(producerProps);
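The two-argument overload exists so a test can provoke a coordinator-side abort: pick a transaction.timeout.ms shorter than a deliberate pause, and the coordinator expires the open transaction while the producer sleeps. A usage sketch mirroring the timed-out test above (variable names illustrative):

    // A 1-second transaction timeout; the sleep outlives both the transaction
    // timeout and the transactional.id expiration, so the commit must fail.
    KafkaProducer<Integer, String> shortLived = buildTransactionProducer("timeoutProducer", 1000);
    shortLived.initTransactions();
    shortLived.beginTransaction();
    shortLived.send(new ProducerRecord<>(topicName, 0, "will be aborted")).get();
    Thread.sleep(conf.getKafkaTransactionalIdExpirationMs() + 5000);
    expectThrows(ProducerFencedException.class, shortLived::commitTransaction);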
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java index bf46f34d36..79e1a00c01 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.kop; import com.google.common.collect.Sets; +import io.streamnative.pulsar.handlers.kop.security.auth.KafkaMockAuthorizationProvider; import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler; import io.streamnative.pulsar.handlers.kop.security.oauth.OauthValidatorCallbackHandler; import java.net.URL; @@ -25,6 +26,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; @Slf4j public class TransactionWithOAuthBearerAuthTest extends TransactionTest { @@ -99,4 +101,19 @@ protected void addCustomizeProps(Properties properties) { )); } + @Test(enabled = false) + @Override + public void basicRecoveryAbortedTransactionDueToProducerTimedOut(boolean takeSnapshotBeforeRecovery) { + // disabled in this suite: the OAuth token would expire during this test's long wait + } + + + public static class OauthMockAuthorizationProvider extends KafkaMockAuthorizationProvider { + + @Override + public boolean roleAuthorized(String role) { + return role.equals(ADMIN_USER); + } + } + } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java index c37613ec1c..bbf5703fa4 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java @@ -32,6 +32,7 @@ import io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler; import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; import io.streamnative.pulsar.handlers.kop.scala.Either; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTime; import java.util.Collections; @@ -134,7 +135,9 @@ protected void initializeState() { transactionManager, time, METADATA_NAMESPACE_PREFIX, - NAMESPACE_PREFIX); + NAMESPACE_PREFIX, + (config) -> new MemoryProducerStateManagerSnapshotBuffer() + ); result = null; error = Errors.NONE; capturedTxn = ArgumentCaptor.forClass(TransactionMetadata.class); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java index 65d3de08f6..55bec4293b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java @@ -13,7 +13,10 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import static org.mockito.Mockito.mock; + import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; import java.nio.ByteBuffer; import java.util.Arrays; import lombok.extern.slf4j.Slf4j; @@ -46,7 +49,8 @@ public class PartitionLogTest { new TopicPartition("test", 1), "test", null, - new ProducerStateManager("test")); + mock(KafkaTopicLookupService.class), + new MemoryProducerStateManagerSnapshotBuffer()); @DataProvider(name = "compressionTypes") Object[] allCompressionTypes() {
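Across these test hunks, MemoryProducerStateManagerSnapshotBuffer serves as a drop-in stand-in for the Pulsar-topic-backed store used in production (PulsarTopicProducerStateManagerSnapshotBuffer in the diffstat). The interface's methods are not visible in this excerpt, so the following is a purely hypothetical sketch of what an in-memory buffer can look like (names and signatures are illustrative, not the actual ProducerStateManagerSnapshotBuffer contract):

    // Hypothetical sketch only; the real contract lives in ProducerStateManagerSnapshotBuffer.java.
    // imports assumed: java.util.*, java.util.concurrent.*
    class IllustrativeInMemorySnapshotBuffer {
        private final Map<String, ProducerStateManagerSnapshot> snapshots = new ConcurrentHashMap<>();

        CompletableFuture<Void> write(String topicPartition, ProducerStateManagerSnapshot snapshot) {
            // keep only the latest snapshot per partition; enough for single-broker tests
            snapshots.put(topicPartition, snapshot);
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<ProducerStateManagerSnapshot> readLatestSnapshot(String topicPartition) {
            return CompletableFuture.completedFuture(snapshots.get(topicPartition));
        }
    }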
@DataProvider(name = "compressionTypes") Object[] allCompressionTypes() { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java index 6b3123bd46..cd394b5515 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java @@ -46,6 +46,7 @@ public class ProducerStateManagerTest extends KopProtocolHandlerTestBase { private final Long producerId = 1L; private final MockTime time = new MockTime(); private ProducerStateManager stateManager; + private ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer; @BeforeClass @Override @@ -59,7 +60,8 @@ protected void setup() throws Exception { @BeforeMethod protected void setUp() { - stateManager = new ProducerStateManager(partition.toString()); + producerStateManagerSnapshotBuffer = new MemoryProducerStateManagerSnapshotBuffer(); + stateManager = new ProducerStateManager(partition.toString(), producerStateManagerSnapshotBuffer); } @AfterMethod @@ -208,7 +210,7 @@ public void testNonTransactionalAppendWithOngoingTransaction() { @Test(timeOut = defaultTestTimeout) public void testSequenceNotValidatedForGroupMetadataTopic() { TopicPartition partition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); - stateManager = new ProducerStateManager(partition.toString()); + stateManager = new ProducerStateManager(partition.toString(), producerStateManagerSnapshotBuffer); short epoch = 0; append(stateManager, producerId, epoch, 99L, time.milliseconds(), true, PartitionLog.AppendOrigin.Coordinator);
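The excerpt ends inside testSequenceNotValidatedForGroupMetadataTopic, which pins down a useful invariant: appends to internal topics such as the group metadata topic (__consumer_offsets) skip producer sequence validation, so coordinator writes cannot be rejected as out of order. A sketch of the setup under the new two-argument constructor (the append helper's remaining arguments are reproduced from the hunk; their exact semantics are not shown in this excerpt):

    // Internal group-metadata partition: sequence checks are bypassed here,
    // while the same append pattern on a user topic would be validated.
    TopicPartition internal = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0);
    ProducerStateManager manager = new ProducerStateManager(
            internal.toString(), new MemoryProducerStateManagerSnapshotBuffer());
    append(manager, producerId, (short) 0, 99L, time.milliseconds(), true,
            PartitionLog.AppendOrigin.Coordinator);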