diff --git a/CHANGELOG.md b/CHANGELOG.md index e028d44d..c219e15c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # WaterDrop changelog +## 2.6.9 (Unreleased) +- [Improvement] Introduce a `transaction.finished` event to indicate that transaction has finished whether it was aborted or committed. +- [Improvement] Use `transaction.committed` event to indicate that transaction has been committed. + ## 2.6.8 (2023-10-20) - **[Feature]** Introduce transactions support. - [Improvement] Expand `LoggerListener` to inform about transactions (info level). diff --git a/Gemfile.lock b/Gemfile.lock index c76f307a..b7407438 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - waterdrop (2.6.8) + waterdrop (2.6.9) karafka-core (>= 2.2.3, < 3.0.0) zeitwerk (~> 2.3) diff --git a/lib/waterdrop/instrumentation/logger_listener.rb b/lib/waterdrop/instrumentation/logger_listener.rb index 08ccf4de..942a7dff 100644 --- a/lib/waterdrop/instrumentation/logger_listener.rb +++ b/lib/waterdrop/instrumentation/logger_listener.rb @@ -145,6 +145,11 @@ def on_transaction_committed(event) info(event, 'Committing transaction') end + # @param event [Dry::Events::Event] event that happened with the details + def on_transaction_finished(event) + info(event, 'Processing transaction') + end + private # @return [Boolean] should we report the messages details in the debug mode. diff --git a/lib/waterdrop/instrumentation/notifications.rb b/lib/waterdrop/instrumentation/notifications.rb index fcb14215..a0dcac30 100644 --- a/lib/waterdrop/instrumentation/notifications.rb +++ b/lib/waterdrop/instrumentation/notifications.rb @@ -21,6 +21,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications transaction.started transaction.committed transaction.aborted + transaction.finished buffer.flushed_async buffer.flushed_sync diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index 3b5eb2bb..e042a153 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -49,7 +49,7 @@ def transaction return yield if @transaction_mutex.owned? @transaction_mutex.synchronize do - transactional_instrument(:committed) do + transactional_instrument(:finished) do with_transactional_error_handling(:begin) do transactional_instrument(:started) { client.begin_transaction } end @@ -65,11 +65,16 @@ def transaction commit || raise(WaterDrop::Errors::AbortTransaction) with_transactional_error_handling(:commit) do - client.commit_transaction + transactional_instrument(:committed) { client.commit_transaction } end result - rescue StandardError => e + # We need to handle any interrupt including critical in order not to have the transaction + # running. This will also handle things like `IRB::Abort` + # + # rubocop:disable Lint/RescueException + rescue Exception => e + # rubocop:enable Lint/RescueException with_transactional_error_handling(:abort) do transactional_instrument(:aborted) { client.abort_transaction } end diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index 23b9e449..72204185 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.6.8' + VERSION = '2.6.9' end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 767b3b84..7216f2d5 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -4,6 +4,7 @@ subject(:producer) { build(:transactional_producer) } let(:transactional_id) { SecureRandom.uuid } + let(:critical_error) { Exception } after { producer.close } @@ -197,6 +198,36 @@ end end + context 'when we start transaction and raise a critical Exception' do + it 'expect to re-raise this error' do + expect do + producer.transaction do + producer.produce_async(topic: 'example_topic', payload: 'na') + + raise critical_error + end + end.to raise_error(critical_error) + end + + it 'expect to cancel the dispatch of the message' do + handler = nil + + begin + producer.transaction do + handler = producer.produce_async(topic: 'example_topic', payload: 'na') + + raise critical_error + end + rescue critical_error + nil + end + + expect { handler.wait }.to raise_error(Rdkafka::RdkafkaError, /Purged in queue/) + end + + # The rest is expected to behave the same way as StandardError so not duplicating + end + context 'when we start transaction and abort' do it 'expect not to re-raise' do expect do @@ -321,7 +352,7 @@ end end - context 'when trying to close a producer fron a different thread during transaction' do + context 'when trying to close a producer from a different thread during transaction' do it 'expect to raise an error' do expect do producer.transaction do