Skip to content

Commit

Permalink
[STAFF-31] Add proper shutdown handling to Amqpx generic producers an…
Browse files Browse the repository at this point in the history
…d consumers (#138)

* feat: implement a terminate for producers and consumers

which turns off the channel normally when reason is
`:normal`, `:shutdown`, or `{:shutdown, term}`

* chore: extract the opts for the genserver from the start_link param

* fixup! chore: extract the opts for the genserver from the start_link param
  • Loading branch information
claudio-dalicandro authored Mar 17, 2023
1 parent 9541b98 commit 19769b0
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 9 deletions.
29 changes: 25 additions & 4 deletions lib/amqp/gen/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ defmodule Amqpx.Gen.Consumer do
@callback handle_message_rejection(message :: any(), error :: any()) :: :ok | {:error, any()}
@optional_callbacks handle_message_rejection: 2

@gen_server_opts [:name, :timeout, :debug, :spawn_opt, :hibernate_after]

@spec start_link(opts :: map()) :: GenServer.server()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
{gen_server_opts, opts} = Map.split(opts, @gen_server_opts)

GenServer.start_link(__MODULE__, opts, Map.to_list(gen_server_opts))
end

def init(opts) do
Process.flag(:trap_exit, true)

state = struct(__MODULE__, opts)
Process.send(self(), :setup, [])
{:ok, state}
Expand Down Expand Up @@ -178,14 +185,28 @@ defmodule Amqpx.Gen.Consumer do

def terminate(_, %__MODULE__{channel: nil}), do: nil

def terminate(_, %__MODULE__{channel: channel}) do
if Process.alive?(channel.pid) do
Channel.close(channel)
def terminate(reason, %__MODULE__{channel: channel}) do
case reason do
:normal -> close_channel(channel)
:shutdown -> close_channel(channel)
{:shutdown, _} -> close_channel(channel)
_ -> :ok
end
end

# Private functions

defp close_channel(%{pid: pid} = channel) do
if Process.alive?(pid) do
Channel.close(channel)

receive do
{:DOWN, _, :process, ^pid, _reason} ->
:ok
end
end
end

defp handle_message(
message,
%{delivery_tag: tag, redelivered: redelivered} = meta,
Expand Down
35 changes: 30 additions & 5 deletions lib/amqp/gen/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ defmodule Amqpx.Gen.Producer do

@default_backoff [base_ms: 10, max_ms: 5_000]

@gen_server_opts [:name, :timeout, :debug, :spawn_opt, :hibernate_after]

defstruct [
:channel,
:publisher_confirms,
Expand All @@ -29,12 +31,20 @@ defmodule Amqpx.Gen.Producer do

# Public API

@spec start_link(opts :: map()) :: GenServer.server()
def start_link(opts) do
name = Map.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, opts, name: name)
gen_server_opts =
opts
|> Map.take(@gen_server_opts)
|> Map.to_list()
|> Keyword.put_new(:name, __MODULE__)

GenServer.start_link(__MODULE__, opts, gen_server_opts)
end

def init(opts) do
Process.flag(:trap_exit, true)

case validate_configuration(opts) do
{:ok, state} ->
Process.send(self(), :setup, [])
Expand Down Expand Up @@ -123,9 +133,12 @@ defmodule Amqpx.Gen.Producer do

def terminate(_, %__MODULE__{channel: nil}), do: nil

def terminate(_, %__MODULE__{channel: channel}) do
if Process.alive?(channel.pid) do
Channel.close(channel)
def terminate(reason, %__MODULE__{channel: channel}) do
case reason do
:normal -> close_channel(channel)
:shutdown -> close_channel(channel)
{:shutdown, _} -> close_channel(channel)
_ -> :ok
end
end

Expand Down Expand Up @@ -183,6 +196,18 @@ defmodule Amqpx.Gen.Producer do
end

# Private functions

defp close_channel(%{pid: pid} = channel) do
if Process.alive?(pid) do
Channel.close(channel)

receive do
{:DOWN, _, :process, ^pid, _reason} ->
:ok
end
end
end

def do_publish_by(producer_name, exchange_name, routing_key, payload, options, attempt) do
case GenServer.call(producer_name, {:publish, {exchange_name, routing_key, payload, options, attempt}}) do
:ok ->
Expand Down
44 changes: 44 additions & 0 deletions test/gen_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -548,4 +548,48 @@ defmodule Amqpx.Test.AmqpxTest do
end
end
end

test "the consumer should stop gracefully" do
tmp_ex = %{name: "tmp_ex", type: :topic, routing_keys: ["amqpx.tmp_ex"], opts: [durable: true]}

consumer_config = %{
queue: "tmp_q",
exchanges: [tmp_ex],
opts: [durable: true]
}

assert {:ok, test_chan} =
Amqpx.Gen.ConnectionManager
|> GenServer.call(:get_connection)
|> Amqpx.Channel.open(self())

assert :ok = Amqpx.Helper.setup_exchange(test_chan, tmp_ex)
assert :ok = Amqpx.Helper.setup_queue(test_chan, consumer_config)

test = self()

with_mock(Consumer1,
handle_message: fn _, _, s -> {:ok, s} end,
setup: fn channel ->
send(test, {:channel, channel})
{:ok, %{}}
end
) do
{:ok, consumer_pid} = Amqpx.Gen.Consumer.start_link(%{handler_module: Consumer1})
Process.unlink(consumer_pid)

assert_receive {:channel, %{pid: channel_pid}}

Process.monitor(consumer_pid)
Process.monitor(channel_pid)

Process.exit(consumer_pid, {:shutdown, :die_gracefully})

assert_receive {:DOWN, _, :process, pid_1, _}
assert_receive {:DOWN, _, :process, pid_2, _}

assert pid_1 in [consumer_pid, channel_pid]
assert pid_2 in [consumer_pid, channel_pid]
end
end
end

0 comments on commit 19769b0

Please sign in to comment.