Skip to content

Commit

Permalink
KAFKA-17593; [7/N] Introduce CoordinatorExecutor (apache#17823)
Browse files Browse the repository at this point in the history
This patch introduces the `CoordinatorExecutor` construct into the `CoordinatorRuntime`. It allows scheduling asynchronous tasks from within a `CoordinatorShard` while respecting the runtime semantic. It will be used to asynchronously resolve regular expressions.

The `GroupCoordinatorService` uses a default `ExecutorService` with a single thread to back it at the moment. It seems that it should be sufficient. In the future, we could consider making the number of threads configurable.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
  • Loading branch information
dajac authored Nov 19, 2024
1 parent a334b1b commit a211ee9
Show file tree
Hide file tree
Showing 10 changed files with 801 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;

import org.apache.kafka.common.KafkaException;

/**
* An interface to schedule and cancel asynchronous tasks. The TaskRunnable
* interface defines the tasks to be executed in the executor and the
* TaskOperation defines the operation scheduled to the runtime to
* process the output of the executed task.
*
* @param <T> The record type.
*/
public interface CoordinatorExecutor<T> {
/**
* The task's runnable.
*
* @param <R> The return type.
*/
interface TaskRunnable<R> {
R run() throws Throwable;
}

/**
* The task's write operation to handle the output
* of the task.
*
* @param <T> The record type.
* @param <R> The return type of the task.
*/
interface TaskOperation<T, R> {
CoordinatorResult<Void, T> onComplete(
R result,
Throwable exception
) throws KafkaException;
}

/**
* Schedule an asynchronous task. Note that only one task for a given key can
* be executed at the time.
*
* @param key The key to identify the task.
* @param task The task itself.
* @param operation The runtime operation to handle the output of the task.
* @return True if the task was scheduled; False otherwise.
*
* @param <R> The return type of the task.
*/
<R> boolean schedule(
String key,
TaskRunnable<R> task,
TaskOperation<T, R> operation
);

/**
* Return true if the key is associated to a task; false otherwise.
*
* @param key The key to identify the task.
* @return A boolean indicating whether the task is scheduled or not.
*/
boolean isScheduled(String key);

/**
* Cancel the given task
*
* @param key The key to identify the task.
*/
void cancel(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;

import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

public class CoordinatorExecutorImpl<S extends CoordinatorShard<U>, U> implements CoordinatorExecutor<U> {
private static class TaskResult<R> {
final R result;
final Throwable exception;

TaskResult(
R result,
Throwable exception
) {
this.result = result;
this.exception = exception;
}
}

private final Logger log;
private final TopicPartition shard;
private final CoordinatorRuntime<S, U> runtime;
private final ExecutorService executor;
private final Duration writeTimeout;
private final Map<String, TaskRunnable<?>> tasks = new ConcurrentHashMap<>();

public CoordinatorExecutorImpl(
LogContext logContext,
TopicPartition shard,
CoordinatorRuntime<S, U> runtime,
ExecutorService executor,
Duration writeTimeout
) {
this.log = logContext.logger(CoordinatorExecutorImpl.class);
this.shard = shard;
this.runtime = runtime;
this.executor = executor;
this.writeTimeout = writeTimeout;
}

private <R> TaskResult<R> executeTask(TaskRunnable<R> task) {
try {
return new TaskResult<>(task.run(), null);
} catch (Throwable ex) {
return new TaskResult<>(null, ex);
}
}

@Override
public <R> boolean schedule(
String key,
TaskRunnable<R> task,
TaskOperation<U, R> operation
) {
// Put the task if the key is free. Otherwise, reject it.
if (tasks.putIfAbsent(key, task) != null) return false;

// Submit the task.
executor.submit(() -> {
// If the task associated with the key is not us, it means
// that the task was either replaced or cancelled. We stop.
if (tasks.get(key) != task) return;

// Execute the task.
final TaskResult<R> result = executeTask(task);

// Schedule the operation.
runtime.scheduleWriteOperation(
key,
shard,
writeTimeout,
coordinator -> {
// If the task associated with the key is not us, it means
// that the task was either replaced or cancelled. We stop.
if (!tasks.remove(key, task)) {
throw new RejectedExecutionException(String.format("Task %s was overridden or cancelled", key));
}

// Call the underlying write operation with the result of the task.
return operation.onComplete(result.result, result.exception);
}
).exceptionally(exception -> {
// Remove the task after a failure.
tasks.remove(key, task);

if (exception instanceof RejectedExecutionException) {
log.debug("The write event for the task {} was not executed because it was " +
"cancelled or overridden.", key);
} else if (exception instanceof NotCoordinatorException || exception instanceof CoordinatorLoadInProgressException) {
log.debug("The write event for the task {} failed due to {}. Ignoring it because " +
"the coordinator is not active.", key, exception.getMessage());
} else {
log.error("The write event for the task {} failed due to {}. Ignoring it. ",
key, exception.getMessage());
}

return null;
});
});

return true;
}

@Override
public boolean isScheduled(String key) {
return tasks.containsKey(key);
}

@Override
public void cancel(String key) {
tasks.remove(key);
}

public void cancelAll() {
Iterator<String> iterator = tasks.keySet().iterator();
while (iterator.hasNext()) {
iterator.remove();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -118,6 +119,7 @@ public static class Builder<S extends CoordinatorShard<U>, U> {
private Serializer<U> serializer;
private Compression compression;
private int appendLingerMs;
private ExecutorService executorService;

public Builder<S, U> withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
Expand Down Expand Up @@ -189,6 +191,11 @@ public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
return this;
}

public Builder<S, U> withExecutorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}

public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
logPrefix = "";
Expand Down Expand Up @@ -216,6 +223,8 @@ public CoordinatorRuntime<S, U> build() {
compression = Compression.NONE;
if (appendLingerMs < 0)
throw new IllegalArgumentException("AppendLinger must be >= 0");
if (executorService == null)
throw new IllegalArgumentException("ExecutorService must be set.");

return new CoordinatorRuntime<>(
logPrefix,
Expand All @@ -231,7 +240,8 @@ public CoordinatorRuntime<S, U> build() {
coordinatorMetrics,
serializer,
compression,
appendLingerMs
appendLingerMs,
executorService
);
}
}
Expand Down Expand Up @@ -551,6 +561,11 @@ class CoordinatorContext {
*/
final EventBasedCoordinatorTimer timer;

/**
* The coordinator executor.
*/
final CoordinatorExecutorImpl<S, U> executor;

/**
* The current state.
*/
Expand Down Expand Up @@ -603,6 +618,13 @@ private CoordinatorContext(
this.epoch = -1;
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.timer = new EventBasedCoordinatorTimer(tp, logContext);
this.executor = new CoordinatorExecutorImpl<>(
logContext,
tp,
CoordinatorRuntime.this,
executorService,
defaultWriteTimeout
);
this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
}

Expand Down Expand Up @@ -633,6 +655,7 @@ private void transitionTo(
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
.withExecutor(executor)
.withCoordinatorMetrics(coordinatorMetrics)
.withTopicPartition(tp)
.build(),
Expand Down Expand Up @@ -714,6 +737,7 @@ private void unload() {
highWatermarklistener = null;
}
timer.cancelAll();
executor.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
Expand Down Expand Up @@ -1899,6 +1923,12 @@ public void onHighWatermarkUpdated(
*/
private final int appendLingerMs;

/**
* The executor service used by the coordinator runtime to schedule
* asynchronous tasks.
*/
private final ExecutorService executorService;

/**
* Atomic boolean indicating whether the runtime is running.
*/
Expand Down Expand Up @@ -1926,6 +1956,7 @@ public void onHighWatermarkUpdated(
* @param serializer The serializer.
* @param compression The compression codec.
* @param appendLingerMs The append linger time in ms.
* @param executorService The executor service.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private CoordinatorRuntime(
Expand All @@ -1942,7 +1973,8 @@ private CoordinatorRuntime(
CoordinatorMetrics coordinatorMetrics,
Serializer<U> serializer,
Compression compression,
int appendLingerMs
int appendLingerMs,
ExecutorService executorService
) {
this.logPrefix = logPrefix;
this.logContext = logContext;
Expand All @@ -1960,6 +1992,7 @@ private CoordinatorRuntime(
this.serializer = serializer;
this.compression = compression;
this.appendLingerMs = appendLingerMs;
this.executorService = executorService;
}

/**
Expand Down Expand Up @@ -2423,6 +2456,7 @@ public void close() throws Exception {
}
});
coordinators.clear();
executorService.shutdown();
Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ CoordinatorShardBuilder<S, U> withTimer(
CoordinatorTimer<Void, U> timer
);

/**
* Sets the coordinator executor.
*
* @param executor The coordinator executor.
*
* @return The builder.
*/
CoordinatorShardBuilder<S, U> withExecutor(
CoordinatorExecutor<U> executor
);

/**
* Sets the coordinator metrics.
*
Expand Down
Loading

0 comments on commit a211ee9

Please sign in to comment.