Skip to content

Commit

Permalink
Stop receiving messages on draining
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Jun 21, 2024
1 parent 5c432c0 commit fb44279
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
15 changes: 7 additions & 8 deletions lib/broadway_cloud_pub_sub/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ defmodule BroadwayCloudPubSub.Producer do
{:producer,
%{
demand: 0,
draining: false,
receive_timer: nil,
receive_interval: receive_interval,
client: {client, config},
Expand Down Expand Up @@ -193,10 +194,6 @@ defmodule BroadwayCloudPubSub.Producer do
end

@impl true
def handle_info(:receive_messages, %{receive_timer: nil} = state) do
{:noreply, [], state}
end

def handle_info(:receive_messages, state) do
handle_receive_messages(%{state | receive_timer: nil})
end
Expand Down Expand Up @@ -226,14 +223,16 @@ defmodule BroadwayCloudPubSub.Producer do
end

@impl Producer
def prepare_for_draining(%{receive_timer: receive_timer} = state) do
receive_timer && Process.cancel_timer(receive_timer)

def prepare_for_draining(state) do
if state.worker_task do
Task.shutdown(state.worker_task, :brutal_kill)
end

{:noreply, [], %{state | receive_timer: nil, worker_task: nil}}
{:noreply, [], %{state | worker_task: nil, draining: true}}
end

defp handle_receive_messages(%{draining: true} = state) do
{:noreply, [], state}
end

defp handle_receive_messages(%{receive_timer: nil, demand: demand, worker_task: nil} = state)
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.29", "149d50dcb3a93d9f3d6f3ecf18c918fb5a2d3c001b5d3305c926cddfbd33355b", [:mix], [], "hexpm", "4902af1b3eb139016aed210888748db8070b8125c2342ce3dcae4f38dcc63503"},
"ex_doc": {:hex, :ex_doc, "0.29.0", "4a1cb903ce746aceef9c1f9ae8a6c12b742a5461e6959b9d3b24d813ffbea146", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "f096adb8bbca677d35d278223361c7792d496b3fc0d0224c9d4bc2f651af5db1"},
"finch": {:hex, :finch, "0.9.0", "8b772324aebafcaba763f1dffaa3e7f52f8c4e52485f50f48bbb2f42219a2e87", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.5", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a93bfcad9ca50fa3cb2d459f27667d9a87cfbb7fecf9b29b2e78a50bc2ab445d"},
"gen_stage": {:hex, :gen_stage, "1.1.1", "78d83b14ca742f4c252770bcdf674d83378ca41579c387c57e2f06d70f596317", [:mix], [], "hexpm", "eb90d2d72609050a66ce42b7d4a69323a60c892a09ead0680d5d8ef16b9a034e"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"goth": {:hex, :goth, "1.4.2", "a598dfbce6fe65db3f5f43b1ab2ce8fbe3b2fe20a7569ad62d71c11c0ddc3f41", [:mix], [{:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "d51bb6544dc551fe5754ab72e6cf194120b3c06d924282aaa3321a516ed3b98a"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
Expand Down

0 comments on commit fb44279

Please sign in to comment.