From e651541f43f7a0553bd19b9a7d2e06c991749e1b Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 26 Dec 2024 10:44:40 +0100 Subject: [PATCH] improve transactional locking --- CHANGELOG.md | 2 + lib/waterdrop/errors.rb | 3 + lib/waterdrop/producer.rb | 121 ++++++++++-------- lib/waterdrop/producer/transactions.rb | 2 + .../waterdrop/producer/transactions_spec.rb | 14 +- 5 files changed, 85 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5143726..bbad208 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## 2.8.1 (Unreleased) - [Enhancement] Raise `WaterDrop::ProducerNotTransactionalError` when attempting to use transactions on a non-transactional producer. +- [Fix] Disallow closing a producer from within a transaction. +- [Fix] WaterDrop should prevent opening a transaction using a closed producer. ## 2.8.0 (2024-09-16) diff --git a/lib/waterdrop/errors.rb b/lib/waterdrop/errors.rb index dd66b72..049596d 100644 --- a/lib/waterdrop/errors.rb +++ b/lib/waterdrop/errors.rb @@ -25,6 +25,9 @@ module Errors # Raised when there was an attempt to use a closed producer ProducerClosedError = Class.new(BaseError) + # Raised if you attempt to close the producer from within a transaction. This is not allowed. + ProducerTransactionalCloseAttemptError = Class.new(BaseError) + # Raised when we want to send a message that is invalid (impossible topic, etc) MessageInvalidError = Class.new(BaseError) diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 07dd667..1c85692 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -85,6 +85,7 @@ def client # Don't allow to obtain a client reference for a producer that was not configured raise Errors::ProducerNotConfiguredError, id if @status.initial? + raise Errors::ProducerClosedError, id if @status.closed? 
@connecting_mutex.synchronize do return @client if @client && @pid == Process.pid @@ -180,65 +181,75 @@ def middleware # @param force [Boolean] should we force closing even with outstanding messages after the # max wait timeout def close(force: false) - @operating_mutex.synchronize do - return unless @status.active? - - @monitor.instrument( - 'producer.closed', - producer_id: id - ) do - @status.closing! - @monitor.instrument('producer.closing', producer_id: id) - - # No need for auto-gc if everything got closed by us - # This should be used only in case a producer was not closed properly and forgotten - ObjectSpace.undefine_finalizer(id) - - # We save this thread id because we need to bypass the activity verification on the - # producer for final flush of buffers. - @closing_thread_id = Thread.current.object_id - - # Wait until all the outgoing operations are done. Only when no one is using the - # underlying client running operations we can close - sleep(0.001) until @operations_in_progress.value.zero? - - # Flush has its own buffer mutex but even if it is blocked, flushing can still happen - # as we close the client after the flushing (even if blocked by the mutex) - flush(true) - - # We should not close the client in several threads the same time - # It is safe to run it several times but not exactly the same moment - # We also mark it as closed only if it was connected, if not, it would trigger a new - # connection that anyhow would be immediately closed - if @client - # Why do we trigger it early instead of just having `#close` do it? - # The linger.ms time will be ignored for the duration of the call, - # queued messages will be sent to the broker as soon as possible. - begin - @client.flush(current_variant.max_wait_timeout) unless @client.closed? - # We can safely ignore timeouts here because any left outstanding requests - # will anyhow force wait on close if not forced. 
- # If forced, we will purge the queue and just close - rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError - nil - ensure - # Purge fully the local queue in case of a forceful shutdown just to be sure, that - # there are no dangling messages. In case flush was successful, there should be - # none but we do it just in case it timed out - purge if force - end + # If we already own the transactional mutex, it means we are inside of a transaction and + # it should not be allowed to close the producer in such a case. + if @transaction_mutex.locked? && @transaction_mutex.owned? + raise Errors::ProducerTransactionalCloseAttemptError, id + end - @client.close + # The transactional mutex here can be used even when no transactions are in use + # It prevents us from closing a producer during transactions and is irrelevant in other cases + @transaction_mutex.synchronize do + @operating_mutex.synchronize do + return unless @status.active? - @client = nil - end + @monitor.instrument( + 'producer.closed', + producer_id: id + ) do + @status.closing! + @monitor.instrument('producer.closing', producer_id: id) + + # No need for auto-gc if everything got closed by us + # This should be used only in case a producer was not closed properly and forgotten + ObjectSpace.undefine_finalizer(id) + + # We save this thread id because we need to bypass the activity verification on the + # producer for final flush of buffers. + @closing_thread_id = Thread.current.object_id + + # Wait until all the outgoing operations are done. Only when no one is using the + # underlying client running operations we can close + sleep(0.001) until @operations_in_progress.value.zero? 
+ + # Flush has its own buffer mutex but even if it is blocked, flushing can still happen + # as we close the client after the flushing (even if blocked by the mutex) + flush(true) + + # We should not close the client in several threads the same time + # It is safe to run it several times but not exactly the same moment + # We also mark it as closed only if it was connected, if not, it would trigger a new + # connection that anyhow would be immediately closed + if @client + # Why do we trigger it early instead of just having `#close` do it? + # The linger.ms time will be ignored for the duration of the call, + # queued messages will be sent to the broker as soon as possible. + begin + @client.flush(current_variant.max_wait_timeout) unless @client.closed? + # We can safely ignore timeouts here because any left outstanding requests + # will anyhow force wait on close if not forced. + # If forced, we will purge the queue and just close + rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError + nil + ensure + # Purge fully the local queue in case of a forceful shutdown just to be sure, that + # there are no dangling messages. In case flush was successful, there should be + # none but we do it just in case it timed out + purge if force + end + + @client.close + + @client = nil + end - # Remove callbacks runners that were registered - ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id) - ::Karafka::Core::Instrumentation.error_callbacks.delete(@id) - ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id) + # Remove callbacks runners that were registered + ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id) + ::Karafka::Core::Instrumentation.error_callbacks.delete(@id) + ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id) - @status.closed! + @status.closed! 
+ end end end end diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index 985551d..2c06e21 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -67,6 +67,8 @@ def transaction return yield if @transaction_mutex.owned? @transaction_mutex.synchronize do + ensure_active! + transactional_instrument(:finished) do with_transactional_error_handling(:begin) do transactional_instrument(:started) { client.begin_transaction } diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index a5fbd8c..b34d957 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -386,7 +386,7 @@ producer.transaction do producer.close end - end.to raise_error(Rdkafka::RdkafkaError, /Erroneous state/) + end.to raise_error(WaterDrop::Errors::ProducerTransactionalCloseAttemptError) end end @@ -397,7 +397,7 @@ Thread.new { producer.close } sleep(1) end - end.to raise_error(Rdkafka::RdkafkaError, /Erroneous state/) + end.not_to raise_error end end @@ -766,4 +766,14 @@ def call(producer, handlers) expect { handlers.map!(&:wait) }.not_to raise_error end end + + context 'when trying to use a closed producer to start a transaction' do + before { producer.close } + + it 'expect not to allow it' do + expect do + producer.transaction {} + end.to raise_error(WaterDrop::Errors::ProducerClosedError) + end + end end