From fb442793f264e4abd82f8bbb4870d9de028c3a50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Fri, 21 Jun 2024 11:53:53 +0200 Subject: [PATCH] Stop receiving messages on draining --- lib/broadway_cloud_pub_sub/producer.ex | 15 +++++++-------- mix.lock | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/broadway_cloud_pub_sub/producer.ex b/lib/broadway_cloud_pub_sub/producer.ex index fbd58f1..6a24f33 100644 --- a/lib/broadway_cloud_pub_sub/producer.ex +++ b/lib/broadway_cloud_pub_sub/producer.ex @@ -145,6 +145,7 @@ defmodule BroadwayCloudPubSub.Producer do {:producer, %{ demand: 0, + draining: false, receive_timer: nil, receive_interval: receive_interval, client: {client, config}, @@ -193,10 +194,6 @@ defmodule BroadwayCloudPubSub.Producer do end @impl true - def handle_info(:receive_messages, %{receive_timer: nil} = state) do - {:noreply, [], state} - end - def handle_info(:receive_messages, state) do handle_receive_messages(%{state | receive_timer: nil}) end @@ -226,14 +223,16 @@ defmodule BroadwayCloudPubSub.Producer do end @impl Producer - def prepare_for_draining(%{receive_timer: receive_timer} = state) do - receive_timer && Process.cancel_timer(receive_timer) - + def prepare_for_draining(state) do if state.worker_task do Task.shutdown(state.worker_task, :brutal_kill) end - {:noreply, [], %{state | receive_timer: nil, worker_task: nil}} + {:noreply, [], %{state | worker_task: nil, draining: true}} + end + + defp handle_receive_messages(%{draining: true} = state) do + {:noreply, [], state} end defp handle_receive_messages(%{receive_timer: nil, demand: demand, worker_task: nil} = state) diff --git a/mix.lock b/mix.lock index bcd19b7..6b08479 100644 --- a/mix.lock +++ b/mix.lock @@ -8,7 +8,7 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.29", "149d50dcb3a93d9f3d6f3ecf18c918fb5a2d3c001b5d3305c926cddfbd33355b", [:mix], [], "hexpm", "4902af1b3eb139016aed210888748db8070b8125c2342ce3dcae4f38dcc63503"}, "ex_doc": {:hex, :ex_doc, "0.29.0", "4a1cb903ce746aceef9c1f9ae8a6c12b742a5461e6959b9d3b24d813ffbea146", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "f096adb8bbca677d35d278223361c7792d496b3fc0d0224c9d4bc2f651af5db1"}, "finch": {:hex, :finch, "0.9.0", "8b772324aebafcaba763f1dffaa3e7f52f8c4e52485f50f48bbb2f42219a2e87", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.5", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a93bfcad9ca50fa3cb2d459f27667d9a87cfbb7fecf9b29b2e78a50bc2ab445d"}, - "gen_stage": {:hex, :gen_stage, "1.1.1", "78d83b14ca742f4c252770bcdf674d83378ca41579c387c57e2f06d70f596317", [:mix], [], "hexpm", "eb90d2d72609050a66ce42b7d4a69323a60c892a09ead0680d5d8ef16b9a034e"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "goth": {:hex, :goth, "1.4.2", "a598dfbce6fe65db3f5f43b1ab2ce8fbe3b2fe20a7569ad62d71c11c0ddc3f41", [:mix], [{:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "d51bb6544dc551fe5754ab72e6cf194120b3c06d924282aaa3321a516ed3b98a"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},