[Transaction] Support producer state manager
eolivelli authored and Demogorgon314 committed May 29, 2023
1 parent 0d2ed74 commit 49d3dc7
Showing 30 changed files with 1,663 additions and 296 deletions.
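At a glance: ReplicaManager is now constructed with a KafkaTopicLookupService and a per-tenant factory for ProducerStateManagerSnapshotBuffer, and the protocol handler schedules periodic producer-state snapshots while the transaction coordinator is enabled. A minimal sketch of the per-tenant factory, using only names visible in this diff (as a lambda rather than the inner class the commit actually adds):

Function<String, ProducerStateManagerSnapshotBuffer> provider = tenant -> {
    // Without the transaction coordinator there is nothing durable to recover from,
    // so fall back to an in-memory buffer.
    if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
        return new MemoryProducerStateManagerSnapshotBuffer();
    }
    return getTransactionCoordinator(tenant).getProducerStateManagerSnapshotBuffer();
};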
[changed file]
@@ -109,7 +109,7 @@ public boolean tryComplete() {
for (Map.Entry<TopicPartition, PartitionLog.ReadRecordsResult> entry : readRecordsResult.entrySet()) {
TopicPartition tp = entry.getKey();
PartitionLog.ReadRecordsResult result = entry.getValue();
PartitionLog partitionLog = replicaManager.getPartitionLog(tp, context.getNamespacePrefix());
PartitionLog partitionLog = result.partitionLog();
PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition(context.getTopicManager());
if (currLastPosition.compareTo(PositionImpl.EARLIEST) == 0) {
HAS_ERROR_UPDATER.set(this, true);
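The change above drops a second lookup: the ReadRecordsResult now carries the PartitionLog it was read from, so the purgatory callback no longer re-resolves it through replicaManager.getPartitionLog(...). A hedged sketch of the assumed result shape (only the partitionLog() accessor appears in the diff; the rest is illustrative):

// Assumed shape of PartitionLog.ReadRecordsResult: it remembers its
// originating PartitionLog, captured when the read was issued.
public static class ReadRecordsResult {
    private final PartitionLog partitionLog;

    ReadRecordsResult(PartitionLog partitionLog) {
        this.partitionLog = partitionLog;
    }

    public PartitionLog partitionLog() {
        return partitionLog;
    }
}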
[changed file]
@@ -30,6 +30,8 @@
import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryChannelInitializer;
import io.streamnative.pulsar.handlers.kop.stats.PrometheusMetricsProvider;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer;
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer;
import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
@@ -45,8 +47,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -87,6 +91,8 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
private LookupClient lookupClient;

private KafkaTopicLookupService kafkaTopicLookupService;
@VisibleForTesting
@Getter
private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
@@ -110,6 +116,8 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
private MigrationManager migrationManager;
private ReplicaManager replicaManager;

private ScheduledFuture<?> txnProducerStateSnapshotsTimeHandle;

private final Map<String, GroupCoordinator> groupCoordinatorsByTenant = new ConcurrentHashMap<>();
private final Map<String, TransactionCoordinator> transactionCoordinatorByTenant = new ConcurrentHashMap<>();

@@ -282,6 +290,16 @@ private void invalidatePartitionLog(TopicName topicName) {
schemaRegistryManager = new SchemaRegistryManager(kafkaConfig, brokerService.getPulsar(),
brokerService.getAuthenticationService());
migrationManager = new MigrationManager(kafkaConfig, brokerService.getPulsar());

if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()
&& kafkaConfig.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds() > 0) {
txnProducerStateSnapshotsTimeHandle = service.getPulsar().getExecutor().scheduleWithFixedDelay(() -> {
getReplicaManager().takeProducerStateSnapshots();
},
kafkaConfig.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds(),
kafkaConfig.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds(),
TimeUnit.SECONDS);
}
}

private TransactionCoordinator createAndBootTransactionCoordinator(String tenant) {
@@ -411,6 +429,20 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
lookupClient);
}

class ProducerStateManagerSnapshotProvider implements Function<String, ProducerStateManagerSnapshotBuffer> {
@Override
public ProducerStateManagerSnapshotBuffer apply(String tenant) {
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
return new MemoryProducerStateManagerSnapshotBuffer();
}
return getTransactionCoordinator(tenant)
.getProducerStateManagerSnapshotBuffer();
}
}

private Function<String, ProducerStateManagerSnapshotBuffer> getProducerStateManagerSnapshotBufferByTenant =
new ProducerStateManagerSnapshotProvider();

// this is called after initialize, and with kafkaConfig, brokerService all set.
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
@@ -426,13 +458,18 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
.timeoutTimer(SystemTimer.builder().executorName("fetch").build())
.build();

kafkaTopicLookupService = new KafkaTopicLookupService(brokerService);

replicaManager = new ReplicaManager(
kafkaConfig,
requestStats,
Time.SYSTEM,
brokerService.getEntryFilterProvider().getBrokerEntryFilters(),
producePurgatory,
fetchPurgatory);
fetchPurgatory,
kafkaTopicLookupService,
getProducerStateManagerSnapshotBufferByTenant
);

try {
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
@@ -462,6 +499,19 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti

@Override
public void close() {
if (txnProducerStateSnapshotsTimeHandle != null) {
txnProducerStateSnapshotsTimeHandle.cancel(false);
}

if (offsetTopicClient != null) {
offsetTopicClient.close();
}
if (txnTopicClient != null) {
txnTopicClient.close();
}
if (adminManager != null) {
adminManager.shutdown();
}
if (producePurgatory != null) {
producePurgatory.shutdown();
}
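The snapshot task added above follows the standard ScheduledFuture lifecycle: schedule with a fixed delay at startup, cancel without interruption in close(). A self-contained sketch of that pattern (plain JDK, no KoP types; the interval stands in for kafkaTxnProducerStateTopicSnapshotIntervalSeconds):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public final class SnapshotSchedulerSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        int intervalSeconds = 1; // the production default above is 60

        // scheduleWithFixedDelay waits intervalSeconds after each run *completes*,
        // so a slow snapshot cannot overlap the next one (unlike scheduleAtFixedRate).
        ScheduledFuture<?> handle = executor.scheduleWithFixedDelay(
                () -> System.out.println("takeProducerStateSnapshots()"),
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(3); // let a couple of runs complete

        // Mirrors close(): cancel(false) lets an in-flight snapshot finish
        // instead of interrupting it mid-write.
        handle.cancel(false);
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}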
[changed file]
@@ -55,6 +55,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
private static final int OffsetsMessageTTL = 3 * 24 * 3600;
// txn configuration
public static final int DefaultTxnLogTopicNumPartitions = 50;
public static final int DefaultTxnProducerStateLogTopicNumPartitions = 8;
public static final int DefaultTxnCoordinatorSchedulerNum = 1;
public static final int DefaultTxnStateManagerSchedulerNum = 1;
public static final long DefaultAbortTimedOutTransactionsIntervalMs = TimeUnit.SECONDS.toMillis(10);
@@ -407,6 +408,13 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private boolean kafkaTransactionProducerIdsStoredOnPulsar = false;

@FieldContext(
category = CATEGORY_KOP,
required = true,
doc = "The namespace used for storing Kafka Producer Id"
)
private String kafkaTransactionProducerIdsNamespace = "__kafka_producerid";

@FieldContext(
category = CATEGORY_KOP_TRANSACTION,
doc = "Flag to enable transaction coordinator"
@@ -419,6 +427,18 @@
)
private int kafkaTxnLogTopicNumPartitions = DefaultTxnLogTopicNumPartitions;

@FieldContext(
category = CATEGORY_KOP_TRANSACTION,
doc = "Number of partitions for the transaction producer state topic"
)
private int kafkaTxnProducerStateTopicNumPartitions = DefaultTxnProducerStateLogTopicNumPartitions;

@FieldContext(
category = CATEGORY_KOP_TRANSACTION,
doc = "Interval for taking snapshots of the status of pending transactions"
)
private int kafkaTxnProducerStateTopicSnapshotIntervalSeconds = 60;

@FieldContext(
category = CATEGORY_KOP_TRANSACTION,
doc = "The interval in milliseconds at which to rollback transactions that have timed out."
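For reference, the new knobs as they would appear in a KoP configuration file. Key names follow the field names above (Pulsar FieldContext convention); the values shown are simply the defaults, so this snippet is illustrative only. Note from the scheduling code above that snapshots run only when the coordinator is enabled and the interval is greater than zero, so an interval of 0 disables them:

kafkaTransactionCoordinatorEnabled=true
kafkaTransactionProducerIdsNamespace=__kafka_producerid
kafkaTxnProducerStateTopicNumPartitions=8
kafkaTxnProducerStateTopicSnapshotIntervalSeconds=60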
[changed file]
@@ -48,7 +48,6 @@
public class KafkaTopicConsumerManager implements Closeable {

private final PersistentTopic topic;
private final KafkaRequestHandler requestHandler;

private final AtomicBoolean closed = new AtomicBoolean(false);

@@ -67,13 +66,21 @@ public class KafkaTopicConsumerManager implements Closeable {

private final boolean skipMessagesWithoutIndex;

private final String description;

KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) {
this(requestHandler.ctx.channel() + "",
requestHandler.isSkipMessagesWithoutIndex(),
topic);
}

public KafkaTopicConsumerManager(String description, boolean skipMessagesWithoutIndex, PersistentTopic topic) {
this.topic = topic;
this.cursors = new ConcurrentHashMap<>();
this.createdCursors = new ConcurrentHashMap<>();
this.lastAccessTimes = new ConcurrentHashMap<>();
this.requestHandler = requestHandler;
this.skipMessagesWithoutIndex = requestHandler.isSkipMessagesWithoutIndex();
this.description = description;
this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
}

// delete expired cursors, so backlog can be cleared.
@@ -96,7 +103,7 @@ void deleteOneExpiredCursor(long offset) {
if (cursorFuture != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}",
requestHandler.ctx.channel(), offset, cursors.size());
description, offset, cursors.size());
}

// TODO: Should we just cancel this future?
@@ -118,14 +125,14 @@ public void deleteOneCursorAsync(ManagedCursor cursor, String reason) {
public void deleteCursorComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.",
requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason);
description, cursor.getName(), topic.getName(), reason);
}
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.",
requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason, exception);
description, cursor.getName(), topic.getName(), reason, exception);
}
}, null);
createdCursors.remove(cursor.getName());
@@ -148,7 +155,7 @@ public CompletableFuture<Pair<ManagedCursor, Long>> removeCursorFuture(long offs

if (log.isDebugEnabled()) {
log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
requestHandler.ctx.channel(), offset, cursors.size());
description, offset, cursors.size());
}
return cursorFuture;
}
@@ -182,7 +189,7 @@ public void add(long offset, Pair<ManagedCursor, Long> pair) {

if (log.isDebugEnabled()) {
log.debug("[{}] Add cursor back {} for offset: {}",
requestHandler.ctx.channel(), pair.getLeft().getName(), offset);
description, pair.getLeft().getName(), offset);
}
}

@@ -194,7 +201,7 @@ public void close() {
}
if (log.isDebugEnabled()) {
log.debug("[{}] Close TCM for topic {}.",
requestHandler.ctx.channel(), topic.getName());
description, topic.getName());
}
final List<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose = new ArrayList<>();
cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture));
@@ -231,7 +238,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) {
log.error("[{}] Async get cursor for offset {} for topic {} failed, "
+ "because current managedLedger has been closed",
requestHandler.ctx.channel(), offset, topic.getName());
description, offset, topic.getName());
CompletableFuture<Pair<ManagedCursor, Long>> future = new CompletableFuture<>();
future.completeExceptionally(new Exception("Current managedLedger for "
+ topic.getName() + " has been closed."));
@@ -250,7 +257,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
final PositionImpl previous = ((ManagedLedgerImpl) ledger).getPreviousPosition((PositionImpl) position);
if (log.isDebugEnabled()) {
log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}",
requestHandler.ctx.channel(), cursorName, offset, position, previous);
description, cursorName, offset, position, previous);
}
try {
final ManagedCursor newCursor = ledger.newNonDurableCursor(previous, cursorName);
@@ -259,7 +266,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
return Pair.of(newCursor, offset);
} catch (ManagedLedgerException e) {
log.error("[{}] Error new cursor for topic {} at offset {} - {}. will cause fetch data error.",
requestHandler.ctx.channel(), topic.getName(), offset, previous, e);
description, topic.getName(), offset, previous, e);
return null;
}
});
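KafkaTopicConsumerManager is now decoupled from KafkaRequestHandler: the handler-based constructor delegates to a new public one that takes a log description and the skip flag directly, so components off the request path (such as the producer-state recovery code) can build one too. A usage sketch, with placeholder variable names:

// Old path, from inside a request handler (its channel string becomes the log tag):
KafkaTopicConsumerManager fromHandler =
        new KafkaTopicConsumerManager(requestHandler, persistentTopic);

// New path, from any component; the description is only used in log lines:
KafkaTopicConsumerManager standalone =
        new KafkaTopicConsumerManager("producer-state-recovery", false, persistentTopic);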
[changed file]
@@ -13,7 +13,6 @@
*/
package io.streamnative.pulsar.handlers.kop;

import io.netty.channel.Channel;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.NonNull;
@@ -37,7 +36,7 @@ public KafkaTopicLookupService(BrokerService brokerService) {
}

// A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance
public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName, Channel channel) {
public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName, Object requestor) {
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture = new CompletableFuture<>();
brokerService.getTopicIfExists(topicName).whenComplete((t2, throwable) -> {
TopicName topicNameObject = TopicName.get(topicName);
@@ -47,7 +46,7 @@ public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName, C
if (topicNameObject.getPartitionIndex() == 0) {
log.warn("Get partition-0 error [{}].", throwable.getMessage());
} else {
handleGetTopicException(topicName, topicCompletableFuture, throwable, channel);
handleGetTopicException(topicName, topicCompletableFuture, throwable, requestor);
return;
}
}
@@ -60,11 +59,11 @@ public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName, C
String nonPartitionedTopicName = topicNameObject.getPartitionedTopicName();
if (log.isDebugEnabled()) {
log.debug("[{}]Try to get non-partitioned topic for name {}",
channel, nonPartitionedTopicName);
requestor, nonPartitionedTopicName);
}
brokerService.getTopicIfExists(nonPartitionedTopicName).whenComplete((nonPartitionedTopic, ex) -> {
if (ex != null) {
handleGetTopicException(nonPartitionedTopicName, topicCompletableFuture, ex, channel);
handleGetTopicException(nonPartitionedTopicName, topicCompletableFuture, ex, requestor);
// Failed to getTopic from current broker, remove non-partitioned topic cache,
// which added in getTopicBroker.
KopBrokerLookupManager.removeTopicManagerCache(nonPartitionedTopicName);
@@ -75,14 +74,14 @@ public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName, C
topicCompletableFuture.complete(Optional.of(persistentTopic));
} else {
log.error("[{}]Get empty non-partitioned topic for name {}",
channel, nonPartitionedTopicName);
requestor, nonPartitionedTopicName);
KopBrokerLookupManager.removeTopicManagerCache(nonPartitionedTopicName);
topicCompletableFuture.complete(Optional.empty());
}
});
return;
}
log.error("[{}]Get empty topic for name {}", channel, topicName);
log.error("[{}]Get empty topic for name {}", requestor, topicName);
KopBrokerLookupManager.removeTopicManagerCache(topicName);
topicCompletableFuture.complete(Optional.empty());
});
@@ -93,15 +92,15 @@ private void handleGetTopicException(@NonNull final String topicName,
@NonNull
final CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture,
@NonNull final Throwable ex,
@NonNull final Channel channel) {
@NonNull final Object requestor) {
// The ServiceUnitNotReadyException is retryable, so we should print a warning log instead of error log
if (ex instanceof BrokerServiceException.ServiceUnitNotReadyException) {
log.warn("[{}] Failed to getTopic {}: {}",
channel, topicName, ex.getMessage());
requestor, topicName, ex.getMessage());
topicCompletableFuture.complete(Optional.empty());
} else {
log.error("[{}] Failed to getTopic {}. exception:",
channel, topicName, ex);
requestor, topicName, ex);
topicCompletableFuture.completeExceptionally(ex);
}
}
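Widening the requestor parameter from Netty's Channel to Object is safe here because the value is only ever interpolated into log messages; any caller with a descriptive toString() can now use the lookup service. A hedged usage sketch (the topic name and tag are made up, and an slf4j logger is assumed in scope):

kafkaTopicLookupService
        .getTopic("persistent://public/default/my-topic-partition-0", "producer-state-manager")
        .thenAccept(maybeTopic ->
                maybeTopic.ifPresent(topic -> log.info("resolved {}", topic.getName())));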
[changed file]
@@ -82,7 +82,7 @@ public static GroupCoordinator of(
Time time
) {
ScheduledExecutorService coordinatorExecutor = OrderedScheduler.newSchedulerBuilder()
.name("group-coordinator-executor")
.name("group-coordinator-executor-" + tenant)
.numThreads(1)
.build();

[changed file]
@@ -26,6 +26,7 @@
public class TransactionConfig {

public static final String DefaultTransactionMetadataTopicName = "public/default/__transaction_state";
public static final String DefaultProducerStateSnapshotTopicName = "public/default/__transaction_producer_state";
public static final String DefaultProducerIdTopicName = "public/default/__transaction_producerid_generator";
public static final long DefaultTransactionsMaxTimeoutMs = TimeUnit.MINUTES.toMillis(15);
public static final long DefaultTransactionalIdExpirationMs = TimeUnit.DAYS.toMillis(7);
Expand All @@ -42,6 +43,8 @@ public class TransactionConfig {
@Default
private String transactionMetadataTopicName = DefaultTransactionMetadataTopicName;
@Default
private String transactionProducerStateSnapshotTopicName = DefaultProducerStateSnapshotTopicName;
@Default
private long transactionMaxTimeoutMs = DefaultTransactionsMaxTimeoutMs;
@Default
private long transactionalIdExpirationMs = DefaultTransactionalIdExpirationMs;
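Given the Lombok @Default markers above, TransactionConfig is presumably built through a Lombok builder; a construction sketch with an assumed topic value:

TransactionConfig config = TransactionConfig.builder()
        .transactionProducerStateSnapshotTopicName("my-tenant/my-ns/__producer_state_snapshots")
        .build();
// Fields left unset keep their @Default values,
// e.g. transactionMaxTimeoutMs = 15 minutes.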