Skip to content

Commit

Permalink
raise on transactional attempt when non-transactional (#546)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Dec 9, 2024
1 parent f091721 commit 2eebe4a
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# WaterDrop changelog

## 2.8.1 (Unreleased)
- [Enhancement] Raise `WaterDrop::ProducerNotTransactionalError` when attempting to use transactions on a non-transactional producer.

## 2.8.0 (2024-09-16)

This release contains **BREAKING** changes. Make sure to read and apply upgrade notes.
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.8.0)
waterdrop (2.8.1)
karafka-core (>= 2.4.3, < 3.0.0)
karafka-rdkafka (>= 0.17.5)
zeitwerk (~> 2.3)
Expand Down
3 changes: 3 additions & 0 deletions lib/waterdrop/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ module Errors
# Raised when we want to commit transactional offset and the input is invalid
TransactionalOffsetInvalidError = Class.new(BaseError)

# Raised when transaction attempt happens on a non-transactional producer
ProducerNotTransactionalError = Class.new(BaseError)

# Raised when we've got an unexpected status. This should never happen. If it does, please
# contact us as it is an error.
StatusInvalidError = Class.new(BaseError)
Expand Down
7 changes: 7 additions & 0 deletions lib/waterdrop/producer/transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ module Transactions
#
# handler.wait
def transaction
unless transactional?
raise(
Errors::ProducerNotTransactionalError,
"#{id} is not transactional"
)
end

# This will safely allow us to support one operation transactions so a transactional
# producer can work without the transactional block if needed
return yield if @transaction_mutex.owned?
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.8.0'
VERSION = '2.8.1'
end
10 changes: 10 additions & 0 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@
end

describe '#transaction' do
let(:producer) do
WaterDrop::Producer.new do |config|
config.deliver = false
config.kafka = {
'bootstrap.servers': 'localhost:9092',
'transactional.id': SecureRandom.uuid
}
end
end

context 'when no error and no abort' do
it 'expect to return the block value' do
expect(producer.transaction { 1 }).to eq(1)
Expand Down
10 changes: 10 additions & 0 deletions spec/lib/waterdrop/clients/dummy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@
end

describe '#transaction' do
let(:producer) do
WaterDrop::Producer.new do |config|
config.deliver = false
config.kafka = {
'bootstrap.servers': 'localhost:9092',
'transactional.id': SecureRandom.uuid
}
end
end

context 'when no error and no abort' do
it 'expect to return the block value' do
expect(producer.transaction { 1 }).to eq(1)
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/waterdrop/producer/transactions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

it 'expect to raise with info that this functionality is not configured' do
expect { producer.transaction {} }
.to raise_error(::Rdkafka::RdkafkaError, /Local: Functionality not configured/)
.to raise_error(::WaterDrop::Errors::ProducerNotTransactionalError)
end

it { expect(producer.transactional?).to eq(false) }
Expand Down

0 comments on commit 2eebe4a

Please sign in to comment.