From 9ccb54f928afd7a204beffece557e2122ecd799e Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Sat, 4 Nov 2023 22:34:55 -0300 Subject: [PATCH] feat: implement terminal_pool/2 callback --- lib/nimble_pool.ex | 28 ++++++++++++- test/nimble_pool_test.exs | 88 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/lib/nimble_pool.ex b/lib/nimble_pool.ex index b1aff0e..f4f7d1e 100644 --- a/lib/nimble_pool.ex +++ b/lib/nimble_pool.ex @@ -278,13 +278,33 @@ defmodule NimblePool do ) :: {:ok, worker_state} | {:remove, user_reason()} | {:stop, user_reason()} + @doc """ + Handle pool termination. + + The `reason` argmument is the same given to GenServer's terminate/2 callback. + + No worker terminantion need to be done here because `terminate_worker/3` + callback is already called for every worker before `terminate_pool/2` + + This should be used only for clean up extra resources that can not be + handled by `terminate_worker/3` callback. + + This callback is optional. + """ + @doc callback: :pool + @callback terminate_pool( + reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason, + pool_state + ) :: :ok + @optional_callbacks init_pool: 1, handle_checkin: 4, handle_info: 2, handle_enqueue: 2, handle_update: 3, handle_ping: 2, - terminate_worker: 3 + terminate_worker: 3, + terminate_pool: 2 @doc """ Defines a pool to be started under the supervision tree. @@ -679,11 +699,15 @@ defmodule NimblePool do end @impl true - def terminate(reason, %{resources: resources} = state) do + def terminate(reason, %{worker: worker, resources: resources} = state) do for {worker_server_state, _} <- :queue.to_list(resources) do maybe_terminate_worker(reason, worker_server_state, state) end + if function_exported?(worker, :terminate_pool, 2) do + worker.terminate_pool(reason, state) + end + :ok end diff --git a/test/nimble_pool_test.exs b/test/nimble_pool_test.exs index a9c9676..afc2252 100644 --- a/test/nimble_pool_test.exs +++ b/test/nimble_pool_test.exs @@ -33,10 +33,22 @@ defmodule NimblePoolTest do next(instructions, :terminate_worker, &[reason, &1, pool_state]) end + def terminate_pool(reason, pool_state) do + # Since this callback does not receive any specific worker + # We can get the first one to read the instructions + instructions = pool_state.resources |> elem(0) |> List.first() |> elem(0) + # We always allow skip ahead on terminate + instructions = Enum.drop_while(instructions, &(elem(&1, 0) != :terminate_pool)) + next(instructions, :terminate_pool, fn _ -> [reason, pool_state] end) + end + defp next([{instruction, return} | instructions], instruction, args) do apply(return, args.(instructions)) end + # Always accept pool termination as a valid instruction + defp next([], :terminate_pool, _args), do: :ok + defp next(instructions, instruction, _args) do raise "expected #{inspect(instruction)}, state was #{inspect(instructions)}" end @@ -55,6 +67,12 @@ defmodule NimblePoolTest do [{^instruction, return} | instructions] when is_function(return) -> {return, instructions} + # Always accept terminate_pool as a valida instruction when there is no more instructions + [] = state -> + if instruction == :terminate_pool, + do: {fn _, _ -> :ok end, []}, + else: raise("expected #{inspect(instruction)}, state was #{inspect(state)}") + state -> raise "expected #{inspect(instruction)}, state was #{inspect(state)}" end) @@ -97,6 +115,10 @@ defmodule NimblePoolTest do def terminate_worker(reason, worker_state, pid) do TestAgent.next(pid, :terminate_worker, [reason, worker_state, pid]) end + + def terminate_pool(reason, pool_state) do + TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state]) + end end defp stateless_pool!(instructions, opts \\ []) do @@ -1641,4 +1663,70 @@ defmodule NimblePoolTest do assert_receive {:DOWN, _, :process, ^pool, {:shutdown, :some_reason}} end end + + describe "terminate_pool" do + test "should terminate workers and call parent when terminating" do + parent = self() + + pool = + stateless_pool!( + init_pool: fn next -> + {:ok, next} + end, + init_worker: fn next -> {:ok, next} end, + terminate_worker: fn reason, _, state -> + send(parent, {:terminate_worker, reason}) + {:ok, state} + end, + terminate_pool: fn reason, _pool_state -> + send(parent, {:terminate_pool, reason}) + :ok + end + ) + + Process.monitor(pool) + + NimblePool.stop(pool, :shutdown) + + assert_received {:terminate_worker, :shutdown} + assert_received {:terminate_pool, :shutdown} + end + + test "should terminate workers and call parent when terminating - statefull pool" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + init_worker: fn next -> {:ok, :worker2, next} end, + init_worker: fn next -> {:ok, :worker3, next} end, + terminate_worker: fn reason, worker, state -> + send(parent, {:terminated_worker, worker, reason}) + {:ok, state} + end, + terminate_worker: fn reason, worker, state -> + send(parent, {:terminated_worker, worker, reason}) + {:ok, state} + end, + terminate_worker: fn reason, worker, state -> + send(parent, {:terminated_worker, worker, reason}) + {:ok, state} + end, + terminate_pool: fn reason, _state -> + send(parent, {:terminated_pool, reason}) + :ok + end + ], + pool_size: 3 + ) + + NimblePool.stop(pool, :shutdown) + + assert_receive {:terminated_worker, :worker1, :shutdown} + assert_receive {:terminated_worker, :worker2, :shutdown} + assert_receive {:terminated_worker, :worker3, :shutdown} + assert_receive {:terminated_pool, :shutdown} + end + end end