This gem is used to produce Kafka messages. It is a wrapper over the waterdrop gem, and it is recommended for use as a transport with the sbmt-outbox gem.
Add this line to your app's Gemfile:
gem "sbmt-kafka_producer"
And then execute:
bundle install
To learn how to use this gem and how it works with Ruby on Rails, see the example apps: https://github.com/Kuper-Tech/outbox-example-apps
We recommend handling configuration and file creation with the following Rails generators. Run any generator with the --help option to learn more about the available arguments.
If you are adding the gem to your application for the first time, you can generate the initial configuration:
rails g kafka_producer:install
As a result, the config/kafka_producer.yml file will be created.
A producer class can be generated with the following command:
rails g kafka_producer:producer MaybeNamespaced::Name sync topic
As a result, a sync producer will be created.
To generate an Outbox producer for use with the sbmt-outbox gem, run the following command:
rails g kafka_producer:outbox_producer SomeOutboxItem
The config/kafka_producer.yml file is the main configuration for this gem.
default: &default
  deliver: true
  # see more options at https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb
  wait_on_queue_full: true
  max_payload_size: 1000012
  max_wait_timeout: 60000
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092" # required
    max_retries: 2 # optional, default: 2
    required_acks: -1 # optional, default: -1
    ack_timeout: 1000 # in milliseconds, optional, default: 1000
    retry_backoff: 1000 # in milliseconds, optional, default: 1000
    connect_timeout: 2000 # in milliseconds, optional, default: 2000
    message_timeout: 55000 # in milliseconds, optional, default: 55000
    # kafka_config: # optional, low-level custom Kafka options (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
    #   queue.buffering.max.messages: 100000
    #   queue.buffering.max.ms: 5

development:
  <<: *default

test:
  <<: *default
  deliver: false
  wait_on_queue_full: false

production:
  <<: *default
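The way the per-environment sections inherit from the default block can be checked with plain Ruby. The following is a minimal sketch that parses a trimmed, inline copy of the file with the standard YAML library; it does not use the gem's own config loader, and the constant and variable names are made up for illustration.

```ruby
require "yaml"

# Inline, trimmed copy of config/kafka_producer.yml so the sketch runs standalone.
CONFIG_YAML = <<~YAML
  default: &default
    deliver: true
    wait_on_queue_full: true
    kafka:
      servers: "kafka:9092"
      required_acks: -1

  test:
    <<: *default
    deliver: false
    wait_on_queue_full: false

  production:
    <<: *default
YAML

# aliases: true is required because the file relies on the &default anchor.
config = YAML.safe_load(CONFIG_YAML, aliases: true)

test_config = config.fetch("test")
production_config = config.fetch("production")

# Keys written after the merge key replace the inherited values.
puts "test deliver: #{test_config["deliver"]}"
puts "production deliver: #{production_config["deliver"]}"
puts "test servers: #{test_config.dig("kafka", "servers")}"
```

Keys listed after the merge key in an environment section take precedence over the inherited ones, which is why the test environment can switch delivery off while keeping the same kafka settings.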
The gem supports two authentication variants: plaintext (default) and SASL-plaintext.
SASL-plaintext:
auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512
If you need another variant, set the low-level custom Kafka options in the kafka_config: section of config/kafka_producer.yml. These options override the options in the auth section.
Example of SASL_SSL protocol auth via kafka_config:
kafka_config:
  security.protocol: SASL_SSL
  sasl.username: user
  sasl.password: pwd
  ssl.ca.pem: ca_cert
  sasl.mechanism: SCRAM-SHA-512
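The override rule can be pictured as a hash merge in which kafka_config is applied last. This is only a sketch of that precedence: the auth_to_kafka_options helper and the exact mapping from auth keys to rdkafka option names are assumptions made for illustration, not the gem's internals.

```ruby
# Hypothetical translation of the auth section into rdkafka-style options.
# The mapping below is an assumption for illustration only.
def auth_to_kafka_options(auth)
  return {} if auth["kind"] == "plaintext"

  {
    "security.protocol" => auth["kind"],
    "sasl.username" => auth["sasl_username"],
    "sasl.password" => auth["sasl_password"],
    "sasl.mechanism" => auth["sasl_mechanism"]
  }
end

auth = {
  "kind" => "sasl_plaintext",
  "sasl_username" => "user",
  "sasl_password" => "pwd",
  "sasl_mechanism" => "SCRAM-SHA-512"
}

kafka_config = {
  "security.protocol" => "SASL_SSL",
  "ssl.ca.pem" => "ca_cert"
}

# Hash#merge keeps the right-hand value on key conflicts,
# so entries from kafka_config win over those derived from auth.
effective = auth_to_kafka_options(auth).merge(kafka_config)

effective.each { |key, value| puts "#{key}: #{value}" }
```

In this sketch the protocol ends up as SASL_SSL while the credentials from the auth section are kept, because only the conflicting key is replaced.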
The servers key is required and should be in rdkafka format: without the kafka:// prefix, for example: srv1:port1,srv2:port2,...
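A quick way to catch a malformed servers value before it reaches the client is a small check like the one below. The parse_servers helper is hypothetical and not part of the gem; it only accepts simple host:port pairs, so bracketed IPv6 addresses would need a looser pattern.

```ruby
# Hypothetical helper: splits a comma-separated servers string and rejects
# entries that carry a scheme prefix or do not look like host:port.
def parse_servers(servers)
  list = servers.split(",").map(&:strip).reject(&:empty?)
  raise ArgumentError, "servers must not be empty" if list.empty?

  list.each do |server|
    if server.include?("://")
      raise ArgumentError, "remove the scheme prefix from #{server.inspect}"
    end
    unless server.match?(/\A[^:\s]+:\d+\z/)
      raise ArgumentError, "expected host:port, got #{server.inspect}"
    end
  end

  list
end

puts parse_servers("srv1:9092, srv2:9093").inspect

begin
  parse_servers("kafka://srv1:9092")
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```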
The kafka_config section may contain any rdkafka option.
To create a producer that will be responsible for sending messages to Kafka, copy the following code:
# app/producers/some_producer.rb
class SomeProducer < Sbmt::KafkaProducer::BaseProducer
  option :topic, default: -> { "topic" }

  def publish(payload, **options)
    sync_publish(payload, options)
    # async_publish(payload, options)
  end
end
Add the following lines to the transports section of your config/outbox.yml file:
outbox_items:
  some_outbox_item:
    transports:
      sbmt/kafka_producer:
        topic: 'topic'
        kafka: # optional kafka options
          required_acks: -1
To send a message to a Kafka topic, execute the following command:
SomeProducer.new.publish(payload, key: "123", headers: {"some-header" => "some-value"})
The gem collects basic producing metrics using Yabeda. See metrics at YabedaConfigurer.
To stub producer requests to a real Kafka broker, you can use a fake class. To do this, add require "sbmt/kafka_producer/testing" to spec/rails_helper.rb.
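The idea behind such a fake can be sketched in plain Ruby: swap the real client for an object that records messages instead of sending them. RecordingClient and GreetingProducer below are hypothetical stand-ins written for this illustration; they are not the classes the gem ships in sbmt/kafka_producer/testing.

```ruby
# Hypothetical in-memory stand-in for a Kafka client; it stores messages
# instead of contacting a broker.
class RecordingClient
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Accepts the message as keywords (topic, payload, key, headers, ...).
  def produce_sync(**message)
    @messages << message
    true
  end
end

# Hypothetical producer with an injectable client, used here only to show
# how a spec can assert on what would have been sent.
class GreetingProducer
  def initialize(client:, topic: "topic")
    @client = client
    @topic = topic
  end

  def publish(payload, **options)
    @client.produce_sync(payload: payload, **options, topic: @topic)
  end
end

client = RecordingClient.new
producer = GreetingProducer.new(client: client)
producer.publish("hello", key: "123", headers: {"some-header" => "some-value"})

puts client.messages.inspect
```

A spec can then assert on the recorded messages rather than on broker state, which keeps the test suite independent of a running Kafka instance.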
Install dip, then run:
dip provision
dip rspec