Skip to content

Commit

Permalink
feat: implement handle_cancelled/3 worker callback (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah authored Mar 22, 2024
1 parent deae8ff commit 4c34bc5
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 2 deletions.
45 changes: 43 additions & 2 deletions lib/nimble_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,36 @@ defmodule NimblePool do
pool_state
) :: :ok

@doc """
Handle cancelled checkout requests.
This callback is executed when a checkout request is cancelled unexpectedly.
The context argument may be `:queued` or `:checked_out`:
* `:queued` means the cancellation happened before resource checkout. This may happen
when the pool is starving under load and can not serve resources.
* `:checked_out` means the cancellation happened after resource checkout. This may happen
when the function given to `checkout!/4` raises.
This callback is optional.
"""
@doc callback: :pool
@callback handle_cancelled(
context :: :queued | :checked_out,
pool_state
) :: :ok

@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
handle_update: 3,
handle_ping: 2,
terminate_worker: 3,
terminate_pool: 2
terminate_pool: 2,
handle_cancelled: 2

@doc """
Defines a pool to be started under the supervision tree.
Expand Down Expand Up @@ -745,19 +767,38 @@ defmodule NimblePool do
{:noreply, %{state | resources: resources, async: async, state: pool_state}}
end

defp cancel_request_ref(ref, reason, %{requests: requests} = state) do
defp cancel_request_ref(
ref,
reason,
%{requests: requests, worker: worker, state: pool_state} = state
) do
case requests do
# Exited or timed out before we could serve it
%{^ref => {_, mon_ref, :command, _command, _deadline}} ->
if function_exported?(worker, :handle_cancelled, 2) do
args = [:queued, pool_state]
apply_worker_callback(worker, :handle_cancelled, args)
end

{:noreply, remove_request(state, ref, mon_ref)}

# Exited or errored during client processing
%{^ref => {_, mon_ref, :state, worker_server_state}} ->
if function_exported?(worker, :handle_cancelled, 2) do
args = [:checked_out, pool_state]
apply_worker_callback(worker, :handle_cancelled, args)
end

state = remove_request(state, ref, mon_ref)
{:noreply, remove_worker(reason, worker_server_state, state)}

# The client timed out, sent us a message, and we dropped the deadlined request
%{} ->
if function_exported?(worker, :handle_cancelled, 2) do
args = [:queued, pool_state]
apply_worker_callback(worker, :handle_cancelled, args)
end

{:noreply, state}
end
end
Expand Down
136 changes: 136 additions & 0 deletions test/nimble_pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ defmodule NimblePoolTest do
def terminate_pool(reason, pool_state) do
TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state])
end

def handle_cancelled(context, pool_state) do
TestAgent.next(pool_state, :handle_cancelled, [context, pool_state])
end
end

defp stateless_pool!(instructions, opts \\ []) do
Expand Down Expand Up @@ -1019,6 +1023,7 @@ defmodule NimblePoolTest do
handle_checkout: fn :checkout, _from, _next, pool_state ->
{:ok, :client_state_out, :server_state_out, pool_state}
end,
handle_cancelled: fn :checked_out, _pool_state -> :ok end,
terminate_worker: fn reason, :server_state_out, state ->
send(parent, {:terminate, reason})
{:ok, state}
Expand Down Expand Up @@ -1054,6 +1059,7 @@ defmodule NimblePoolTest do
handle_update: fn :update, _next, pool_state ->
{:ok, :updated_state, pool_state}
end,
handle_cancelled: fn :checked_out, _pool_state -> :ok end,
terminate_worker: fn reason, :updated_state, pool_state ->
send(parent, {:terminate, reason})
{:ok, pool_state}
Expand Down Expand Up @@ -1680,4 +1686,134 @@ defmodule NimblePoolTest do
assert termination_time_pool > termination_time_worker
end
end

describe "handle_cancelled" do
test "should run when client raise after checkout" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_cancelled: fn :checked_out, _pool_state ->
send(parent, :ping)
:ok
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

assert_raise(
RuntimeError,
fn ->
NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
raise "unexpected error"
end)
end
)

assert_receive(:ping)

NimblePool.stop(pool, :shutdown)
end

test "should run when checkout timeout and known request ref" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_cancelled: fn :queued, _pool_state ->
send(parent, :ping)
:ok
end,
handle_checkin: fn :client_state_in, _from, next, pool_state ->
{:ok, next, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

task1 =
Task.async(fn ->
NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
send(parent, :lock)
assert_receive :release
{:result, :client_state_in}
end)
end)

assert_receive :lock

assert {:timeout, {NimblePool, :checkout, _}} =
catch_exit(
NimblePool.checkout!(
pool,
:checkout,
fn _ref, :client_state_out -> raise "should never execute this line" end,
1
)
)

send(task1.pid, :release)

assert_receive(:ping)

NimblePool.stop(pool, :shutdown)
end

test "should run when checkout timeout and unkown request ref" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_cancelled: fn :queued, _pool_state ->
send(parent, :ping)
:ok
end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_checkin: fn :client_state_in, _from, next, pool_state ->
{:ok, next, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

:sys.suspend(pool)

assert {:timeout, {NimblePool, :checkout, _}} =
catch_exit(
NimblePool.checkout!(
pool,
:checkout,
fn _ref, :client_state_out -> raise "should never execute this line" end,
1
)
)

:sys.resume(pool)

assert_receive(:ping)

assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
{:result, :client_state_in}
end) == :result

NimblePool.stop(pool, :shutdown)
end
end
end

0 comments on commit 4c34bc5

Please sign in to comment.