Skip to content

Commit

Permalink
Updating broadway (bbalser#19)
Browse files Browse the repository at this point in the history
* updating deps (elsa, broadway)
* switching CI to github actions
  • Loading branch information
jeffgrunewald authored Mar 6, 2020
1 parent 35c50ee commit 5d8a4ec
Show file tree
Hide file tree
Showing 20 changed files with 197 additions and 114 deletions.
69 changes: 69 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
name: CI
on:
pull_request:
branches:
- master
jobs:
unit:
name: Unit Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
with:
otp-version: 21.3
elixir-version: 1.8.2
- name: Get depedencies
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Run unit tests
run: |
mix test
integration:
name: Integration Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
with:
otp-version: 21.3
elixir-version: 1.8.2
- name: Get dependencies
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Run integration tests
run: |
mix test.integration
static:
name: Static Analysis
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
with:
otp-version: 21.3
elixir-version: 1.8.2
- name: Retrieve cached PLT
uses: actions/cache@v1
with:
path: .plt
key: plt-${{ github.head_ref }}
restore-keys: |
plt-${{ github.base_ref }}
plt-master
plt-
- name: Get dependencies
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Run formatter
run: |
mix format --check-formatted
- name: Run dialyzer
run: |
mix dialyzer
28 changes: 28 additions & 0 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Master
on:
push:
branches:
- master
jobs:
dialyzer:
name: Cache PLT
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
with:
otp-version: 21.3
elixir-version: 1.8.2
- name: Get dependencies
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Retrieve PLT
uses: actions/cache@v1
with:
path: .plt
key: plt-master
- name: Run dialyzer
run: |
mix dialyzer
24 changes: 24 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Release
on:
release:
types: [created]
jobs:
publish:
name: Hex Publish
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1.2.0
with:
otp-version: 21.3
elixir-version: 1.8.2
- name: Build
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Publish
env:
HEX_API_KEY: ${{ secrets.HEX_API_KEY }}
run: |
mix hex.publish --yes
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ erl_crash.dump
off_broadway_kafka-*.tar

.elixir_ls/

/.plt/
Empty file added .plt/.gitignore
Empty file.
32 changes: 0 additions & 32 deletions .travis.yml

This file was deleted.

10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ It communicates with Kafka using the [Elsa](https://hex.pm/packages/elsa)
Elixir library, which itself uses the [Brod](https://hex.pm/packages/brod)
Erlang library.

It can dynamically create Broadway stages on a per-topic or per-partition basis
for a given Kafka topic.
It can dynamically create Broadway concurrency stages on a per-topic or per-partition
basis for a given Kafka topic.

## Installation

Expand Down Expand Up @@ -57,11 +57,11 @@ defmodule ClassicBroadway do
name: __MODULE__,
producer: [
module: {OffBroadway.Kafka.Producer, kafka_config},
stages: 1
concurrency: 1
],
processors: [
default: [
stages: 1
concurrency: 1
]
],
context: %{pid: Keyword.get(opts, :pid)}
Expand Down Expand Up @@ -107,7 +107,7 @@ defmodule ShowtimeBroadway do
name: :"broadway_per_partition_#{topic}_#{partition}",
processors: [
default: [
stages: 5
concurrency: 5
]
],
context: %{
Expand Down
2 changes: 1 addition & 1 deletion lib/off_broadway/kafka.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule OffBroadway.Kafka do
name: :"broadway_per_partition_#{topic}_#{partition}",
processors: [
default: [
stages: 5
concurrency: 5
]
],
context: %{
Expand Down
2 changes: 1 addition & 1 deletion lib/off_broadway/kafka/showtime_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule OffBroadway.Kafka.ShowtimeHandler do

producer = [
module: {OffBroadway.Kafka.Producer, [connection: connection()]},
stages: 1
concurrency: 1
]

broadway_config =
Expand Down
21 changes: 10 additions & 11 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule OffBroadwayKafka.MixProject do
def project do
[
app: :off_broadway_kafka,
version: "1.0.0",
version: "1.0.1",
elixir: "~> 1.8",
start_permanent: Mix.env() == :prod,
deps: deps(),
Expand All @@ -16,31 +16,30 @@ defmodule OffBroadwayKafka.MixProject do
homepage_url: @github,
docs: docs(),
elixirc_paths: elixirc_paths(Mix.env()),
test_paths: test_paths(Mix.env())
test_paths: test_paths(Mix.env()),
dialyzer: [plt_file: {:no_warn, ".plt/dialyzer.plt"}]
]
end

# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger]
]
end

# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:elsa, "~> 0.11.1"},
{:broadway, "~> 0.5.0"},
{:retry, "~> 0.13.0"},
{:elsa, "~> 0.12"},
{:broadway, "~> 0.6"},
{:retry, "~> 0.13"},
{:placebo, "~> 1.2", only: [:dev, :test, :integration]},
{:checkov, "~> 0.5.0", only: [:dev, :test, :integration]},
{:checkov, "~> 0.5", only: [:dev, :test, :integration]},
{:divo, "~> 1.1", only: [:dev, :integration]},
{:divo_kafka, "~> 0.1.6", only: [:dev, :integration]},
{:patiently, "~> 0.2.0", only: [:test, :integration], override: true},
{:ex_doc, "~> 0.21.2", only: [:dev], runtime: false},
{:patiently, "~> 0.2", only: [:test, :integration], override: true},
{:ex_doc, "~> 0.21", only: [:dev], runtime: false},
{:benchee, "~> 1.0", only: [:integration]},
{:dialyxir, "~> 0.5", only: [:dev, :test], runtime: false}
{:dialyxir, "~> 1.0.0-rc.7", only: [:dev], runtime: false}
]
end

Expand Down
50 changes: 26 additions & 24 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm"},
"broadway": {:hex, :broadway, "0.5.0", "6fdbbb00f2fa06ea4352df4db88d9d8a71ee78279570ef92bef840f4b3c9340f", [:mix], [{:gen_stage, "~> 0.14", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm"},
"brod": {:hex, :brod, "3.9.3", "edc5525e1e565c58eec7e36118c74c074b67647a4ae7c154444ffe3f709b0869", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.3.2", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"},
"checkov": {:hex, :checkov, "0.5.0", "68a5e118293775386ce98b58fe14bb29e659b22a7915d1a825dfa8dad503015c", [:mix], [], "hexpm"},
"crc32cer": {:hex, :crc32cer, "0.1.3", "8984906c4b4fae6aa292c48f286a1c83b19ad44bd102287acb94d696015967ce", [:make, :rebar, :rebar3], [], "hexpm"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"},
"divo": {:hex, :divo, "1.1.9", "6f91b0a02bd97800eb9a99abd771b4c9b67d282b67abc223eb2832b93f557b7e", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm"},
"divo_kafka": {:hex, :divo_kafka, "0.1.6", "dffaa5d419d75e6607b581187347e6fef18b9d06d517a0f7a49772b52f60115f", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.4.2", "3aa0bd23bc4c61cf2f1e5d752d1bb470560a6f8539974f767a38923bb20e1d7f", [:mix], [], "hexpm"},
"elsa": {:hex, :elsa, "0.11.1", "afd6101142726c6007e4f28a79edec05705938a665f78c84111539eb69cecf39", [:mix], [{:brod, "~> 3.9", [hex: :brod, repo: "hexpm", optional: false]}], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"gen_stage": {:hex, :gen_stage, "0.14.3", "d0c66f1c87faa301c1a85a809a3ee9097a4264b2edf7644bf5c123237ef732bf", [:mix], [], "hexpm"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"kafka_protocol": {:hex, :kafka_protocol, "2.3.2", "b518dddcd6142f975a7de017ebb77b6dbc52ec8f50b44b49de9ebcd279951f74", [:rebar, :rebar3], [{:crc32cer, "0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.4", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.2", "1d71150d5293d703a9c38d4329da57d3935faed2031d64bc19e77b654ef2d177", [:mix], [], "hexpm"},
"patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm"},
"placebo": {:hex, :placebo, "1.2.2", "a3d47906b01844bfd04ab0351a605620619fdb8f011225e406696f96a88ff380", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"retry": {:hex, :retry, "0.13.0", "bb9b2713f70f39337837852337ad280c77662574f4fb852a8386c269f3d734c4", [:mix], [], "hexpm"},
"snappyer": {:hex, :snappyer, "1.2.4", "6d739c534cd2339633127a2b40279be71f149e5842c5363a4d88e66efb7c1fec", [:make, :rebar, :rebar3], [], "hexpm"},
"supervisor3": {:hex, :supervisor3, "1.1.8", "5cf95c95342b589ec8d74689eea0646c0a3eb92820241e0c2d0ca4c104df92bc", [:make, :rebar, :rebar3], [], "hexpm"},
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"broadway": {:hex, :broadway, "0.6.0", "ef14ec31efd2044c1b280691a46e022dbfe557044da97414678e8ee996673746", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d47c11b315d50bc15a712dbaf1e8ceba8a2ec4f50b53146192f40bb733ca051"},
"brod": {:hex, :brod, "3.9.5", "b311cdb78a359fdea8998ad98ceefb3bfedfdb513a272993e2d150bc6a58108a", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.3.3", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "b14727482de1d00529c36190979283fd7e90a5aa4a5bec91472406c47bacb2ee"},
"checkov": {:hex, :checkov, "0.5.0", "68a5e118293775386ce98b58fe14bb29e659b22a7915d1a825dfa8dad503015c", [:mix], [], "hexpm", "f51d97dd6964ca0279ff12d5641ae1e99a8080bf3bf25b3b8370b9418b51c9b9"},
"crc32cer": {:hex, :crc32cer, "0.1.3", "8984906c4b4fae6aa292c48f286a1c83b19ad44bd102287acb94d696015967ce", [:make, :rebar, :rebar3], [], "hexpm", "e35840bfd312192748bf177e92e85270e2bf0bbc01462da1f7afd4298edae4a7"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.7", "6287f8f2cb45df8584317a4be1075b8c9b8a69de8eeb82b4d9e6c761cf2664cd", [:mix], [{:erlex, ">= 0.2.5", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "506294d6c543e4e5282d4852aead19ace8a35bedeb043f9256a06a6336827122"},
"divo": {:hex, :divo, "1.1.9", "6f91b0a02bd97800eb9a99abd771b4c9b67d282b67abc223eb2832b93f557b7e", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b0edcd689089d723802c2d582bab54a77725f673445aa474eea259448910c252"},
"divo_kafka": {:hex, :divo_kafka, "0.1.6", "dffaa5d419d75e6607b581187347e6fef18b9d06d517a0f7a49772b52f60115f", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "cbc408a8b6593784524b5fee09aae0e9cc58328a174fc6d3f337d9fb34b4bc62"},
"earmark": {:hex, :earmark, "1.4.2", "3aa0bd23bc4c61cf2f1e5d752d1bb470560a6f8539974f767a38923bb20e1d7f", [:mix], [], "hexpm", "5e8806285d8a3a8999bd38e4a73c58d28534c856bc38c44818e5ba85bbda16fb"},
"elsa": {:hex, :elsa, "0.12.1", "6fa0398383ee290f7af64b4dce67848e2106d8ceec25cba6bf6b52bd80a400b2", [:mix], [{:brod, "~> 3.9", [hex: :brod, repo: "hexpm", optional: false]}], "hexpm", "ab4f3e279ace1724e11d1cca38cc1ea70b640224e801b2bbce06d7dba75d807d"},
"erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm", "756d3e19b056339af674b715fdd752c5dac468cf9d0e2d1a03abf4574e99fbf8"},
"ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f1155337ae17ff7a1255217b4c1ceefcd1860b7ceb1a1874031e7a861b052e39"},
"gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
"kafka_protocol": {:hex, :kafka_protocol, "2.3.3", "a43eaf1768a1a0300e81c356f543f9e6c02576c5a6a3301a1e5eea7eeede7435", [:rebar, :rebar3], [{:crc32cer, "0.1.3", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.4", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "3405854af32823697255bda9550f6d6e7267a01769a5df97923f1aa479ce53be"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm", "d34f013c156db51ad57cc556891b9720e6a1c1df5fe2e15af999c84d6cebeb1a"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.2", "1d71150d5293d703a9c38d4329da57d3935faed2031d64bc19e77b654ef2d177", [:mix], [], "hexpm", "51aa192e0941313c394956718bdb1e59325874f88f45871cff90345b97f60bba"},
"patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"},
"placebo": {:hex, :placebo, "1.2.2", "a3d47906b01844bfd04ab0351a605620619fdb8f011225e406696f96a88ff380", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "ba29f0f3e3c5045bb7ba769db907fcedb179c95d9a334cb8ff17332965a459ca"},
"retry": {:hex, :retry, "0.13.0", "bb9b2713f70f39337837852337ad280c77662574f4fb852a8386c269f3d734c4", [:mix], [], "hexpm", "f9da7547ffe15d84f880d4762c76661df5d5a5dd38d55860081e73921ac59fa3"},
"snappyer": {:hex, :snappyer, "1.2.4", "6d739c534cd2339633127a2b40279be71f149e5842c5363a4d88e66efb7c1fec", [:make, :rebar, :rebar3], [], "hexpm", "76abb8ed503722e10ee0f587956662881af9009067efdec87798a03db3527184"},
"supervisor3": {:hex, :supervisor3, "1.1.8", "5cf95c95342b589ec8d74689eea0646c0a3eb92820241e0c2d0ca4c104df92bc", [:make, :rebar, :rebar3], [], "hexpm", "4814b4d4343e777cc724312a588061828703f05149129cda2cb30d14105b1128"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
}
4 changes: 0 additions & 4 deletions scripts/deploy.sh

This file was deleted.

13 changes: 0 additions & 13 deletions scripts/set_release_number.sh

This file was deleted.

6 changes: 3 additions & 3 deletions test/integration/acking_events_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ defmodule AckingEventBroadway do
name: :"#{Keyword.fetch!(opts, :connection)}_broadway",
producer: [
module: {OffBroadway.Kafka.Producer, kafka_config},
stages: 1
concurrency: 1
],
processors: [
default: [stages: 8]
default: [concurrency: 8]
],
batchers: [
default: [
stages: 1,
concurrency: 1,
batch_size: 1_000,
batch_timeout: 2_000
]
Expand Down
4 changes: 2 additions & 2 deletions test/integration/off_broadway_kafka_classic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ defmodule ClassicBroadway do
name: __MODULE__,
producer: [
module: {OffBroadway.Kafka.Producer, kafka_config},
stages: 1
concurrency: 1
],
processors: [
default: [
stages: 1
concurrency: 1
]
],
context: %{pid: Keyword.get(opts, :pid)}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/off_broadway_kafka_per_partition_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule PerPartition do
[
name: :"broadway_per_partition_#{topic}_#{partition}",
processors: [
default: [stages: 5]
default: [concurrency: 5]
],
context: %{
pid: Keyword.get(opts, :pid)
Expand Down
Loading

0 comments on commit 5d8a4ec

Please sign in to comment.