From 4c34bc526f55a646d6c17a4ad0d2d0175e58384b Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Fri, 22 Mar 2024 06:07:53 -0300 Subject: [PATCH] feat: implement handle_cancelled/3 worker callback (#44) --- lib/nimble_pool.ex | 45 ++++++++++++- test/nimble_pool_test.exs | 136 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+), 2 deletions(-) diff --git a/lib/nimble_pool.ex b/lib/nimble_pool.ex index d76dafe..3920892 100644 --- a/lib/nimble_pool.ex +++ b/lib/nimble_pool.ex @@ -297,6 +297,27 @@ defmodule NimblePool do pool_state ) :: :ok + @doc """ + Handle cancelled checkout requests. + + This callback is executed when a checkout request is cancelled unexpectedly. + + The context argument may be `:queued` or `:checked_out`: + + * `:queued` means the cancellation happened before resource checkout. This may happen + when the pool is starving under load and can not serve resources. + + * `:checked_out` means the cancellation happened after resource checkout. This may happen + when the function given to `checkout!/4` raises. + + This callback is optional. + """ + @doc callback: :pool + @callback handle_cancelled( + context :: :queued | :checked_out, + pool_state + ) :: :ok + @optional_callbacks init_pool: 1, handle_checkin: 4, handle_info: 2, @@ -304,7 +325,8 @@ defmodule NimblePool do handle_update: 3, handle_ping: 2, terminate_worker: 3, - terminate_pool: 2 + terminate_pool: 2, + handle_cancelled: 2 @doc """ Defines a pool to be started under the supervision tree. @@ -745,19 +767,38 @@ defmodule NimblePool do {:noreply, %{state | resources: resources, async: async, state: pool_state}} end - defp cancel_request_ref(ref, reason, %{requests: requests} = state) do + defp cancel_request_ref( + ref, + reason, + %{requests: requests, worker: worker, state: pool_state} = state + ) do case requests do # Exited or timed out before we could serve it %{^ref => {_, mon_ref, :command, _command, _deadline}} -> + if function_exported?(worker, :handle_cancelled, 2) do + args = [:queued, pool_state] + apply_worker_callback(worker, :handle_cancelled, args) + end + {:noreply, remove_request(state, ref, mon_ref)} # Exited or errored during client processing %{^ref => {_, mon_ref, :state, worker_server_state}} -> + if function_exported?(worker, :handle_cancelled, 2) do + args = [:checked_out, pool_state] + apply_worker_callback(worker, :handle_cancelled, args) + end + state = remove_request(state, ref, mon_ref) {:noreply, remove_worker(reason, worker_server_state, state)} # The client timed out, sent us a message, and we dropped the deadlined request %{} -> + if function_exported?(worker, :handle_cancelled, 2) do + args = [:queued, pool_state] + apply_worker_callback(worker, :handle_cancelled, args) + end + {:noreply, state} end end diff --git a/test/nimble_pool_test.exs b/test/nimble_pool_test.exs index dcea5ab..cd8de78 100644 --- a/test/nimble_pool_test.exs +++ b/test/nimble_pool_test.exs @@ -107,6 +107,10 @@ defmodule NimblePoolTest do def terminate_pool(reason, pool_state) do TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state]) end + + def handle_cancelled(context, pool_state) do + TestAgent.next(pool_state, :handle_cancelled, [context, pool_state]) + end end defp stateless_pool!(instructions, opts \\ []) do @@ -1019,6 +1023,7 @@ defmodule NimblePoolTest do handle_checkout: fn :checkout, _from, _next, pool_state -> {:ok, :client_state_out, :server_state_out, pool_state} end, + handle_cancelled: fn :checked_out, _pool_state -> :ok end, terminate_worker: fn reason, :server_state_out, state -> send(parent, {:terminate, reason}) {:ok, state} @@ -1054,6 +1059,7 @@ defmodule NimblePoolTest do handle_update: fn :update, _next, pool_state -> {:ok, :updated_state, pool_state} end, + handle_cancelled: fn :checked_out, _pool_state -> :ok end, terminate_worker: fn reason, :updated_state, pool_state -> send(parent, {:terminate, reason}) {:ok, pool_state} @@ -1680,4 +1686,134 @@ defmodule NimblePoolTest do assert termination_time_pool > termination_time_worker end end + + describe "handle_cancelled" do + test "should run when client raise after checkout" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_cancelled: fn :checked_out, _pool_state -> + send(parent, :ping) + :ok + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end + ], + pool_size: 1 + ) + + assert_raise( + RuntimeError, + fn -> + NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + raise "unexpected error" + end) + end + ) + + assert_receive(:ping) + + NimblePool.stop(pool, :shutdown) + end + + test "should run when checkout timeout and known request ref" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_cancelled: fn :queued, _pool_state -> + send(parent, :ping) + :ok + end, + handle_checkin: fn :client_state_in, _from, next, pool_state -> + {:ok, next, pool_state} + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end + ], + pool_size: 1 + ) + + task1 = + Task.async(fn -> + NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + send(parent, :lock) + assert_receive :release + {:result, :client_state_in} + end) + end) + + assert_receive :lock + + assert {:timeout, {NimblePool, :checkout, _}} = + catch_exit( + NimblePool.checkout!( + pool, + :checkout, + fn _ref, :client_state_out -> raise "should never execute this line" end, + 1 + ) + ) + + send(task1.pid, :release) + + assert_receive(:ping) + + NimblePool.stop(pool, :shutdown) + end + + test "should run when checkout timeout and unkown request ref" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + handle_cancelled: fn :queued, _pool_state -> + send(parent, :ping) + :ok + end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_checkin: fn :client_state_in, _from, next, pool_state -> + {:ok, next, pool_state} + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end + ], + pool_size: 1 + ) + + :sys.suspend(pool) + + assert {:timeout, {NimblePool, :checkout, _}} = + catch_exit( + NimblePool.checkout!( + pool, + :checkout, + fn _ref, :client_state_out -> raise "should never execute this line" end, + 1 + ) + ) + + :sys.resume(pool) + + assert_receive(:ping) + + assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + {:result, :client_state_in} + end) == :result + + NimblePool.stop(pool, :shutdown) + end + end end