Skip to content

Commit

Permalink
improve transactional locking
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Dec 26, 2024
1 parent a4cb50b commit e651541
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 2.8.1 (Unreleased)
- [Enhancement] Raise `WaterDrop::ProducerNotTransactionalError` when attempting to use transactions on a non-transactional producer.
- [Fix] Disallow closing a producer from within a transaction.
- [Fix] WaterDrop should prevent opening a transaction using a closed producer.

## 2.8.0 (2024-09-16)

Expand Down
3 changes: 3 additions & 0 deletions lib/waterdrop/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ module Errors
# Raised when there was an attempt to use a closed producer
ProducerClosedError = Class.new(BaseError)

# Raised if you attempt to close the producer from within a transaction. This is not allowed.
ProducerTransactionalCloseAttemptError = Class.new(BaseError)

# Raised when we want to send a message that is invalid (impossible topic, etc)
MessageInvalidError = Class.new(BaseError)

Expand Down
121 changes: 66 additions & 55 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def client

# Don't allow to obtain a client reference for a producer that was not configured
raise Errors::ProducerNotConfiguredError, id if @status.initial?
raise Errors::ProducerClosedError, id if @status.closed?

@connecting_mutex.synchronize do
return @client if @client && @pid == Process.pid
Expand Down Expand Up @@ -180,65 +181,75 @@ def middleware
# @param force [Boolean] should we force closing even with outstanding messages after the
# max wait timeout
def close(force: false)
@operating_mutex.synchronize do
return unless @status.active?

@monitor.instrument(
'producer.closed',
producer_id: id
) do
@status.closing!
@monitor.instrument('producer.closing', producer_id: id)

# No need for auto-gc if everything got closed by us
# This should be used only in case a producer was not closed properly and forgotten
ObjectSpace.undefine_finalizer(id)

# We save this thread id because we need to bypass the activity verification on the
# producer for final flush of buffers.
@closing_thread_id = Thread.current.object_id

# Wait until all the outgoing operations are done. Only when no one is using the
# underlying client running operations we can close
sleep(0.001) until @operations_in_progress.value.zero?

# Flush has its own buffer mutex but even if it is blocked, flushing can still happen
# as we close the client after the flushing (even if blocked by the mutex)
flush(true)

# We should not close the client in several threads the same time
# It is safe to run it several times but not exactly the same moment
# We also mark it as closed only if it was connected, if not, it would trigger a new
# connection that anyhow would be immediately closed
if @client
# Why do we trigger it early instead of just having `#close` do it?
# The linger.ms time will be ignored for the duration of the call,
# queued messages will be sent to the broker as soon as possible.
begin
@client.flush(current_variant.max_wait_timeout) unless @client.closed?
# We can safely ignore timeouts here because any left outstanding requests
# will anyhow force wait on close if not forced.
# If forced, we will purge the queue and just close
rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
nil
ensure
# Purge fully the local queue in case of a forceful shutdown just to be sure, that
# there are no dangling messages. In case flush was successful, there should be
# none but we do it just in case it timed out
purge if force
end
# If we already own the transactional mutex, it means we are inside of a transaction and
# it should not we allowed to close the producer in such a case.
if @transaction_mutex.locked? && @transaction_mutex.owned?
raise Errors::ProducerTransactionalCloseAttemptError, id
end

@client.close
# The transactional mutex here can be used even when no transactions are in use
# It prevents us from closing a mutex during transactions and is irrelevant in other cases
@transaction_mutex.synchronize do
@operating_mutex.synchronize do
return unless @status.active?

@client = nil
end
@monitor.instrument(
'producer.closed',
producer_id: id
) do
@status.closing!
@monitor.instrument('producer.closing', producer_id: id)

# No need for auto-gc if everything got closed by us
# This should be used only in case a producer was not closed properly and forgotten
ObjectSpace.undefine_finalizer(id)

# We save this thread id because we need to bypass the activity verification on the
# producer for final flush of buffers.
@closing_thread_id = Thread.current.object_id

# Wait until all the outgoing operations are done. Only when no one is using the
# underlying client running operations we can close
sleep(0.001) until @operations_in_progress.value.zero?

# Flush has its own buffer mutex but even if it is blocked, flushing can still happen
# as we close the client after the flushing (even if blocked by the mutex)
flush(true)

# We should not close the client in several threads the same time
# It is safe to run it several times but not exactly the same moment
# We also mark it as closed only if it was connected, if not, it would trigger a new
# connection that anyhow would be immediately closed
if @client
# Why do we trigger it early instead of just having `#close` do it?
# The linger.ms time will be ignored for the duration of the call,
# queued messages will be sent to the broker as soon as possible.
begin
@client.flush(current_variant.max_wait_timeout) unless @client.closed?
# We can safely ignore timeouts here because any left outstanding requests
# will anyhow force wait on close if not forced.
# If forced, we will purge the queue and just close
rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
nil
ensure
# Purge fully the local queue in case of a forceful shutdown just to be sure, that
# there are no dangling messages. In case flush was successful, there should be
# none but we do it just in case it timed out
purge if force
end

@client.close

@client = nil
end

# Remove callbacks runners that were registered
::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)
# Remove callbacks runners that were registered
::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)

@status.closed!
@status.closed!
end
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def transaction
return yield if @transaction_mutex.owned?

@transaction_mutex.synchronize do
ensure_active!

transactional_instrument(:finished) do
with_transactional_error_handling(:begin) do
transactional_instrument(:started) { client.begin_transaction }
Expand Down
14 changes: 12 additions & 2 deletions spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@
producer.transaction do
producer.close
end
end.to raise_error(Rdkafka::RdkafkaError, /Erroneous state/)
end.to raise_error(WaterDrop::Errors::ProducerTransactionalCloseAttemptError)
end
end

Expand All @@ -397,7 +397,7 @@
Thread.new { producer.close }
sleep(1)
end
end.to raise_error(Rdkafka::RdkafkaError, /Erroneous state/)
end.not_to raise_error
end
end

Expand Down Expand Up @@ -766,4 +766,14 @@ def call(producer, handlers)
expect { handlers.map!(&:wait) }.not_to raise_error
end
end

context 'when trying to use a closed producer to start a transaction' do
before { producer.close }

it 'expect not to allow it' do
expect do
producer.transaction {}
end.to raise_error(WaterDrop::Errors::ProducerClosedError)
end
end
end

0 comments on commit e651541

Please sign in to comment.