diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java
index 438dab1820..97cf2cb469 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/CloudEventMutator.java
@@ -19,7 +19,7 @@
 import java.util.function.Function;
 
 /**
- * A CloudEventMutator mutates a given CloudEvent.
+ * A CloudEventMutator mutates a given CloudEvent
  */
 @FunctionalInterface
 public interface CloudEventMutator extends Function<CloudEvent, CloudEvent> {}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java
index 6313fcc0a4..a64521d410 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java
@@ -23,7 +23,7 @@
 
 /**
  * {@link RecordDispatcherMutatorChain} chains {@link RecordDispatcher}s and applies mutations using a provided
- * {@link CloudEventMutator} before passing the {@link KafkaConsumerRecord} to the next {@link RecordDispatcher}.
+ * {@link CloudEventMutator} before passing the {@link ConsumerRecord} to the next {@link RecordDispatcher}.
  */
 public class RecordDispatcherMutatorChain implements RecordDispatcher {
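For orientation, CloudEventMutator is a Function<CloudEvent, CloudEvent>, so a mutator for this chain is just an apply method over the event. A minimal sketch follows; the class name and extension attribute are hypothetical, not part of this change:

import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

// Illustrative only: copy the incoming event and stamp an extension on it.
public class AddExtensionMutator implements CloudEventMutator {

    @Override
    public CloudEvent apply(final CloudEvent cloudEvent) {
        // CloudEventBuilder.v1(event) starts the builder from a copy of the given event.
        return CloudEventBuilder.v1(cloudEvent)
                .withExtension("examplemutated", "true")
                .build();
    }
}

RecordDispatcherMutatorChain applies such a mutator to each record's value before handing the record to the wrapped RecordDispatcher.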
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/NullCloudEventInterceptor.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/NullCloudEventInterceptor.java
new file mode 100644
index 0000000000..12ab37e476
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/NullCloudEventInterceptor.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;
+
+import io.cloudevents.CloudEvent;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NullCloudEventInterceptor implements ConsumerInterceptor<Object, CloudEvent> {
+
+    private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();
+
+    private static final Logger logger = LoggerFactory.getLogger(NullCloudEventInterceptor.class);
+
+    @Override
+    public ConsumerRecords<Object, CloudEvent> onConsume(ConsumerRecords<Object, CloudEvent> records) {
+        if (records.isEmpty()) {
+            return records;
+        }
+        final Map<TopicPartition, List<ConsumerRecord<Object, CloudEvent>>> validRecords =
+                new HashMap<>(records.count());
+        for (final var record : records) {
+            final var tp = new TopicPartition(record.topic(), record.partition());
+            if (!validRecords.containsKey(tp)) {
+                validRecords.put(tp, new ArrayList<>());
+            }
+            validRecords.get(tp).add(maybeDeserializeRecord(record));
+        }
+
+        return new ConsumerRecords<>(validRecords);
+    }
+
+    @Override
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
+        // Intentionally left blank
+    }
+
+    @Override
+    public void close() {
+        // Intentionally left blank
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+        logger.info("NullCloudEventInterceptor configured");
+    }
+
+    private ConsumerRecord<Object, CloudEvent> maybeDeserializeRecord(ConsumerRecord<Object, CloudEvent> record) {
+        if (record.value() != null) {
+            return record;
+        }
+        // A valid CloudEvent in the CE binary protocol binding of Kafka
+        // might consist of only headers.
+        //
+        // KafkaConsumer doesn't call the deserializer if the value
+        // is null.
+        //
+        // That means that we get a record with a null value and some CE
+        // headers even though the record is a valid CloudEvent.
+        logger.debug("deserializing null record");
+        return KafkaConsumerRecordUtils.copyRecordAssigningValue(
+                record, cloudEventDeserializer.deserialize(record.topic(), record.headers(), null));
+    }
+}
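This has to be a ConsumerInterceptor rather than deserializer logic because KafkaConsumer never invokes the value deserializer for a null value; a headers-only binary-mode CloudEvent would otherwise reach the dispatcher with a null value. A minimal sketch of that situation, assuming the classes above are on the classpath (the topic, key, and attribute values are illustrative; header names follow the CloudEvents Kafka protocol binding's ce_ prefix):

import io.cloudevents.CloudEvent;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class NullValueRecordSketch {

    public static void main(String[] args) {
        // A binary-mode CloudEvent can arrive as CE headers plus a null value.
        var record = new ConsumerRecord<Object, CloudEvent>("t1", 0, 0L, "k", null);
        record.headers().add("ce_specversion", "1.0".getBytes(StandardCharsets.UTF_8));
        record.headers().add("ce_id", "A234-1234-1234".getBytes(StandardCharsets.UTF_8));
        record.headers().add("ce_type", "com.example.event".getBytes(StandardCharsets.UTF_8));
        record.headers().add("ce_source", "/example/source".getBytes(StandardCharsets.UTF_8));

        var records = new ConsumerRecords<>(Map.of(new TopicPartition("t1", 0), List.of(record)));

        // The interceptor rebuilds the CloudEvent from the headers alone, so the
        // record handed to the next interceptor in the chain has a non-null value.
        var out = new NullCloudEventInterceptor().onConsume(records);
        System.out.println(out.iterator().next().value());
    }
}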
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java
index 01a608b26e..3fc667d6b9 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java
@@ -31,6 +31,7 @@
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
 import io.cloudevents.kafka.CloudEventSerializer;
 import io.cloudevents.kafka.PartitionKeyExtensionInterceptor;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
@@ -81,7 +82,9 @@ public static void start(
         Properties consumerConfig = Configurations.readPropertiesSync(env.getConsumerConfigFilePath());
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KeyDeserializer.class.getName());
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
-        consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, InvalidCloudEventInterceptor.class.getName());
+        consumerConfig.put(
+                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());
 
         // Read WebClient config
         JsonObject webClientConfig = Configurations.readPropertiesAsJsonSync(env.getWebClientConfigFilePath());
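Kafka treats interceptor.classes as an ordered, comma-separated list and invokes each interceptor's onConsume in that order, so null-value recovery now runs before invalid-event wrapping. The property set above resolves to the following single line (wrapped here with a properties-style continuation for readability):

interceptor.classes=dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor,\
    dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor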
diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/InterceptorChainTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/InterceptorChainTest.java
new file mode 100644
index 0000000000..5684174820
--- /dev/null
+++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/InterceptorChainTest.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher.main;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.data.BytesCloudEventData;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+public class InterceptorChainTest {
+
+    public static class InterceptorChain {
+
+        InvalidCloudEventInterceptor invalidCloudEventInterceptor;
+        NullCloudEventInterceptor nullCloudEventInterceptor;
+
+        Map<String, String> configs;
+
+        public InterceptorChain() {
+            this.invalidCloudEventInterceptor = new InvalidCloudEventInterceptor();
+            this.nullCloudEventInterceptor = new NullCloudEventInterceptor();
+
+            this.configs = new HashMap<>();
+            withKindPlural(configs);
+            withSourceName(configs);
+            withSourceNamespace(configs);
+            withEnabled(configs);
+
+            this.invalidCloudEventInterceptor.configure(this.configs);
+        }
+
+        public Map<String, String> getConfigs() {
+            return this.configs;
+        }
+
+        public ConsumerRecords<Object, CloudEvent> onConsume(final ConsumerRecords<Object, CloudEvent> records) {
+            return this.invalidCloudEventInterceptor.onConsume(this.nullCloudEventInterceptor.onConsume(records));
+        }
+
+        private static void withKindPlural(final Map<String, String> configs) {
+            with(configs, InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG, "kafkasources");
+        }
+
+        private static void withSourceName(final Map<String, String> configs) {
+            with(configs, InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG, "ks");
+        }
+
+        private static void withSourceNamespace(final Map<String, String> configs) {
+            with(configs, InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG, "knative-ns");
+        }
+
+        private static void withEnabled(final Map<String, String> configs) {
+            with(configs, CloudEventDeserializer.INVALID_CE_WRAPPER_ENABLED, "true");
+        }
+
+        private static <T> void with(final Map<String, T> configs, final String key, final T p) {
+            configs.put(key, p);
+        }
+    }
+    @Test
+    public void shouldTransformEventsToValidEventsHandlingNullValues() {
+        var interceptor = new InterceptorChain();
+
+        var input = mockInvalidRecords();
+
+        var configs = interceptor.getConfigs();
+
+        var got = interceptor.onConsume(input);
+
+        for (final var r : input) {
+            var tp = new TopicPartition(r.topic(), r.partition());
+            var inputRecords = input.records(tp);
+            var expected = new ArrayList<ConsumerRecord<Object, CloudEvent>>();
+            for (var i : inputRecords) {
+                var value = CloudEventBuilder.v1()
+                        .withId(String.format("partition:%d/offset:%d", i.partition(), i.offset()))
+                        .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(i.timestamp()), ZoneId.of("UTC")))
+                        .withSource(URI.create(String.format(
+                                "/apis/v1/namespaces/%s/%s/%s#%s",
+                                configs.get(InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG),
+                                configs.get(InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG),
+                                configs.get(InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG),
+                                i.topic())))
+                        .withSubject(String.format("partition:%d#%d", i.partition(), i.offset()))
+                        .withType("dev.knative.kafka.event");
+
+                if (i.value() != null) {
+                    value.withData(BytesCloudEventData.wrap(new byte[] {1}));
+                }
+
+                i.headers()
+                        .forEach(h -> value.withExtension(
+                                "kafkaheader" + h.key(), new String(h.value(), StandardCharsets.UTF_8)));
+
+                setKeys(value, i);
+
+                expected.add(new ConsumerRecord<>(
+                        i.topic(),
+                        i.partition(),
+                        i.offset(),
+                        i.timestamp(),
+                        i.timestampType(),
+                        i.serializedKeySize(),
+                        i.serializedValueSize(),
+                        i.key(),
+                        value.build(),
+                        i.headers(),
+                        i.leaderEpoch()));
+            }
+
+            var pairs = zip(expected, got.records(tp));
+            for (var p : pairs) {
+                assertConsumerRecordEquals(p.getKey(), p.getValue());
+            }
+        }
+    }
+
+    @Test
+    public void shouldTransformNullEventsToValidEvents() {
+        var interceptor = new InterceptorChain();
+
+        var input = mockValidRecords();
+
+        var configs = interceptor.getConfigs();
+
+        var got = interceptor.onConsume(input);
+
+        for (final var r : input) {
+            var tp = new TopicPartition(r.topic(), r.partition());
+            var inputRecords = input.records(tp);
+            var expected = new ArrayList<ConsumerRecord<Object, CloudEvent>>();
+            for (var i : inputRecords) {
+                var value = i.value();
+
+                if (value == null) {
+                    var builder = CloudEventBuilder.v1()
+                            .withId(String.format("partition:%d/offset:%d", i.partition(), i.offset()))
+                            .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(i.timestamp()), ZoneId.of("UTC")))
+                            .withSource(URI.create(String.format(
+                                    "/apis/v1/namespaces/%s/%s/%s#%s",
+                                    configs.get(InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG),
+                                    configs.get(InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG),
+                                    configs.get(InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG),
+                                    i.topic())))
+                            .withSubject(String.format("partition:%d#%d", i.partition(), i.offset()))
+                            .withType("dev.knative.kafka.event");
+
+                    setKeys(builder, i);
+
+                    value = builder.build();
+                }
+
+                expected.add(new ConsumerRecord<>(
+                        i.topic(),
+                        i.partition(),
+                        i.offset(),
+                        i.timestamp(),
+                        i.timestampType(),
+                        i.serializedKeySize(),
+                        i.serializedValueSize(),
+                        i.key(),
+                        value,
+                        i.headers(),
+                        i.leaderEpoch()));
+            }
+
+            var pairs = zip(expected, got.records(tp));
+            for (var p : pairs) {
+                assertConsumerRecordEquals(p.getKey(), p.getValue());
+            }
+        }
+    }
+
+    private void assertConsumerRecordEquals(
+            final ConsumerRecord<Object, CloudEvent> actual, final ConsumerRecord<Object, CloudEvent> expected) {
+        assertThat(actual.topic()).isEqualTo(expected.topic());
+        assertThat(actual.partition()).isEqualTo(expected.partition());
+        assertThat(actual.offset()).isEqualTo(expected.offset());
+        assertThat(actual.key()).isEqualTo(expected.key());
+        assertThat(actual.value()).isEqualTo(expected.value());
+        assertThat(actual.serializedKeySize()).isEqualTo(expected.serializedKeySize());
+        assertThat(actual.timestamp()).isEqualTo(expected.timestamp());
+        assertThat(actual.timestampType()).isEqualTo(expected.timestampType());
+        assertThat(actual.headers()).isEqualTo(expected.headers());
+    }
+
+    private void setKeys(io.cloudevents.core.v1.CloudEventBuilder value, ConsumerRecord<Object, CloudEvent> i) {
+        if (i.key() instanceof Number) {
+            value.withExtension("partitionkey", (Number) i.key());
+            value.withExtension("key", (Number) i.key());
+        } else if (i.key() instanceof String) {
+            value.withExtension("partitionkey", i.key().toString());
+            value.withExtension("key", i.key().toString());
+        } else if (i.key() instanceof byte[]) {
+            value.withExtension("partitionkey", (byte[]) i.key());
+            value.withExtension("key", (byte[]) i.key());
+        } else if (i.key() != null) {
+            throw new IllegalArgumentException("unknown type for key: " + i.key());
+        }
+    }
+
+    public static <A, B> List<Map.Entry<A, B>> zip(List<A> as, List<B> bs) {
+        if (as.size() != bs.size()) {
+            throw new IllegalArgumentException("Lists must have the same length");
+        }
+        return IntStream.range(0, as.size())
+                .mapToObj(i -> Map.entry(as.get(i), bs.get(i)))
+                .collect(Collectors.toList());
+    }
+
+    private static ConsumerRecords<Object, CloudEvent> mockInvalidRecords() {
+        return new ConsumerRecords<>(Map.of(
+                new TopicPartition("t1", 0),
+                List.of(
+                        new ConsumerRecord<>("t1", 0, 0, "a", new InvalidCloudEvent(new byte[] {1})),
+                        new ConsumerRecord<>("t1", 0, 1, "a", null))));
+    }
+
+    private static ConsumerRecords<Object, CloudEvent> mockValidRecords() {
+        return new ConsumerRecords<>(Map.of(
+                new TopicPartition("t1", 0),
+                List.of(
+                        new ConsumerRecord<>(
+                                "t1",
+                                0,
+                                0,
+                                "a",
+                                CloudEventBuilder.v1()
+                                        .withId("1")
+                                        .withType("example.event.type")
+                                        .withSource(URI.create("localhost"))
+                                        .build()),
+                        new ConsumerRecord<>("t1", 0, 1, "a", null))));
+    }
+}
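Taken together, the chain means downstream dispatchers never see a null value: a headers-only CloudEvent is recovered by NullCloudEventInterceptor, and a record that is not a CloudEvent at all is wrapped by InvalidCloudEventInterceptor into a synthetic dev.knative.kafka.event. A minimal sketch of exercising the test helper above on a single null-value record, mirroring what the second test expects:

@Test
public void nullValueRecordBecomesSyntheticEvent() {
    var chain = new InterceptorChain();
    var records = new ConsumerRecords<Object, CloudEvent>(Map.of(
            new TopicPartition("t1", 0),
            List.of(new ConsumerRecord<>("t1", 0, 0, "a", null))));

    var got = chain.onConsume(records);

    // One record out, with a synthesized value in place of null.
    assertThat(got.count()).isEqualTo(1);
    assertThat(got.iterator().next().value().getType()).isEqualTo("dev.knative.kafka.event");
}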
diff --git a/test/e2e_new/kafka_source_test.go b/test/e2e_new/kafka_source_test.go
index ca1b27b808..0cb82cfa32 100644
--- a/test/e2e_new/kafka_source_test.go
+++ b/test/e2e_new/kafka_source_test.go
@@ -116,6 +116,21 @@ func TestKafkaSourceBinaryEvent(t *testing.T) {
 	env.Test(ctx, t, features.KafkaSourceBinaryEvent())
 }
 
+func TestKafkaSourceBinaryEventWithExtensions(t *testing.T) {
+	t.Parallel()
+
+	ctx, env := global.Environment(
+		knative.WithKnativeNamespace(system.Namespace()),
+		knative.WithLoggingConfig,
+		knative.WithTracingConfig,
+		k8s.WithEventListener,
+		environment.WithPollTimings(PollInterval, PollTimeout),
+		environment.Managed(t),
+	)
+
+	env.Test(ctx, t, features.KafkaSourceBinaryEventWithExtensions())
+}
+
 func TestKafkaSourceStructuredEvent(t *testing.T) {
 	t.Parallel()
 
diff --git a/test/rekt/features/kafka_source.go b/test/rekt/features/kafka_source.go
index a7397ee7d0..c3fc8954ec 100644
--- a/test/rekt/features/kafka_source.go
+++ b/test/rekt/features/kafka_source.go
@@ -403,6 +403,41 @@ func KafkaSourceBinaryEvent() *feature.Feature {
 	)
 }
 
+func KafkaSourceBinaryEventWithExtensions() *feature.Feature {
+	senderOptions := []eventshub.EventsHubOption{
+		eventshub.InputHeader("ce-specversion", cloudevents.VersionV1),
+		eventshub.InputHeader("ce-type", "com.github.pull.create"),
+		eventshub.InputHeader("ce-source", "github.com/cloudevents/spec/pull"),
+		eventshub.InputHeader("ce-subject", "123"),
+		eventshub.InputHeader("ce-id", "A234-1234-1234"),
+		eventshub.InputHeader("content-type", "application/json"),
+	}
+	matcher := AllOf(
+		HasSpecVersion(cloudevents.VersionV1),
+		HasType("com.github.pull.create"),
+		HasSource("github.com/cloudevents/spec/pull"),
+		HasSubject("123"),
+		HasId("A234-1234-1234"),
+		HasDataContentType("application/json"),
+		HasExtension("comexampleextension1", "value"),
+		HasExtension("comexampleothervalue", "5"),
+	)
+
+	return kafkaSourceFeature("KafkaSourceBinaryEvent",
+		kafkaSourceConfig{
+			authMech: PlainMech,
+			opts: []manifest.CfgFn{
+				kafkasource.WithExtensions(map[string]string{
+					"comexampleextension1": "value",
+					"comexampleothervalue": "5",
+				})},
+		},
+		kafkaSinkConfig{},
+		senderOptions,
+		matcher,
+	)
+}
+
 func KafkaSourceStructuredEvent() *feature.Feature {
 	eventTime, _ := cetypes.ParseTime("2018-04-05T17:31:00Z")
 	senderOptions := []eventshub.EventsHubOption{