Skip to content

Commit

Permalink
handle critical errors (#400)
Browse files Browse the repository at this point in the history
* handle critical errors

* more specs
  • Loading branch information
mensfeld authored Oct 22, 2023
1 parent a1ed488 commit 51d9f80
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# WaterDrop changelog

## 2.6.9 (Unreleased)
- [Improvement] Introduce a `transaction.finished` event to indicate that transaction has finished whether it was aborted or committed.
- [Improvement] Use `transaction.committed` event to indicate that transaction has been committed.

## 2.6.8 (2023-10-20)
- **[Feature]** Introduce transactions support.
- [Improvement] Expand `LoggerListener` to inform about transactions (info level).
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.6.8)
waterdrop (2.6.9)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

Expand Down
5 changes: 5 additions & 0 deletions lib/waterdrop/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ def on_transaction_committed(event)
info(event, 'Committing transaction')
end

# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_finished(event)
info(event, 'Processing transaction')
end

private

# @return [Boolean] should we report the messages details in the debug mode.
Expand Down
1 change: 1 addition & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
transaction.started
transaction.committed
transaction.aborted
transaction.finished

buffer.flushed_async
buffer.flushed_sync
Expand Down
11 changes: 8 additions & 3 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def transaction
return yield if @transaction_mutex.owned?

@transaction_mutex.synchronize do
transactional_instrument(:committed) do
transactional_instrument(:finished) do
with_transactional_error_handling(:begin) do
transactional_instrument(:started) { client.begin_transaction }
end
Expand All @@ -65,11 +65,16 @@ def transaction
commit || raise(WaterDrop::Errors::AbortTransaction)

with_transactional_error_handling(:commit) do
client.commit_transaction
transactional_instrument(:committed) { client.commit_transaction }
end

result
rescue StandardError => e
# We need to handle any interrupt including critical in order not to have the transaction
# running. This will also handle things like `IRB::Abort`
#
# rubocop:disable Lint/RescueException
rescue Exception => e
# rubocop:enable Lint/RescueException
with_transactional_error_handling(:abort) do
transactional_instrument(:aborted) { client.abort_transaction }
end
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.6.8'
VERSION = '2.6.9'
end
33 changes: 32 additions & 1 deletion spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
subject(:producer) { build(:transactional_producer) }

let(:transactional_id) { SecureRandom.uuid }
let(:critical_error) { Exception }

after { producer.close }

Expand Down Expand Up @@ -197,6 +198,36 @@
end
end

context 'when we start transaction and raise a critical Exception' do
it 'expect to re-raise this error' do
expect do
producer.transaction do
producer.produce_async(topic: 'example_topic', payload: 'na')

raise critical_error
end
end.to raise_error(critical_error)
end

it 'expect to cancel the dispatch of the message' do
handler = nil

begin
producer.transaction do
handler = producer.produce_async(topic: 'example_topic', payload: 'na')

raise critical_error
end
rescue critical_error
nil
end

expect { handler.wait }.to raise_error(Rdkafka::RdkafkaError, /Purged in queue/)
end

# The rest is expected to behave the same way as StandardError so not duplicating
end

context 'when we start transaction and abort' do
it 'expect not to re-raise' do
expect do
Expand Down Expand Up @@ -321,7 +352,7 @@
end
end

context 'when trying to close a producer fron a different thread during transaction' do
context 'when trying to close a producer from a different thread during transaction' do
it 'expect to raise an error' do
expect do
producer.transaction do
Expand Down

0 comments on commit 51d9f80

Please sign in to comment.