diff --git a/CHANGELOG.md b/CHANGELOG.md index 965b1982..ac64ce78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # WaterDrop changelog +## 2.6.13 (Unreleased) +- [Enhancement] Expose `#partition_count` for building custom partitioners that need to be aware of number of partitions on a given topic. + ## 2.6.12 (2024-01-03) - [Enhancement] Provide ability to label message dispatches for increased observability. - [Enhancement] Provide ability to commit offset during the transaction with a consumer provided. diff --git a/Gemfile.lock b/Gemfile.lock index 9a051874..3a893c8a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - waterdrop (2.6.12) + waterdrop (2.6.13) karafka-core (>= 2.2.3, < 3.0.0) zeitwerk (~> 2.3) diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 4e39445d..cc983d42 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -122,6 +122,16 @@ def client @client end + # Fetches and caches the partition count of a topic + # + # @param topic [String] topic for which we want to get the number of partitions + # @return [Integer] number of partitions of the requested topic + # + # @note It uses the underlying `rdkafka-ruby` partition count cache. + def partition_count(topic) + client.partition_count(topic.to_s) + end + # Purges data from both the buffer queue as well as the librdkafka queue. # # @note This is an operation that can cause data loss. Keep that in mind. It will not only diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index 3eb361ea..e0df2dd2 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.6.12' + VERSION = '2.6.13' end diff --git a/spec/lib/waterdrop/producer_spec.rb b/spec/lib/waterdrop/producer_spec.rb index e38cd2ba..400b8f5b 100644 --- a/spec/lib/waterdrop/producer_spec.rb +++ b/spec/lib/waterdrop/producer_spec.rb @@ -97,6 +97,30 @@ end end + describe '#partition_count' do + subject(:producer) do + described_class.new do |config| + config.kafka = { 'bootstrap.servers': 'localhost:9092' } + end + end + + let(:count) { producer.partition_count(topic) } + + context 'when topic does not exist' do + let(:topic) { SecureRandom.uuid } + + it { expect { count }.to raise_error(Rdkafka::RdkafkaError, /unknown_topic_or_part/) } + end + + context 'when topic exists' do + let(:topic) { SecureRandom.uuid } + + before { producer.produce_sync(topic: topic, payload: '') } + + it { expect(count).to eq(1) } + end + end + describe '#close' do before { allow(producer).to receive(:client).and_call_original }