Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement handle_cancelled/3 worker callback #44

Merged
merged 2 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions lib/nimble_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,38 @@ defmodule NimblePool do
pool_state
) :: :ok

@doc """
Handle cancelled checkout requests.

This callback is executed when a checkout request is cancelled unexpectedly.

The context argument may be `:queued` or `:checked_out`:

* `:queued` means the cancellation happened before resource checkout. This may happen
when the pool is starving under load and can not serve resources. Since no checkout
happened the worker_state argument will be `nil`.

* `:checked_out` means the cancellation happened after resource checkout. This may happen
when the function given to `checkout!/4` raises.

This callback is optional.
"""
@doc callback: :worker
@callback handle_cancelled(
worker_state :: worker_state | nil,
oliveigah marked this conversation as resolved.
Show resolved Hide resolved
pool_state,
context :: :queued | :checked_out
) :: :ok

@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
handle_update: 3,
handle_ping: 2,
terminate_worker: 3,
terminate_pool: 2
terminate_pool: 2,
handle_cancelled: 3

@doc """
Defines a pool to be started under the supervision tree.
Expand Down Expand Up @@ -745,19 +769,38 @@ defmodule NimblePool do
{:noreply, %{state | resources: resources, async: async, state: pool_state}}
end

defp cancel_request_ref(ref, reason, %{requests: requests} = state) do
defp cancel_request_ref(
ref,
reason,
%{requests: requests, worker: worker, state: pool_state} = state
) do
case requests do
# Exited or timed out before we could serve it
%{^ref => {_, mon_ref, :command, _command, _deadline}} ->
if function_exported?(worker, :handle_cancelled, 3) do
args = [nil, pool_state, :queued]
apply_worker_callback(worker, :handle_cancelled, args)
end

{:noreply, remove_request(state, ref, mon_ref)}

# Exited or errored during client processing
%{^ref => {_, mon_ref, :state, worker_server_state}} ->
if function_exported?(worker, :handle_cancelled, 3) do
args = [worker_server_state, pool_state, :checked_out]
apply_worker_callback(worker, :handle_cancelled, args)
end

state = remove_request(state, ref, mon_ref)
{:noreply, remove_worker(reason, worker_server_state, state)}

# The client timed out, sent us a message, and we dropped the deadlined request
%{} ->
if function_exported?(worker, :handle_cancelled, 3) do
args = [nil, pool_state, :queued]
oliveigah marked this conversation as resolved.
Show resolved Hide resolved
apply_worker_callback(worker, :handle_cancelled, args)
end

{:noreply, state}
end
end
Expand Down
136 changes: 136 additions & 0 deletions test/nimble_pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ defmodule NimblePoolTest do
def terminate_pool(reason, pool_state) do
TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state])
end

def handle_cancelled(worker_state, pool_state, context) do
TestAgent.next(pool_state, :handle_cancelled, [worker_state, pool_state, context])
end
end

defp stateless_pool!(instructions, opts \\ []) do
Expand Down Expand Up @@ -1019,6 +1023,7 @@ defmodule NimblePoolTest do
handle_checkout: fn :checkout, _from, _next, pool_state ->
{:ok, :client_state_out, :server_state_out, pool_state}
end,
handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end,
terminate_worker: fn reason, :server_state_out, state ->
send(parent, {:terminate, reason})
{:ok, state}
Expand Down Expand Up @@ -1054,6 +1059,7 @@ defmodule NimblePoolTest do
handle_update: fn :update, _next, pool_state ->
{:ok, :updated_state, pool_state}
end,
handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end,
terminate_worker: fn reason, :updated_state, pool_state ->
send(parent, {:terminate, reason})
{:ok, pool_state}
Expand Down Expand Up @@ -1680,4 +1686,134 @@ defmodule NimblePoolTest do
assert termination_time_pool > termination_time_worker
end
end

describe "handle_cancelled" do
test "should run when client raise after checkout" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_cancelled: fn worker_state, _pool_state, :checked_out ->
send(parent, {:ping, worker_state})
:ok
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

assert_raise(
RuntimeError,
fn ->
NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
raise "unexpected error"
end)
end
)

assert_receive({:ping, :worker1})

NimblePool.stop(pool, :shutdown)
end

test "should run when checkout timeout and known request ref" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_cancelled: fn nil, _pool_state, :queued ->
send(parent, {:ping, nil})
:ok
end,
handle_checkin: fn :client_state_in, _from, next, pool_state ->
{:ok, next, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

task1 =
Task.async(fn ->
NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
send(parent, :lock)
assert_receive :release
{:result, :client_state_in}
end)
end)

assert_receive :lock

assert {:timeout, {NimblePool, :checkout, _}} =
catch_exit(
NimblePool.checkout!(
pool,
:checkout,
fn _ref, :client_state_out -> raise "should never execute this line" end,
1
)
)

send(task1.pid, :release)

assert_receive({:ping, nil})

NimblePool.stop(pool, :shutdown)
end

test "should run when checkout timeout and unkown request ref" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_cancelled: fn nil, _pool_state, :queued ->
send(parent, {:ping, nil})
:ok
end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_checkin: fn :client_state_in, _from, next, pool_state ->
{:ok, next, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

:sys.suspend(pool)

assert {:timeout, {NimblePool, :checkout, _}} =
catch_exit(
NimblePool.checkout!(
pool,
:checkout,
fn _ref, :client_state_out -> raise "should never execute this line" end,
1
)
)

:sys.resume(pool)

assert_receive({:ping, nil})

assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
{:result, :client_state_in}
end) == :result

NimblePool.stop(pool, :shutdown)
end
end
end
Loading