Skip to content

Commit

Permalink
expose partition_count for custom partitioners (#445)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Jan 29, 2024
1 parent 87eba2c commit 29d564f
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# WaterDrop changelog

## 2.6.13 (Unreleased)
- [Enhancement] Expose `#partition_count` for building custom partitioners that need to be aware of number of partitions on a given topic.

## 2.6.12 (2024-01-03)
- [Enhancement] Provide ability to label message dispatches for increased observability.
- [Enhancement] Provide ability to commit offset during the transaction with a consumer provided.
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.6.12)
waterdrop (2.6.13)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

Expand Down
10 changes: 10 additions & 0 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ def client
@client
end

# Fetches and caches the partition count of a topic
#
# @param topic [String] topic for which we want to get the number of partitions
# @return [Integer] number of partitions of the requested topic
#
# @note It uses the underlying `rdkafka-ruby` partition count cache.
def partition_count(topic)
client.partition_count(topic.to_s)
end

# Purges data from both the buffer queue as well as the librdkafka queue.
#
# @note This is an operation that can cause data loss. Keep that in mind. It will not only
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.6.12'
VERSION = '2.6.13'
end
24 changes: 24 additions & 0 deletions spec/lib/waterdrop/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,30 @@
end
end

describe '#partition_count' do
subject(:producer) do
described_class.new do |config|
config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end
end

let(:count) { producer.partition_count(topic) }

context 'when topic does not exist' do
let(:topic) { SecureRandom.uuid }

it { expect { count }.to raise_error(Rdkafka::RdkafkaError, /unknown_topic_or_part/) }
end

context 'when topic exists' do
let(:topic) { SecureRandom.uuid }

before { producer.produce_sync(topic: topic, payload: '') }

it { expect(count).to eq(1) }
end
end

describe '#close' do
before { allow(producer).to receive(:client).and_call_original }

Expand Down

0 comments on commit 29d564f

Please sign in to comment.