diff --git a/karapace/coordinator/master_coordinator.py b/karapace/coordinator/master_coordinator.py index 992162bff..d9bd64e20 100644 --- a/karapace/coordinator/master_coordinator.py +++ b/karapace/coordinator/master_coordinator.py @@ -13,14 +13,17 @@ from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS +from karapace.typing import SchemaReaderStoppper from threading import Thread from typing import Final import asyncio import logging +import time __all__ = ("MasterCoordinator",) + LOG = logging.getLogger(__name__) @@ -42,6 +45,10 @@ def __init__(self, config: Config) -> None: self._sc: SchemaCoordinator | None = None self._thread: Thread = Thread(target=self._start_loop, daemon=True) self._loop: asyncio.AbstractEventLoop | None = None + self._schema_reader_stopper: SchemaReaderStoppper | None = None + + def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None: + self._schema_reader_stopper = schema_reader_stopper @property def schema_coordinator(self) -> SchemaCoordinator | None: @@ -84,14 +91,17 @@ async def _async_loop(self) -> None: self._sc = self.init_schema_coordinator() while self._running: if self._sc.ready(): - return + break await asyncio.sleep(0.5) - + # todo: wait a condition variable or a lock. 
LOG.info("Closing master_coordinator") if self._sc: await self._sc.close() - if self._loop: - self._loop.close() + while self._loop is not None and not self._loop.is_closed(): + self._loop.stop() + if not self._loop.is_running(): + self._loop.close() + time.sleep(0.5) if self._kafka_client: await self._kafka_client.close() @@ -119,8 +129,10 @@ def init_kafka_client(self) -> AIOKafkaClient: def init_schema_coordinator(self) -> SchemaCoordinator: assert self._kafka_client is not None + assert self._schema_reader_stopper is not None schema_coordinator = SchemaCoordinator( client=self._kafka_client, + schema_reader_stopper=self._schema_reader_stopper, election_strategy=self._config.get("master_election_strategy", "lowest"), group_id=self._config["group_id"], hostname=self._config["advertised_hostname"], @@ -159,3 +171,4 @@ def get_master_info(self) -> tuple[bool | None, str | None]: async def close(self) -> None: self._running = False + # todo set the condition variable or lock. diff --git a/karapace/coordinator/schema_coordinator.py b/karapace/coordinator/schema_coordinator.py index adc0858c7..99445b1d9 100644 --- a/karapace/coordinator/schema_coordinator.py +++ b/karapace/coordinator/schema_coordinator.py @@ -26,7 +26,7 @@ ) from aiokafka.util import create_future, create_task from karapace.dataclasses import default_dataclass -from karapace.typing import JsonData +from karapace.typing import JsonData, SchemaReaderStoppper from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Any, Coroutine, Final, Sequence @@ -122,6 +122,7 @@ class SchemaCoordinator: def __init__( self, client: AIOKafkaClient, + schema_reader_stopper: SchemaReaderStoppper, hostname: str, port: int, scheme: str, @@ -146,6 +147,7 @@ def __init__( self.scheme: Final = scheme self.master_eligibility: Final = master_eligibility self.master_url: str | None = None + self._schema_reader_stopper = schema_reader_stopper self._are_we_master: bool | None 
= False # a value that its strictly higher than any clock, so we are sure # we are never going to consider this the leader without explictly passing @@ -211,7 +213,7 @@ def are_we_master(self) -> bool | None: LOG.warning("No new elections performed yet.") return None - if not self._ready: + if not self._ready or not self._schema_reader_stopper.ready(): return False if self._are_we_master and self._initial_election_sec is not None: @@ -223,7 +225,7 @@ def are_we_master(self) -> bool | None: self._initial_election_sec = None # this is the last point in time were we wait till to the end of the log queue for new # incoming messages. - self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same - # flag that its set at startup, fix this + # mark the schema reader as not ready so that it re-checks it has caught up with the log + self._schema_reader_stopper.set_not_ready() return False @@ -484,7 +486,7 @@ async def _on_join_complete( # was a master change, the time before acting its always respect # to which was the previous master (if we were master no need # to wait more before acting) - self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same - # flag that its set at startup, fix this + # mark the schema reader as not ready so that it re-checks it has caught up with the log + self._schema_reader_stopper.set_not_ready() # `time.monotonic()` because we don't want the time to go back or forward because of e.g. 
ntp self._initial_election_sec = time.monotonic() @@ -505,7 +507,7 @@ async def _on_join_complete( self.master_url = None self._are_we_master = False else: - LOG.info("We are not elected as master", member_id) + LOG.info("We are not elected as master") self.master_url = master_url self._are_we_master = False self._ready = True @@ -518,6 +520,7 @@ def coordinator_dead(self) -> None: """ if self._coordinator_dead_fut is not None and self.coordinator_id is not None: LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id) + self._are_we_master = False self.coordinator_id = None self._coordinator_dead_fut.set_result(None) @@ -525,6 +528,7 @@ def reset_generation(self) -> None: """Coordinator did not recognize either generation or member_id. Will need to re-join the group. """ + self._are_we_master = False self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.request_rejoin() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index f36b85f67..d85d2ecb4 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -39,9 +39,9 @@ from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient -from karapace.typing import JsonObject, SchemaId, Subject, Version +from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError -from threading import Event, Thread +from threading import Event, Lock, Thread from typing import Final, Mapping, Sequence import json @@ -119,7 +119,7 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: ) -class KafkaSchemaReader(Thread): +class KafkaSchemaReader(Thread, SchemaReaderStoppper): def __init__( self, 
config: Config, @@ -156,7 +156,10 @@ def __init__( # old stale version that has not been deleted yet.) self.offset = OFFSET_UNINITIALIZED self._highest_offset = OFFSET_UNINITIALIZED - self.ready = False + # when a master its elected as master we should read the last arrived messages at least + # once. This lock prevent the concurrent modification of the `ready` flag. + self._ready_lock = Lock() + self._ready = False # This event controls when the Reader should stop running, it will be # set by another thread (e.g. `KarapaceSchemaRegistry`) @@ -269,9 +272,10 @@ def _get_beginning_offset(self) -> int: return OFFSET_UNINITIALIZED def _is_ready(self) -> bool: - if self.ready: - return True - + """ + Always call `_is_ready` only if `self._ready` is False. + Removed the check since now with the Lock the lookup it's a costly operation. + """ assert self.consumer is not None, "Thread must be started" try: @@ -315,6 +319,14 @@ def _is_ready(self) -> bool: def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) + def ready(self) -> bool: + with self._ready_lock: + return self._ready + + def set_not_ready(self) -> None: + with self._ready_lock: + self._ready = False + @staticmethod def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: value = json_decode(raw_value) @@ -326,10 +338,8 @@ def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" - msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s, num_messages=self.max_messages_to_process) - if self.ready is False: - self.ready = self._is_ready() + self._update_is_ready_flag() watch_offsets = False if self.master_coordinator is not None: @@ -372,9 +382,10 @@ def handle_messages(self) -> None: # Default keymode is CANONICAL and preferred unless any data consumed # has key in non-canonical format. 
If keymode is set to DEPRECATED_KARAPACE # the subsequent keys are omitted from detection. - if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: - if msg_keymode == KeyMode.DEPRECATED_KARAPACE: - self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) + with self._ready_lock: + if not self._ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: + if msg_keymode == KeyMode.DEPRECATED_KARAPACE: + self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) value = None message_value = msg.value() @@ -395,14 +406,28 @@ def handle_messages(self) -> None: else: schema_records_processed_keymode_deprecated_karapace += 1 - if self.ready and watch_offsets: - self._offset_watcher.offset_seen(self.offset) + with self._ready_lock: + if self._ready and watch_offsets: + self._offset_watcher.offset_seen(self.offset) self._report_schema_metrics( schema_records_processed_keymode_canonical, schema_records_processed_keymode_deprecated_karapace, ) + def _update_is_ready_flag(self) -> None: + update_ready_flag = False + + # to keep the lock as few as possible. + with self._ready_lock: + if self._ready is False: + update_ready_flag = True + + if update_ready_flag: + new_ready_flag = self._is_ready() + with self._ready_lock: + self._ready = new_ready_flag + def _report_schema_metrics( self, schema_records_processed_keymode_canonical: int, diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 6594663ad..406e6d465 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -56,6 +56,13 @@ def __init__(self, config: Config) -> None: master_coordinator=self.mc, database=self.database, ) + # very ugly, left as a placeholder, since we have a bidirectional + # dependency it means that the two objects needs to be one (aka the + # mc should create the KafkaSchemaReader and inject the stopper inside + # the schema_coordinator. 
Left as it is to reason together to the implementation + # since semantically it's the same, after we agree on the solution proceeding with + # the refactor) + self.mc.set_stoppper(self.schema_reader) self.schema_lock = asyncio.Lock() self._master_lock = asyncio.Lock() @@ -94,7 +101,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | are_we_master, master_url = self.mc.get_master_info() if are_we_master is None: LOG.info("No master set: %r, url: %r", are_we_master, master_url) - elif not ignore_readiness and self.schema_reader.ready is False: - LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) + elif not ignore_readiness and self.schema_reader.ready() is False: + LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready()) # `ready` is a method now: log its value, not the bound method else: return are_we_master, master_url diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 75ba91241..b0dff7147 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -83,6 +83,7 @@ class SchemaErrorMessages(Enum): class KarapaceSchemaRegistryController(KarapaceBase): def __init__(self, config: Config) -> None: + # the `not_ready_handler` its wrong, its not expecting an async method the receiver. 
super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve) self._auth: HTTPAuthorizer | None = None @@ -103,7 +104,7 @@ async def schema_registry_health(self) -> JsonObject: if self._auth is not None: resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified - resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready - if self.schema_registry.schema_reader.ready: + resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready() # `ready` is a method now: store the bool, not the bound method + if resp["schema_registry_ready"]: resp["schema_registry_startup_time_sec"] = ( self.schema_registry.schema_reader.last_check - self._process_start_time ) @@ -135,7 +136,7 @@ def _check_authorization(self, user: User | None, operation: Operation, resource self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN) async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None: - if self.schema_registry.schema_reader.ready: + if self.schema_registry.schema_reader.ready(): pass else: # Not ready, still loading the state. 
diff --git a/karapace/typing.py b/karapace/typing.py index 77058cce2..aa03179d7 100644 --- a/karapace/typing.py +++ b/karapace/typing.py @@ -4,6 +4,7 @@ """ from __future__ import annotations +from abc import ABC, abstractmethod from enum import Enum, unique from karapace.errors import InvalidVersion from typing import Any, ClassVar, Dict, List, Mapping, NewType, Sequence, Union @@ -101,3 +102,13 @@ def value(self) -> int: @property def is_latest(self) -> bool: return self.value == self.MINUS_1_VERSION_TAG + + +class SchemaReaderStoppper(ABC): + @abstractmethod + def ready(self) -> bool: + pass + + @abstractmethod + def set_not_ready(self) -> None: + pass diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index d6ee8b006..03c427ded 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -6,6 +6,7 @@ """ from karapace.config import set_config_defaults from karapace.coordinator.master_coordinator import MasterCoordinator +from karapace.typing import SchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import PortRangeInclusive from tests.utils import new_random_name @@ -16,8 +17,17 @@ import requests +class AlwaysAvailableSchemaReaderStoppper(SchemaReaderStoppper): + def ready(self) -> bool: + return True + + def set_not_ready(self) -> None: + pass + + async def init_admin(config): mc = MasterCoordinator(config=config) + mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) await mc.start() return mc diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py index 71a23fd92..8b602735b 100644 --- a/tests/integration/test_schema_coordinator.py +++ b/tests/integration/test_schema_coordinator.py @@ -21,6 +21,7 @@ from karapace.coordinator.schema_coordinator import Assignment, SchemaCoordinator, SchemaCoordinatorGroupRebalance from karapace.utils 
import json_encode from karapace.version import __version__ +from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from typing import AsyncGenerator, Iterator from unittest import mock @@ -50,6 +51,7 @@ async def fixture_admin( ) -> AsyncGenerator: coordinator = SchemaCoordinator( mocked_client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -96,6 +98,7 @@ async def test_coordinator_workflow( waiting_time_before_acting_as_master_sec = 5 coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host-1", 10101, "https", @@ -119,12 +122,14 @@ async def test_coordinator_workflow( assert not coordinator.are_we_master() # the waiting_time_before_acting_as_master_ms await asyncio.sleep(10) + assert not coordinator.are_we_master(), "last fetch before being available as master" assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master" # Check if adding an additional coordinator will rebalance correctly client2 = await _get_client(kafka_servers=kafka_servers) coordinator2 = SchemaCoordinator( client2, + AlwaysAvailableSchemaReaderStoppper(), "test-host-2", 10100, "https", @@ -161,6 +166,7 @@ async def test_coordinator_workflow( assert not secondary.are_we_master(), "also the second cannot be immediately a master" # after that time the primary can act as a master await asyncio.sleep(waiting_time_before_acting_as_master_sec) + assert not primary.are_we_master(), "Last fetch before being available as master" assert primary.are_we_master() assert not secondary.are_we_master() @@ -401,6 +407,7 @@ async def test_coordinator_metadata_update(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -443,6 +450,7 @@ async def test_coordinator__send_req(client: AIOKafkaClient) -> 
None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -481,6 +489,7 @@ async def test_coordinator_ensure_coordinator_known(client: AIOKafkaClient) -> N try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -562,6 +571,7 @@ async def test_coordinator__do_heartbeat(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -650,6 +660,7 @@ async def test_coordinator__heartbeat_routine(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -721,6 +732,7 @@ async def test_coordinator__coordination_routine(client: AIOKafkaClient) -> None try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 738f76498..5003320b3 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -15,6 +15,7 @@ from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode from tests.base_testcase import BaseTestCase +from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.schemas.json_schemas import FALSE_SCHEMA, TRUE_SCHEMA from tests.utils import create_group_name_factory, create_subject_name_factory, new_random_name, new_topic @@ -71,6 +72,7 @@ async def test_regression_soft_delete_schemas_should_be_registered( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: await master_coordinator.start() database = InMemoryDatabase() @@ -163,6 +165,7 @@ async def 
test_regression_config_for_inexisting_object_should_not_throw( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: await master_coordinator.start() database = InMemoryDatabase() @@ -267,6 +270,7 @@ async def test_key_format_detection( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: await master_coordinator.start() key_formatter = KeyFormatter() diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index afbcbb976..c65506ced 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -167,7 +167,7 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None: schema_reader.offset = testcase.cur_offset schema_reader.handle_messages() - assert schema_reader.ready is testcase.expected + assert schema_reader.ready() is testcase.expected def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: @@ -190,7 +190,7 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP schema_reader.handle_messages() - assert schema_reader.ready is True + assert schema_reader.ready() is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP @@ -236,16 +236,16 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche schema_reader.handle_messages() assert schema_reader.offset == 1 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() assert schema_reader.offset == 2 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() assert schema_reader.offset == 3 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() # call last time to 
call _is_ready() assert schema_reader.offset == 3 - assert schema_reader.ready is True + assert schema_reader.ready() is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 6d850f5fc..41cc32ea0 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -33,7 +33,7 @@ async def test_validate_schema_request_body(): async def test_forward_when_not_ready(): with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) - ready_property_mock = PropertyMock(return_value=False) + ready_property_mock = PropertyMock(return_value=lambda: False) schema_registry = AsyncMock(spec=KarapaceSchemaRegistry) type(schema_reader_mock).ready = ready_property_mock schema_registry.schema_reader = schema_reader_mock