From 2b793ce527a8179138eb16d71eec8c19305ba3a7 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 28 Aug 2024 14:44:29 +0200 Subject: [PATCH] tests,schema-reader: kafka message handling error tests We add tests to validate the possible error cases while parsing the message key: - we test for invalid JSON for key - missing `keytype` in the key data - we test for invalid `keytype` - we test for invalid config value - we test for invalid subject delete value - we test for invalid version number within schema value - we test for generic invalid protobuf schema --- karapace/protobuf/exception.py | 18 +-- karapace/schema_reader.py | 6 +- tests/unit/test_schema_reader.py | 197 +++++++++++++++++++++++++++++++ tests/utils.py | 16 +++ 4 files changed, 225 insertions(+), 12 deletions(-) diff --git a/karapace/protobuf/exception.py b/karapace/protobuf/exception.py index f37686256..58569bac9 100644 --- a/karapace/protobuf/exception.py +++ b/karapace/protobuf/exception.py @@ -12,14 +12,6 @@ from karapace.protobuf.schema import ProtobufSchema -class IllegalStateException(Exception): - pass - - -class IllegalArgumentException(Exception): - pass - - class Error(Exception): """Base class for errors in this module.""" @@ -28,10 +20,18 @@ class ProtobufException(Error): """Generic Protobuf schema error.""" -class ProtobufTypeException(Error): +class ProtobufTypeException(ProtobufException): """Generic Protobuf type error.""" +class IllegalStateException(ProtobufException): + pass + + +class IllegalArgumentException(ProtobufException): + pass + + class ProtobufUnresolvedDependencyException(ProtobufException): """a Protobuf schema has unresolved dependency""" diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 9ded95651..5939f4cb5 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -28,7 +28,7 @@ from karapace.config import Config from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency -from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException +from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.common import translate_from_kafkaerror @@ -399,7 +399,7 @@ def handle_messages(self) -> None: try: self.handle_msg(key, value) - except (InvalidSchema, TypeError) as exc: + except (InvalidSchema, InvalidVersion, TypeError) as exc: self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc) continue finally: @@ -487,7 +487,7 @@ def _handle_msg_config(self, key: dict, value: dict | None) -> None: def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument if value is None: LOG.warning("DELETE_SUBJECT record doesnt have a value, should have") - return + raise ValueError("DELETE_SUBJECT record doesnt have a value, should have") subject = value["subject"] version = Version(value["version"]) diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index afbcbb976..bf92cd2d4 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -10,6 +10,7 @@ from confluent_kafka import Message from dataclasses import dataclass from karapace.config import DEFAULTS +from karapace.errors import CorruptKafkaRecordException, ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import KeyFormatter @@ -18,11 +19,15 @@ KafkaSchemaReader, MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP, MAX_MESSAGES_TO_CONSUME_ON_STARTUP, + MessageType, OFFSET_EMPTY, OFFSET_UNINITIALIZED, ) +from karapace.schema_type import SchemaType from karapace.typing import SchemaId, Version from tests.base_testcase import BaseTestCase +from tests.utils import schema_protobuf_invalid +from typing import Callable from unittest.mock import Mock import confluent_kafka @@ -318,3 +323,195 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None: assert log.name == "karapace.schema_reader" assert log.levelname == "WARNING" assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have" + + +@dataclass +class KafkaMessageHandlingErrorTestCase(BaseTestCase): + key: bytes + value: bytes + schema_type: SchemaType + message_type: MessageType + expected_error: ShutdownException + expected_log_message: str + + +@pytest.fixture(name="schema_reader_with_consumer_messages_factory") +def fixture_schema_reader_with_consumer_messages_factory() -> Callable[[tuple[list[Message]]], KafkaSchemaReader]: + def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader: + key_formatter_mock = Mock(spec=KeyFormatter) + consumer_mock = Mock(spec=KafkaConsumer) + + consumer_mock.consume.side_effect = consumer_messages + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, 4) + + # Update the config to run the schema reader in strict mode so errors can be raised + config = DEFAULTS.copy() + config["kafka_schema_reader_strict_mode"] = True + + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=config, + offset_watcher=offset_watcher, + key_formatter=key_formatter_mock, + master_coordinator=None, + database=InMemoryDatabase(), + ) + schema_reader.consumer = consumer_mock + schema_reader.offset = 0 + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP + return schema_reader + + return factory + + +@pytest.fixture(name="message_factory") +def fixture_message_factory() -> Callable[[bytes, bytes, int], Message]: + def factory(key: bytes, value: bytes, offset: int = 1) -> Message: + message = Mock(spec=Message) + message.key.return_value = key + message.value.return_value = value + message.offset.return_value = offset + message.error.return_value = None + return message + + return factory + + +@pytest.mark.parametrize( + "test_case", + [ + KafkaMessageHandlingErrorTestCase( + test_name="Message key is not valid JSON", + key=b'{subject1::::"test""version":1"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.key() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Keytype is missing from message key", + key=b'{"subject":"test","version":1,"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'subject': 'test', 'version': 1, 'magic': 1}-" + "{'value': 'value does not matter at this stage, just correct JSON'} " + "has been discarded because doesn't contain the `keytype` key in the key" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Keytype is invalid on message key", + key=b'{"keytype":"NOT_A_VALID_KEY_TYPE","subject":"test","version":1,"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=None, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'NOT_A_VALID_KEY_TYPE', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'value': 'value does not matter at this stage, just correct JSON'} " + "has been discarded because the NOT_A_VALID_KEY_TYPE is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Config message value is not valid JSON", + key=b'{"keytype":"CONFIG","subject":null,"magic":0}', + value=(b'no-valid-jason"compatibilityLevel": "BACKWARD""'), + schema_type=None, + message_type=MessageType.config, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.value() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Config message value is not valid config setting", + key=b'{"keytype":"CONFIG","subject":null,"magic":0}', + value=b'{"not_the_key_name":"INVALID_CONFIG"}', + schema_type=None, + message_type=MessageType.config, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'CONFIG', 'subject': None, 'magic': 0}-" + "{'not_the_key_name': 'INVALID_CONFIG'} has been discarded because the CONFIG is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Version in schema message value is not valid", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"subject": "test", "version": "invalid-version", "id": 1, "deleted": false,' + b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": ' + b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}' + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'SCHEMA', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'subject': 'test', 'version': 'invalid-version', 'id': 1, 'deleted': False, 'schema': " + '\'{"name": "test", "type": "record", "fields": [{"name": "test_field", "type": ["string", "int"]}]}\'} ' + "has been discarded because the SCHEMA is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Message value is not valid JSON", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'no-valid-json"version": 1, "id": 1, "deleted": false,' + b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": ' + b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}' + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.value() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Delete subject message value is missing `subject` field", + key=b'{"keytype":"DELETE_SUBJECT","subject":"test","version":1,"magic":1}', + value=b'{"not-subject-key":"test","version":1}', + schema_type=None, + message_type=MessageType.delete_subject, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'DELETE_SUBJECT', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'not-subject-key': 'test', 'version': 1} has been discarded because the DELETE_SUBJECT is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Protobuf schema is invalid", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":' + + json.dumps(schema_protobuf_invalid).encode() + + b"}" + ), + schema_type=SchemaType.PROTOBUF, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Schema is not valid ProtoBuf definition", + ), + ], +) +def test_message_error_handling( + caplog: LogCaptureFixture, + test_case: KafkaMessageHandlingErrorTestCase, + schema_reader_with_consumer_messages_factory: Callable[[tuple[list[Message]]], KafkaSchemaReader], + message_factory: Callable[[bytes, bytes, int], Message], +) -> None: + message = message_factory(key=test_case.key, value=test_case.value) + consumer_messages = ([message],) + schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages) + + with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): + with pytest.raises(test_case.expected_error): + schema_reader.handle_messages() + + assert schema_reader.offset == 1 + assert not schema_reader.ready + for log in caplog.records: + assert log.name == "karapace.schema_reader" + assert log.levelname == "WARNING" + assert log.message == test_case.expected_log_message diff --git a/tests/utils.py b/tests/utils.py index f38097858..352745f94 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -148,6 +148,22 @@ {"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}}, ] +schema_protobuf_invalid = """ +|o3" +| +|opti -- om.codingharbour.protobuf"; +|option java_outer_classname = "TestEnumOrder"; +| +|message Message { +| int32 +| speed =; +|} +|Enum +| HIGH = 0 +| MIDDLE = ; +""" +schema_protobuf_invalid = trim_margin(schema_protobuf_invalid) + schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)} second_schema_json = json.dumps(