diff --git a/CHANGELOG.md b/CHANGELOG.md index fadbdfe..5143726 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # WaterDrop changelog +## 2.8.1 (Unreleased) +- [Enhancement] Raise `WaterDrop::ProducerNotTransactionalError` when attempting to use transactions on a non-transactional producer. + ## 2.8.0 (2024-09-16) This release contains **BREAKING** changes. Make sure to read and apply upgrade notes. diff --git a/Gemfile.lock b/Gemfile.lock index d438548..d7e64eb 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - waterdrop (2.8.0) + waterdrop (2.8.1) karafka-core (>= 2.4.3, < 3.0.0) karafka-rdkafka (>= 0.17.5) zeitwerk (~> 2.3) diff --git a/lib/waterdrop/errors.rb b/lib/waterdrop/errors.rb index d55ba4b..dd66b72 100644 --- a/lib/waterdrop/errors.rb +++ b/lib/waterdrop/errors.rb @@ -31,6 +31,9 @@ module Errors # Raised when we want to commit transactional offset and the input is invalid TransactionalOffsetInvalidError = Class.new(BaseError) + # Raised when transaction attempt happens on a non-transactional producer + ProducerNotTransactionalError = Class.new(BaseError) + # Raised when we've got an unexpected status. This should never happen. If it does, please # contact us as it is an error. StatusInvalidError = Class.new(BaseError) diff --git a/lib/waterdrop/producer/transactions.rb b/lib/waterdrop/producer/transactions.rb index 2089a15..985551d 100644 --- a/lib/waterdrop/producer/transactions.rb +++ b/lib/waterdrop/producer/transactions.rb @@ -55,6 +55,13 @@ module Transactions # # handler.wait def transaction + unless transactional? + raise( + Errors::ProducerNotTransactionalError, + "#{id} is not transactional" + ) + end + # This will safely allow us to support one operation transactions so a transactional # producer can work without the transactional block if needed return yield if @transaction_mutex.owned? diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index bc9d235..f636f59 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.8.0' + VERSION = '2.8.1' end diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index 680cbd9..8219307 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -66,6 +66,16 @@ end describe '#transaction' do + let(:producer) do + WaterDrop::Producer.new do |config| + config.deliver = false + config.kafka = { + 'bootstrap.servers': 'localhost:9092', + 'transactional.id': SecureRandom.uuid + } + end + end + context 'when no error and no abort' do it 'expect to return the block value' do expect(producer.transaction { 1 }).to eq(1) diff --git a/spec/lib/waterdrop/clients/dummy_spec.rb b/spec/lib/waterdrop/clients/dummy_spec.rb index 645d5ee..265692c 100644 --- a/spec/lib/waterdrop/clients/dummy_spec.rb +++ b/spec/lib/waterdrop/clients/dummy_spec.rb @@ -105,6 +105,16 @@ end describe '#transaction' do + let(:producer) do + WaterDrop::Producer.new do |config| + config.deliver = false + config.kafka = { + 'bootstrap.servers': 'localhost:9092', + 'transactional.id': SecureRandom.uuid + } + end + end + context 'when no error and no abort' do it 'expect to return the block value' do expect(producer.transaction { 1 }).to eq(1) diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 64bb745..a5fbd8c 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -35,7 +35,7 @@ it 'expect to raise with info that this functionality is not configured' do expect { producer.transaction {} } - .to raise_error(::Rdkafka::RdkafkaError, /Local: Functionality not configured/) + .to raise_error(::WaterDrop::Errors::ProducerNotTransactionalError) end it { expect(producer.transactional?).to eq(false) }