MINOR: improve JavaDocs for Kafka Streams exceptions and error handlers (apache#17856)

Reviewers: Bill Bejeck <bill@confluent.io>
mjsax authored Nov 21, 2024
1 parent 2519e4a commit 240efbb
Showing 26 changed files with 170 additions and 84 deletions.
@@ -16,13 +16,13 @@
*/
package org.apache.kafka.streams.errors;


/**
* Indicates that none of the specified {@link org.apache.kafka.streams.StreamsConfig#BOOTSTRAP_SERVERS_CONFIG brokers}
* could be found.
*
* @see org.apache.kafka.streams.StreamsConfig
*/
@SuppressWarnings("unused")
public class BrokerNotFoundException extends StreamsException {

private static final long serialVersionUID = 1L;
@@ -26,6 +26,7 @@
* happens while attempting to produce result records.
*/
public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
@SuppressWarnings("deprecation")
@Deprecated
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
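Not part of the diff, but useful context: a production exception handler is wired in through StreamsConfig. A minimal sketch, assuming the standard DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG key; the application ID and bootstrap servers are placeholders:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;

public class ProductionHandlerConfig {

    public static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handling-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // DefaultProductionExceptionHandler is already the default handler;
        // setting it explicitly just documents the intended fail-fast behavior.
        props.put(
            StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            DefaultProductionExceptionHandler.class
        );
        return props;
    }
}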
@@ -29,14 +29,20 @@ public interface DeserializationExceptionHandler extends Configurable {

/**
* Inspect a record and the exception received.
* <p>
* Note, that the passed in {@link ProcessorContext} only allows to access metadata like the task ID.
*
* <p> Note, that the passed in {@link ProcessorContext} only allows to access metadata like the task ID.
* However, it cannot be used to emit records via {@link ProcessorContext#forward(Object, Object)};
* calling {@code forward()} (and some other methods) would result in a runtime exception.
*
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
* @param context
* Processor context.
* @param record
* Record that failed deserialization.
* @param exception
* The actual exception.
*
* @return Whether to continue or stop processing.
*
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@Deprecated
@@ -49,9 +55,14 @@ default DeserializationHandlerResponse handle(final ProcessorContext context,
/**
* Inspect a record and the exception received.
*
* @param context error handler context
* @param record record that failed deserialization
* @param exception the actual exception
* @param context
* Error handler context.
* @param record
* Record that failed deserialization.
* @param exception
* The actual exception.
*
* @return Whether to continue or stop processing.
*/
default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
@@ -63,15 +74,19 @@ default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
* Enumeration that describes the response from the exception handler.
*/
enum DeserializationHandlerResponse {
/* continue with processing */
/** Continue processing. */
CONTINUE(0, "CONTINUE"),
/* fail the processing and stop */
/** Fail processing. */
FAIL(1, "FAIL");

/** an english description of the api--this is for debugging and can change */
/**
 * An English description for the used option. This is for debugging only and may change.
*/
public final String name;

/** the permanent and immutable id of an API--this can't change ever */
/**
* The permanent and immutable id for the used option. This can't change ever.
*/
public final int id;

DeserializationHandlerResponse(final int id, final String name) {
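The two hunks above spell out the full contract for a custom handler: inspect the failed record and the exception, then return CONTINUE or FAIL. A minimal sketch against the non-deprecated ErrorHandlerContext overload; the class name and the choice to skip IllegalArgumentException are illustrative, not from the commit:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;

public class SkipMalformedRecordsHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // The context exposes metadata only (task ID, topic, partition, offset);
        // it cannot be used to forward records.
        if (exception instanceof IllegalArgumentException) {
            return DeserializationHandlerResponse.CONTINUE; // skip the malformed record
        }
        return DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}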
@@ -26,7 +26,6 @@
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;


/**
* This interface allows user code to inspect the context of a record that has failed during processing.
*
@@ -48,7 +47,7 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no topic name is available.
*
* @return the topic name
* @return The topic name.
*/
String topic();

@@ -66,7 +65,7 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no partition is available.
*
* @return the partition ID
* @return The partition ID.
*/
int partition();

@@ -84,7 +83,7 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no offset is available.
*
* @return the offset
* @return The offset.
*/
long offset();

@@ -102,21 +101,21 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no headers are available.
*
* @return the headers
* @return The headers.
*/
Headers headers();

/**
* Return the current processor node ID.
*
* @return the processor node ID
* @return The processor node ID.
*/
String processorNodeId();

/**
* Return the task ID.
*
* @return the task ID
* @return The task ID.
*/
TaskId taskId();

@@ -138,14 +137,14 @@ public interface ErrorHandlerContext {
* if this method is invoked from the punctuate call):
* <ul>
* <li>In case of {@link PunctuationType#STREAM_TIME} timestamp is defined as the current task's stream time,
* which is defined as the largest timestamp of any record processed by the task
* <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is defined the current system time
* which is defined as the largest timestamp of any record processed by the task</li>
* <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is defined the current system time</li>
* </ul>
*
* <p> If it is triggered from a deserialization failure, timestamp is defined as the timestamp of the
* current rawRecord {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
*
* @return the timestamp
* @return The timestamp.
*/
long timestamp();
}
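All of the accessors above are plain metadata reads, so an error handler can safely stitch them into a diagnostic message. A hypothetical helper, not part of the commit:

import org.apache.kafka.streams.errors.ErrorHandlerContext;

final class ErrorContextFormatter {

    private ErrorContextFormatter() { }

    // Renders the metadata exposed by ErrorHandlerContext for logging.
    // Per the JavaDocs above, topic/partition/offset may be unavailable in
    // some code paths (e.g. changelog writes), so treat them as best-effort.
    static String describe(final ErrorHandlerContext context) {
        return "task=" + context.taskId()
            + " node=" + context.processorNodeId()
            + " topic=" + context.topic()
            + " partition=" + context.partition()
            + " offset=" + context.offset()
            + " timestamp=" + context.timestamp();
    }
}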
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.streams.errors;


/**
* Indicates that there was a problem when trying to access a {@link org.apache.kafka.streams.processor.StateStore StateStore}.
* {@code InvalidStateStoreException} is not thrown directly but only its following sub-classes.
* {@code InvalidStateStoreException} is not thrown directly but only its following subclasses.
*/
public class InvalidStateStoreException extends StreamsException {

@@ -21,7 +21,8 @@
/**
* Indicates that the specific state store being queried via
* {@link org.apache.kafka.streams.StoreQueryParameters} used a partitioning that is not assigned to this instance.
* You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover the correct instance that hosts the requested partition.
* You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover the correct instance
* that hosts the requested partition.
*/
public class InvalidStateStorePartitionException extends InvalidStateStoreException {

@@ -31,6 +32,7 @@ public InvalidStateStorePartitionException(final String message) {
super(message);
}

@SuppressWarnings("unused")
public InvalidStateStorePartitionException(final String message, final Throwable throwable) {
super(message, throwable);
}
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.errors;


/**
* Indicates that the state store directory lock could not be acquired because another thread holds the lock.
*
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.errors;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;

@@ -32,16 +31,21 @@
public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);

@SuppressWarnings("deprecation")
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

log.warn("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
log.warn(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
record.topic(),
record.partition(),
record.offset(),
exception
);

return DeserializationHandlerResponse.CONTINUE;
}
@@ -51,10 +55,14 @@ public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

log.warn("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
log.warn(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
record.topic(),
record.partition(),
record.offset(),
exception
);

return DeserializationHandlerResponse.CONTINUE;
}
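For context: this handler is not the default (LogAndFailExceptionHandler, below, is) and has to be opted into. A sketch, assuming the standard DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG key:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class TolerantDeserializationConfig {

    public static Properties props() {
        final Properties props = new Properties();
        // Swap the fail-fast default (LogAndFailExceptionHandler) for the
        // log-and-skip variant shown in the diff above.
        props.put(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class
        );
        return props;
    }
}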
@@ -32,10 +32,15 @@ public class LogAndContinueProcessingExceptionHandler implements ProcessingExcep

@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
log.warn("Exception caught during message processing, " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);
log.warn(
"Exception caught during message processing, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(),
context.taskId(),
context.topic(),
context.partition(),
context.offset(),
exception
);

return ProcessingHandlerResponse.CONTINUE;
}
@@ -24,24 +24,28 @@

import java.util.Map;


/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to stop processing more records and fail.
*/
public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);

@Override
@SuppressWarnings("deprecation")
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

log.error("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
log.error(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
record.topic(),
record.partition(),
record.offset(),
exception
);

return DeserializationHandlerResponse.FAIL;
}
@@ -51,10 +55,14 @@ public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

log.error("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
log.error(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
record.topic(),
record.partition(),
record.offset(),
exception
);

return DeserializationHandlerResponse.FAIL;
}
@@ -32,10 +32,15 @@ public class LogAndFailProcessingExceptionHandler implements ProcessingException

@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
log.error("Exception caught during message processing, " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);
log.error(
"Exception caught during message processing, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(),
context.taskId(),
context.topic(),
context.partition(),
context.offset(),
exception
);

return ProcessingHandlerResponse.FAIL;
}
@@ -26,25 +26,30 @@ public interface ProcessingExceptionHandler extends Configurable {
/**
* Inspect a record and the exception received
*
* @param context processing context metadata
* @param record record where the exception occurred
* @param exception the actual exception
* @param context
* Processing context metadata.
* @param record
* Record where the exception occurred.
* @param exception
* The actual exception.
*
* @return Whether to continue or stop processing.
*/
ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception);

enum ProcessingHandlerResponse {
/* continue with processing */
/** Continue processing. */
CONTINUE(1, "CONTINUE"),
/* fail the processing and stop */
/** Fail processing. */
FAIL(2, "FAIL");

/**
* the permanent and immutable name of processing exception response
 * An English description for the used option. This is for debugging only and may change.
*/
public final String name;

/**
* the permanent and immutable id of processing exception response
* The permanent and immutable id for the used option. This can't change ever.
*/
public final int id;

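The interface above follows the same shape as the deserialization handler. A minimal sketch; the class name and the decision to tolerate ArithmeticException are illustrative, not from the commit:

import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class ContinueOnArithmeticErrorHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        if (exception instanceof ArithmeticException) {
            return ProcessingHandlerResponse.CONTINUE; // drop the record, keep the task alive
        }
        return ProcessingHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}

Registration should be analogous to the other handlers, via the processing exception handler config added by KIP-1033 (assumed here to be StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG in 3.9+).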
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.errors;


/**
* Indicates a processor state operation (e.g. put, get) has failed.
*