Skip to content

Commit

Permalink
feat: implement terminal_pool/2 callback
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Nov 5, 2023
1 parent a33ec58 commit 9ccb54f
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 2 deletions.
28 changes: 26 additions & 2 deletions lib/nimble_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,33 @@ defmodule NimblePool do
) ::
{:ok, worker_state} | {:remove, user_reason()} | {:stop, user_reason()}

@doc """
Handle pool termination.
The `reason` argmument is the same given to GenServer's terminate/2 callback.
No worker terminantion need to be done here because `terminate_worker/3`
callback is already called for every worker before `terminate_pool/2`
This should be used only for clean up extra resources that can not be
handled by `terminate_worker/3` callback.
This callback is optional.
"""
@doc callback: :pool
@callback terminate_pool(
reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
pool_state
) :: :ok

@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
handle_update: 3,
handle_ping: 2,
terminate_worker: 3
terminate_worker: 3,
terminate_pool: 2

@doc """
Defines a pool to be started under the supervision tree.
Expand Down Expand Up @@ -679,11 +699,15 @@ defmodule NimblePool do
end

@impl true
def terminate(reason, %{resources: resources} = state) do
def terminate(reason, %{worker: worker, resources: resources} = state) do
for {worker_server_state, _} <- :queue.to_list(resources) do
maybe_terminate_worker(reason, worker_server_state, state)
end

if function_exported?(worker, :terminate_pool, 2) do
worker.terminate_pool(reason, state)
end

:ok
end

Expand Down
88 changes: 88 additions & 0 deletions test/nimble_pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,22 @@ defmodule NimblePoolTest do
next(instructions, :terminate_worker, &[reason, &1, pool_state])
end

def terminate_pool(reason, pool_state) do
# Since this callback does not receive any specific worker
# We can get the first one to read the instructions
instructions = pool_state.resources |> elem(0) |> List.first() |> elem(0)
# We always allow skip ahead on terminate
instructions = Enum.drop_while(instructions, &(elem(&1, 0) != :terminate_pool))
next(instructions, :terminate_pool, fn _ -> [reason, pool_state] end)
end

defp next([{instruction, return} | instructions], instruction, args) do
apply(return, args.(instructions))
end

# Always accept pool termination as a valid instruction
defp next([], :terminate_pool, _args), do: :ok

defp next(instructions, instruction, _args) do
raise "expected #{inspect(instruction)}, state was #{inspect(instructions)}"
end
Expand All @@ -55,6 +67,12 @@ defmodule NimblePoolTest do
[{^instruction, return} | instructions] when is_function(return) ->
{return, instructions}

# Always accept terminate_pool as a valida instruction when there is no more instructions
[] = state ->
if instruction == :terminate_pool,
do: {fn _, _ -> :ok end, []},
else: raise("expected #{inspect(instruction)}, state was #{inspect(state)}")

state ->
raise "expected #{inspect(instruction)}, state was #{inspect(state)}"
end)
Expand Down Expand Up @@ -97,6 +115,10 @@ defmodule NimblePoolTest do
def terminate_worker(reason, worker_state, pid) do
TestAgent.next(pid, :terminate_worker, [reason, worker_state, pid])
end

def terminate_pool(reason, pool_state) do
TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state])
end
end

defp stateless_pool!(instructions, opts \\ []) do
Expand Down Expand Up @@ -1641,4 +1663,70 @@ defmodule NimblePoolTest do
assert_receive {:DOWN, _, :process, ^pool, {:shutdown, :some_reason}}
end
end

describe "terminate_pool" do
test "should terminate workers and call parent when terminating" do
parent = self()

pool =
stateless_pool!(
init_pool: fn next ->
{:ok, next}
end,
init_worker: fn next -> {:ok, next} end,
terminate_worker: fn reason, _, state ->
send(parent, {:terminate_worker, reason})
{:ok, state}
end,
terminate_pool: fn reason, _pool_state ->
send(parent, {:terminate_pool, reason})
:ok
end
)

Process.monitor(pool)

NimblePool.stop(pool, :shutdown)

assert_received {:terminate_worker, :shutdown}
assert_received {:terminate_pool, :shutdown}
end

test "should terminate workers and call parent when terminating - statefull pool" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
init_worker: fn next -> {:ok, :worker2, next} end,
init_worker: fn next -> {:ok, :worker3, next} end,
terminate_worker: fn reason, worker, state ->
send(parent, {:terminated_worker, worker, reason})
{:ok, state}
end,
terminate_worker: fn reason, worker, state ->
send(parent, {:terminated_worker, worker, reason})
{:ok, state}
end,
terminate_worker: fn reason, worker, state ->
send(parent, {:terminated_worker, worker, reason})
{:ok, state}
end,
terminate_pool: fn reason, _state ->
send(parent, {:terminated_pool, reason})
:ok
end
],
pool_size: 3
)

NimblePool.stop(pool, :shutdown)

assert_receive {:terminated_worker, :worker1, :shutdown}
assert_receive {:terminated_worker, :worker2, :shutdown}
assert_receive {:terminated_worker, :worker3, :shutdown}
assert_receive {:terminated_pool, :shutdown}
end
end
end

0 comments on commit 9ccb54f

Please sign in to comment.