diff --git a/lib/broadway_cloud_pub_sub/options.ex b/lib/broadway_cloud_pub_sub/options.ex index 0289879..d2681ec 100644 --- a/lib/broadway_cloud_pub_sub/options.ex +++ b/lib/broadway_cloud_pub_sub/options.ex @@ -74,6 +74,16 @@ defmodule BroadwayCloudPubSub.Options do """, default: :ack ], + on_draining: [ + type: + {:custom, __MODULE__, :type_atom_action_or_nack_with_bounded_integer, + [[{:name, :on_success}, {:min, 0}, {:max, 600}]]}, + doc: """ + Configures the acking behaviour for the non consumed messages when draining. + See the "Acknowledgements" section below for all the possible values. + """, + default: {:nack, 0} + ], receive_interval: [ type: :integer, default: @default_receive_interval, diff --git a/lib/broadway_cloud_pub_sub/producer.ex b/lib/broadway_cloud_pub_sub/producer.ex index 7050c90..1177461 100644 --- a/lib/broadway_cloud_pub_sub/producer.ex +++ b/lib/broadway_cloud_pub_sub/producer.ex @@ -138,6 +138,7 @@ defmodule BroadwayCloudPubSub.Producer do def init(opts) do receive_interval = opts[:receive_interval] client = opts[:client] + on_draining = opts[:on_draining] {:ok, config} = client.init(opts) ack_ref = opts[:broadway][:name] @@ -146,6 +147,7 @@ defmodule BroadwayCloudPubSub.Producer do %{ demand: 0, draining: false, + on_draining: on_draining, receive_timer: nil, receive_interval: receive_interval, client: {client, config}, @@ -205,6 +207,18 @@ defmodule BroadwayCloudPubSub.Producer do handle_receive_messages(%{state | receive_timer: nil}) end + def handle_info( + {ref, messages}, + %{draining: true, on_draining: on_draining, worker_task: %{ref: ref}} = state + ) do + # Drain messages, not acknowledging them and setting the ack to draining_deadline + messages + |> Enum.map(&Broadway.Message.configure_ack(&1, on_success: on_draining)) + |> Broadway.Message.ack_immediately() + + {:noreply, [], %{state | worker_task: nil}} + end + def handle_info({ref, messages}, %{demand: demand, worker_task: %{ref: ref}} = state) do new_demand = demand - length(messages) @@ -231,11 +245,7 @@ defmodule BroadwayCloudPubSub.Producer do @impl Producer def prepare_for_draining(state) do - if state.worker_task do - Task.shutdown(state.worker_task, :brutal_kill) - end - - {:noreply, [], %{state | worker_task: nil, draining: true}} + {:noreply, [], %{state | draining: true}} end defp handle_receive_messages(%{draining: true} = state) do