diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 2392739..1db3833 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -535,17 +535,15 @@ end context 'when we abort the nested transaction' do + subject(:producer) { build(:transactional_producer, queue_buffering_max_ms: 5_000) } + it 'expect to abort all levels' do handlers = [] producer.transaction do handlers << producer.produce_async(topic: 'example_topic', payload: 'data') - sleep(0.1) - producer.transaction do - sleep(0.1) - handlers << producer.produce_async(topic: 'example_topic', payload: 'data') raise(WaterDrop::AbortTransaction) end diff --git a/spec/support/factories/producer.rb b/spec/support/factories/producer.rb index 2fef6f4..82f88d5 100644 --- a/spec/support/factories/producer.rb +++ b/spec/support/factories/producer.rb @@ -43,6 +43,7 @@ transaction_timeout_ms { 30_000 } request_required_acks { 'all' } idempotent { true } + queue_buffering_max_ms { 5 } end kafka do @@ -52,7 +53,8 @@ 'transactional.id': transactional_id, 'transaction.timeout.ms': transaction_timeout_ms, 'message.timeout.ms': transaction_timeout_ms, - 'enable.idempotence': idempotent + 'enable.idempotence': idempotent, + 'queue.buffering.max.ms': queue_buffering_max_ms } end end