diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 37d7e51..fe13ef8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,8 +16,8 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - elixir: ['1.11.4', '1.12.3'] - erlang: ['22.3', '23.3', '24.1'] + elixir: ['1.15', '1.16'] + erlang: ['24', '25', '26'] services: es: @@ -62,8 +62,8 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - elixir: ['1.11.4', '1.12.3'] - erlang: ['22.3', '23.3', '24.1'] + elixir: ['1.15', '1.16'] + erlang: ['24', '25', '26'] services: es: @@ -108,8 +108,8 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - elixir: ['1.12.3', '1.13', '1.14', '1.15', '1.16', '1.17'] - erlang: ['24.3', '25.3', '26.3', '27.0'] + elixir: ['1.15', '1.16'] + erlang: ['24', '25', '26'] services: es: diff --git a/lib/extreme.ex b/lib/extreme.ex index 398a2f8..8ac6358 100644 --- a/lib/extreme.ex +++ b/lib/extreme.ex @@ -80,7 +80,7 @@ defmodule Extreme do def subscribe_producer(producer), do: Extreme.EventProducer.subscribe(producer) - @spec unsubscribe_producer(producer :: pid()) :: :unsubscribed + @spec unsubscribe_producer(producer :: pid()) :: :ok def unsubscribe_producer(producer), do: Extreme.EventProducer.unsubscribe(producer) @@ -131,7 +131,7 @@ defmodule Extreme do @doc """ TODO """ - @callback unsubscribe(subscription :: pid()) :: :unsubscribed + @callback unsubscribe(subscription :: pid()) :: :ok @doc """ TODO diff --git a/lib/extreme/connection.ex b/lib/extreme/connection.ex index 8ac5e1d..95f436c 100644 --- a/lib/extreme/connection.ex +++ b/lib/extreme/connection.ex @@ -18,7 +18,7 @@ defmodule Extreme.Connection do |> GenServer.cast({:execute, message}) end - @impl true + @impl GenServer def init({base_name, configuration}) do GenServer.cast(self(), {:connect, configuration, 1}) @@ -30,7 +30,7 @@ defmodule Extreme.Connection do {:ok, state} end - @impl true + @impl GenServer def handle_cast({:connect, configuration, attempt}, state) do configuration |> _connect(attempt) @@ -57,7 +57,7 @@ defmodule Extreme.Connection do end end - @impl true + @impl GenServer def handle_info({:tcp, socket, pkg}, %State{socket: socket} = state) do {:ok, state} = Impl.receive_package(pkg, state) {:noreply, state} @@ -66,7 +66,7 @@ defmodule Extreme.Connection do def handle_info({:tcp_closed, _port}, state), do: {:stop, :tcp_closed, state} - @impl true + @impl GenServer def terminate(reason, state) do Logger.warning("[Extreme] Connection terminated: #{inspect(reason)}") RequestManager.kill_all_subscriptions(state.base_name) diff --git a/lib/extreme/connection_impl.ex b/lib/extreme/connection_impl.ex index 5579ee1..b623e4e 100644 --- a/lib/extreme/connection_impl.ex +++ b/lib/extreme/connection_impl.ex @@ -12,22 +12,22 @@ defmodule Extreme.ConnectionImpl do def receive_package(pkg, %State{socket: socket, received_data: received_data} = state) do :inet.setopts(socket, active: :once) - state = _process_package(state, received_data <> pkg) + state = _process_package(received_data <> pkg, state) {:ok, state} end defp _process_package( - state, <> + rest::binary>>, + %State{} = state ) do # Handle binary data containing zero, one or many messages # All messages start with a 32 bit unsigned little endian integer of the content length + a binary body of that size :ok = RequestManager.process_server_message(state.base_name, content) - _process_package(state, rest) + _process_package(rest, state) end # No full message left, keep state in GenServer to reprocess once more data arrives - defp _process_package(state, package_with_incomplete_message), + defp _process_package(package_with_incomplete_message, %State{} = state), do: %{state | received_data: package_with_incomplete_message} end diff --git a/lib/extreme/event_producer.ex b/lib/extreme/event_producer.ex index 8a6fd08..e842b64 100644 --- a/lib/extreme/event_producer.ex +++ b/lib/extreme/event_producer.ex @@ -19,7 +19,7 @@ defmodule Extreme.EventProducer do do: GenServer.cast(pid, :subscribe) def unsubscribe(pid), - do: GenServer.call(pid, :unsubscribe) + do: GenServer.cast(pid, :unsubscribe) def on_sync_event(pid, event, timeout), do: GenServer.call(pid, {:on_sync_event, event}, timeout) @@ -56,11 +56,6 @@ defmodule Extreme.EventProducer do {:reply, response, state} end - def handle_call(:unsubscribe, _from, %State{} = state) do - response = EventBuffer.unsubscribe(state.buffer_pid) - {:reply, response, state} - end - def handle_call(:subscription_status, _from, %State{} = state) do response = EventBuffer.subscription_status(state.buffer_pid) {:reply, response, state} @@ -92,4 +87,9 @@ defmodule Extreme.EventProducer do {:noreply, state} end + + def handle_cast(:unsubscribe, %State{} = state) do + :ok = EventBuffer.unsubscribe(state.buffer_pid) + {:noreply, state} + end end diff --git a/lib/extreme/event_producer/event_buffer.ex b/lib/extreme/event_producer/event_buffer.ex index 0217c19..6676320 100644 --- a/lib/extreme/event_producer/event_buffer.ex +++ b/lib/extreme/event_producer/event_buffer.ex @@ -15,7 +15,7 @@ defmodule Extreme.EventProducer.EventBuffer do do: GenServer.cast(pid, {:ack, response}) def unsubscribe(pid), - do: GenServer.call(pid, :unsubscribe) + do: GenServer.cast(pid, :unsubscribe) def subscribe(pid), do: GenServer.cast(pid, :subscribe) @@ -61,11 +61,31 @@ defmodule Extreme.EventProducer.EventBuffer do end @impl GenServer - def handle_cast(:subscribe, state) do + def handle_cast(:unsubscribe, %State{subscription: nil} = state), + do: {:noreply, state} + + def handle_cast(:unsubscribe, %State{} = state) do + Logger.debug("Received `:unsubscribe` request") + :ok = Subscription.unsubscribe(state.subscription) + state = %State{state | subscription: nil, subscription_ref: nil, status: :disconnected} + + {:noreply, state} + end + + def handle_cast(:subscribe, %{status: :disconnected} = state) do Logger.debug( "Subscribing to #{state.stream} starting with #{state.last_buffered_event_number}" ) + handle_cast(:resubscribe, state) + end + + def handle_cast(:subscribe, %{} = state) do + Logger.warning("(noop) Subscribe attempted while in #{state.status} status") + {:noreply, state} + end + + def handle_cast(:resubscribe, %{} = state) do {:ok, subscription} = Extreme.RequestManager.read_and_stay_subscribed( state.base_name, @@ -97,17 +117,6 @@ defmodule Extreme.EventProducer.EventBuffer do end @impl GenServer - def handle_call(:unsubscribe, _from, %State{subscription: nil} = state), - do: {:reply, :unsubscribed, state} - - def handle_call(:unsubscribe, _from, %State{} = state) do - Logger.debug("Received `:unsubscribe` request") - response = Subscription.unsubscribe(state.subscription) - state = %State{state | subscription: nil, subscription_ref: nil, status: :disconnected} - - {:reply, response, state} - end - def handle_call(:subscription_status, _from, %State{} = state), do: {:reply, state.status, state} @@ -126,19 +135,24 @@ defmodule Extreme.EventProducer.EventBuffer do if buffer_size == 1, do: :ok = EventProducer.on_async_event(state.producer_pid, event) - response = + state = %State{ + state + | buffered_events: buffered_events, + last_buffered_event_number: event_number + } + + {response, state} = if buffer_size >= state.max_buffered do - Logger.warning( + Logger.info( "Event buffer is full (#{inspect(buffer_size)}). Turning off subscription on #{state.stream}" ) - :stop + {:stop, %State{state | subscription: nil, subscription_ref: nil, status: :paused}} else - :ok + {:ok, state} end - {:reply, response, - %State{state | buffered_events: buffered_events, last_buffered_event_number: event_number}} + {:reply, response, state} end def handle_call({:on_event, _event}, _from, %State{} = state) do @@ -159,7 +173,6 @@ defmodule Extreme.EventProducer.EventBuffer do %State{subscription: subscription, subscription_ref: ref} = state ) when reason in [ - :processing_of_event_requested_stopping_subscription, :processing_of_read_events_failed, :processing_of_buffered_events_failed, :unsubscribed, @@ -171,7 +184,7 @@ defmodule Extreme.EventProducer.EventBuffer do def handle_info( {:DOWN, _ref, :process, _subscription, {:shutdown, _reason}}, - %State{subscription: nil, subscription_ref: nil, status: :disconnected} = state + %State{subscription: nil, subscription_ref: nil} = state ) do # we are already aware of that {:noreply, state} @@ -185,7 +198,7 @@ defmodule Extreme.EventProducer.EventBuffer do ) :timer.sleep(reconnect_delay) - GenServer.cast(self(), :subscribe) + GenServer.cast(self(), :resubscribe) {:noreply, %State{state | subscription: nil, subscription_ref: nil, status: :paused}} end @@ -202,11 +215,14 @@ defmodule Extreme.EventProducer.EventBuffer do {:ok, event_number} = _get_event_number(event) {:reply, :ok, %State{state | last_buffered_event_number: event_number}} - {:ok, event_number} -> + {:ok, event_number} when is_integer(event_number) -> {:reply, :ok, %State{state | last_buffered_event_number: event_number}} :stop -> - {:reply, :stop, %State{state | status: :paused}} + :ok = Subscription.unsubscribe(state.subscription) + + {:reply, :stop, + %State{state | subscription: nil, subscription_ref: nil, status: :disconnected}} end end @@ -236,7 +252,7 @@ defmodule Extreme.EventProducer.EventBuffer do # in cahtching up mode while we still have buffered events if state.status == :paused and :queue.len(buffered_events) == 0 do Logger.info("Resuming subscription on #{state.stream}") - :ok = GenServer.cast(self(), :subscribe) + :ok = GenServer.cast(self(), :resubscribe) end {:noreply, %State{state | buffered_events: buffered_events}} diff --git a/lib/extreme/persistent_subscription.ex b/lib/extreme/persistent_subscription.ex index 7344e6d..cc3cfe8 100644 --- a/lib/extreme/persistent_subscription.ex +++ b/lib/extreme/persistent_subscription.ex @@ -76,7 +76,7 @@ defmodule Extreme.PersistentSubscription do end def handle_call(:unsubscribe, _from, state) do - :unsubscribed = MyExtremeClientModule.unsubscribe(state.subscription_pid) + :ok = MyExtremeClientModule.unsubscribe(state.subscription_pid) {:reply, :ok, state} end @@ -218,7 +218,7 @@ defmodule Extreme.PersistentSubscription do end @doc false - @impl true + @impl GenServer def init({base_name, correlation_id, subscriber, stream, group, allowed_in_flight_messages}) do state = %State{ base_name: base_name, @@ -235,13 +235,12 @@ defmodule Extreme.PersistentSubscription do {:ok, state} end - @impl true - def handle_call(:unsubscribe, from, state) do - :ok = Shared.unsubscribe(from, state) + @impl GenServer + def handle_cast(:unsubscribe, state) do + :ok = Shared.unsubscribe(state) {:noreply, state} end - @impl true def handle_cast(:subscribe, state) do Msg.ConnectToPersistentSubscription.new( subscription_id: state.group, @@ -279,13 +278,6 @@ defmodule Extreme.PersistentSubscription do {:noreply, state} end - def handle_cast(:unsubscribe, state) do - Msg.UnsubscribeFromStream.new() - |> cast_request_manager(state.base_name, state.correlation_id) - - {:noreply, state} - end - defp cast_request_manager(message, base_name, correlation_id) do base_name |> RequestManager._name() diff --git a/lib/extreme/reading_subscription.ex b/lib/extreme/reading_subscription.ex index b04a7f1..8efb75e 100644 --- a/lib/extreme/reading_subscription.ex +++ b/lib/extreme/reading_subscription.ex @@ -19,7 +19,7 @@ defmodule Extreme.ReadingSubscription do ) end - @impl true + @impl GenServer def init( {base_name, correlation_id, subscriber, {stream, from_event_number, per_page, resolve_link_tos, require_master, ack_timeout}} @@ -50,13 +50,12 @@ defmodule Extreme.ReadingSubscription do {:ok, %State{state | read_until: read_until}} end - @impl true - def handle_call(:unsubscribe, from, state) do - :ok = Shared.unsubscribe(from, state) + @impl GenServer + def handle_cast(:unsubscribe, state) do + :ok = Shared.unsubscribe(state) {:noreply, state} end - @impl true def handle_cast({:process_push, fun}, %{status: :subscribed} = state), do: Shared.process_push(fun, state) diff --git a/lib/extreme/request.ex b/lib/extreme/request.ex index 04384f7..725009e 100644 --- a/lib/extreme/request.ex +++ b/lib/extreme/request.ex @@ -26,6 +26,7 @@ defmodule Extreme.Request do def prepare(protobuf_msg, credentials, correlation_id) do cmd = protobuf_msg.__struct__ + # IO.inspect([correlation_id, cmd], label: "Sending to ES") data = cmd.encode(protobuf_msg) _to_binary(cmd, correlation_id, credentials, data) end diff --git a/lib/extreme/request_manager.ex b/lib/extreme/request_manager.ex index c7fba26..374e186 100644 --- a/lib/extreme/request_manager.ex +++ b/lib/extreme/request_manager.ex @@ -1,5 +1,6 @@ defmodule Extreme.RequestManager do use GenServer + alias Extreme.SubscriptionsSupervisor alias Extreme.{Tools, Configuration, Request, Response, Connection} require Logger @@ -14,7 +15,7 @@ defmodule Extreme.RequestManager do ] defmodule State do - defstruct ~w(base_name credentials requests subscriptions read_only stream_event_buffers)a + defstruct ~w(base_name credentials requests subscriptions pid_to_correlation_id read_only)a end def _name(base_name), do: Module.concat(base_name, RequestManager) @@ -25,42 +26,6 @@ defmodule Extreme.RequestManager do def start_link(base_name, configuration), do: GenServer.start_link(__MODULE__, {base_name, configuration}, name: _name(base_name)) - @doc """ - Send IdentifyClient message to EventStore. Called when connection is established. - """ - def identify_client(connection_name, base_name) do - base_name - |> _name() - |> GenServer.cast({:identify_client, connection_name}) - end - - def send_heartbeat_response(base_name, correlation_id) do - base_name - |> _name() - |> GenServer.cast({:send_heartbeat_response, correlation_id}) - end - - @doc """ - Processes server message as soon as it is completely received via tcp. - This function is run in `connection` process. - """ - def process_server_message(base_name, message) do - :ok = - base_name - |> _name() - |> GenServer.cast({:process_server_message, message}) - end - - @doc """ - Sends `message` as a response to pending request or as a push on subscription. - `correlation_id` is used to find pending request/subscription. - """ - def respond_with_server_message(base_name, correlation_id, response) do - base_name - |> _name() - |> GenServer.cast({:respond_with_server_message, correlation_id, response}) - end - def ping(base_name, correlation_id) do base_name |> _name() @@ -79,12 +44,6 @@ defmodule Extreme.RequestManager do |> GenServer.call({:subscribe_to, stream, subscriber, resolve_link_tos, ack_timeout}) end - def unregister_subscription(base_name, correlation_id) do - base_name - |> _name() - |> GenServer.cast({:unregister_subscription, correlation_id}) - end - def read_and_stay_subscribed(base_name, subscriber, params) do base_name |> _name() @@ -105,6 +64,34 @@ defmodule Extreme.RequestManager do ) end + def unregister_subscription(base_name, correlation_id) do + base_name + |> _name() + |> GenServer.cast({:unregister_subscription, correlation_id}) + end + + ## Called only from `Connection` + + @doc """ + Send IdentifyClient message to EventStore. Called by `connection` process when connection is established. + """ + def identify_client(connection_name, base_name) do + base_name + |> _name() + |> GenServer.cast({:identify_client, connection_name}) + end + + @doc """ + Processes server message as soon as it is completely received via tcp. + This function is run in `connection` process. + """ + def process_server_message(base_name, message) do + :ok = + base_name + |> _name() + |> GenServer.cast({:process_server_message, message}) + end + def kill_all_subscriptions(base_name) do base_name |> _name() @@ -113,7 +100,7 @@ defmodule Extreme.RequestManager do ## Server callbacks - @impl true + @impl GenServer def init({base_name, configuration}) do # link me with SubscriptionsSupervisor, since I'm subscription register. true = @@ -127,12 +114,12 @@ defmodule Extreme.RequestManager do credentials: Configuration.prepare_credentials(configuration), requests: %{}, subscriptions: %{}, - stream_event_buffers: %{}, + pid_to_correlation_id: %{}, read_only: Keyword.get(configuration, :read_only, false) }} end - @impl true + @impl GenServer def handle_call({:ping, correlation_id}, from, %State{} = state) do state = %State{state | requests: Map.put(state.requests, correlation_id, from)} @@ -158,6 +145,7 @@ defmodule Extreme.RequestManager do _in_task(state.base_name, fn -> {:ok, message} = Request.prepare(message, state.credentials, correlation_id) + :ok = Connection.push(state.base_name, message) end) @@ -185,7 +173,7 @@ defmodule Extreme.RequestManager do def handle_call({:read_and_stay_subscribed, subscriber, read_params}, from, %State{} = state) do _start_subscription(self(), from, state.base_name, fn correlation_id -> - Extreme.SubscriptionsSupervisor.start_subscription( + Extreme.SubscriptionsSupervisor.start_reading_subscription( state.base_name, correlation_id, subscriber, @@ -220,15 +208,23 @@ defmodule Extreme.RequestManager do _in_task(base_name, fn -> correlation_id = Tools.generate_uuid() - {:ok, subscription} = fun.(correlation_id) + GenServer.cast(req_manager, {:prepare_subscription, correlation_id}) - GenServer.cast(req_manager, {:register_subscription, correlation_id, subscription}) + correlation_id + |> fun.() + |> case do + {:ok, subscription} -> + GenServer.cast(req_manager, {:confirm_subscription, correlation_id, subscription}) + GenServer.reply(from, {:ok, subscription}) - GenServer.reply(from, {:ok, subscription}) + error -> + :ok = GenServer.cast(req_manager, {:unregister_subscription, correlation_id}) + GenServer.reply(from, error) + end end) end - @impl true + @impl GenServer def handle_cast({:execute, correlation_id, message}, %State{} = state) do _in_task(state.base_name, fn -> {:ok, message} = Request.prepare(message, state.credentials, correlation_id) @@ -249,7 +245,8 @@ defmodule Extreme.RequestManager do message |> Response.get_correlation_id() - state = + %State{} = + state = state.subscriptions[correlation_id] |> _process_server_message(message, state) @@ -263,29 +260,58 @@ defmodule Extreme.RequestManager do end def handle_cast({:respond_with_server_message, correlation_id, response}, %State{} = state) do - state = - case Map.get(state.requests, correlation_id) do - from when not is_nil(from) -> + %State{} = + state = + state.requests + |> Map.get(correlation_id) + |> case do + nil -> + state + + from -> requests = Map.delete(state.requests, correlation_id) :ok = GenServer.reply(from, response) %{state | requests: requests} - - nil -> - state end {:noreply, state} end - def handle_cast({:register_subscription, correlation_id, subscription}, %State{} = state) do - subscriptions = Map.put(state.subscriptions, correlation_id, subscription) + def handle_cast({:prepare_subscription, correlation_id}, %State{} = state) do + subscriptions = Map.put(state.subscriptions, correlation_id, {:pending, correlation_id, []}) + {:noreply, %State{state | subscriptions: subscriptions}} end + def handle_cast({:confirm_subscription, correlation_id, subscription}, %State{} = state) do + Process.monitor(subscription) + {:pending, ^correlation_id, buffer} = Map.get(state.subscriptions, correlation_id) + + buffer + |> Enum.reverse() + |> Enum.each(&GenServer.cast(subscription, {:process_push, fn -> Response.parse(&1) end})) + + subscriptions = Map.put(state.subscriptions, correlation_id, subscription) + pid_to_correlation_id = Map.put(state.pid_to_correlation_id, subscription, correlation_id) + + {:noreply, + %State{state | subscriptions: subscriptions, pid_to_correlation_id: pid_to_correlation_id}} + end + def handle_cast({:unregister_subscription, correlation_id}, %State{} = state) do - subscriptions = Map.delete(state.subscriptions, correlation_id) + {subscription, subscriptions} = Map.pop(state.subscriptions, correlation_id) + pid_to_correlation_id = Map.delete(state.pid_to_correlation_id, subscription) + SubscriptionsSupervisor.stop_subscription(state.base_name, subscription) requests = Map.delete(state.requests, correlation_id) - {:noreply, %State{state | requests: requests, subscriptions: subscriptions}} + + state = %State{ + state + | requests: requests, + subscriptions: subscriptions, + pid_to_correlation_id: pid_to_correlation_id + } + + {:noreply, state} end def handle_cast(:kill_all_subscriptions, %State{} = state) do @@ -294,7 +320,28 @@ defmodule Extreme.RequestManager do state.base_name |> Extreme.SubscriptionsSupervisor.kill_all_subscriptions() - {:noreply, %State{state | subscriptions: %{}}} + {:noreply, %State{state | subscriptions: %{}, pid_to_correlation_id: %{}}} + end + + @impl GenServer + def handle_info({:DOWN, _, :process, pid, _reason}, state) do + {correlation_id, pid_to_correlation_id} = Map.pop(state.pid_to_correlation_id, pid) + subscriptions = Map.delete(state.subscriptions, correlation_id) + requests = Map.delete(state.requests, correlation_id) + + state = %State{ + state + | subscriptions: subscriptions, + pid_to_correlation_id: pid_to_correlation_id, + requests: requests + } + + {:noreply, state} + end + + def handle_info(msg, state) do + Logger.warning("[Extreme.RequestManager] Unhandled message received: #{inspect(msg)}") + {:noreply, state} end ## Helper functions @@ -305,19 +352,37 @@ defmodule Extreme.RequestManager do |> Task.Supervisor.start_child(fun) end - # we got StreamEventAppeared before subscription was registered + # we got message for still unregistered subscription so we need to buffer it defp _process_server_message( - nil, + {:pending, correlation_id, buffer}, <<0xC2, _auth, correlation_id::16-binary, _data::binary>> = message, - %State{stream_event_buffers: buffers} = state + %State{} = state ) do - buffer = Map.get(buffers, correlation_id, []) + subscriptions = + state.subscriptions + |> Map.put( + correlation_id, + {:pending, correlation_id, [message | buffer]} + ) - %State{state | stream_event_buffers: Map.put(buffers, correlation_id, [message | buffer])} + %State{state | subscriptions: subscriptions} end + # message is response for new subscription request + defp _process_server_message({:pending, _correlation_id, _buffer}, message, state), + do: _process_response(message, state) + # message is response to pending request - defp _process_server_message(nil, message, state) do + defp _process_server_message(nil, message, state), + do: _process_response(message, state) + + # message is for subscription, decoding needs to be done there so we keep the order of incoming messages + defp _process_server_message(subscription, message, state) do + GenServer.cast(subscription, {:process_push, fn -> Response.parse(message) end}) + state + end + + defp _process_response(message, state) do _in_task(state.base_name, fn -> message |> Response.parse() @@ -327,39 +392,38 @@ defmodule Extreme.RequestManager do state end - # message is for subscription, decoding needs to be done there so we keep the order of incoming messages - defp _process_server_message( - subscription, - message, - %State{stream_event_buffers: buffers} = state - ) do - correlation_id = Response.get_correlation_id(message) - buffer = Map.get(buffers, correlation_id, []) - - [message | buffer] - |> Enum.reverse() - |> Enum.each(&GenServer.cast(subscription, {:process_push, fn -> Response.parse(&1) end})) - - %State{state | stream_event_buffers: Map.delete(buffers, correlation_id)} - end - defp _respond_on({:client_identified, _correlation_id}, _), do: :ok defp _respond_on({:heartbeat_request, correlation_id}, base_name), - do: :ok = send_heartbeat_response(base_name, correlation_id) + do: :ok = _send_heartbeat_response(base_name, correlation_id) defp _respond_on({:pong, correlation_id}, base_name), - do: :ok = respond_with_server_message(base_name, correlation_id, :pong) + do: :ok = _respond_with_server_message(base_name, correlation_id, :pong) defp _respond_on({:error, :not_authenticated, correlation_id}, base_name), - do: :ok = respond_with_server_message(base_name, correlation_id, {:error, :not_authenticated}) + do: + :ok = _respond_with_server_message(base_name, correlation_id, {:error, :not_authenticated}) defp _respond_on({:error, :bad_request, correlation_id}, base_name), - do: :ok = respond_with_server_message(base_name, correlation_id, {:error, :bad_request}) + do: :ok = _respond_with_server_message(base_name, correlation_id, {:error, :bad_request}) defp _respond_on({_auth, correlation_id, message}, base_name) do response = Response.reply(message, correlation_id) - :ok = respond_with_server_message(base_name, correlation_id, response) + :ok = _respond_with_server_message(base_name, correlation_id, response) + end + + # Sends `message` as a response to pending request or as a push on subscription. + # `correlation_id` is used to find pending request/subscription. + defp _respond_with_server_message(base_name, correlation_id, response) do + base_name + |> _name() + |> GenServer.cast({:respond_with_server_message, correlation_id, response}) + end + + defp _send_heartbeat_response(base_name, correlation_id) do + base_name + |> _name() + |> GenServer.cast({:send_heartbeat_response, correlation_id}) end end diff --git a/lib/extreme/response.ex b/lib/extreme/response.ex index 15f29b1..151fcce 100644 --- a/lib/extreme/response.ex +++ b/lib/extreme/response.ex @@ -4,8 +4,8 @@ defmodule Extreme.Response do def get_correlation_id(<<_message_type, _auth, correlation_id::16-binary, _data::binary>>), do: correlation_id - def parse(<>) do - case Extreme.MessageResolver.decode_cmd(message_type) do + def parse(<>) do + case Extreme.MessageResolver.decode_cmd(message_code) do :not_authenticated -> {:error, :not_authenticated, correlation_id} @@ -23,6 +23,7 @@ defmodule Extreme.Response do response_struct -> data = response_struct.decode(data) + # IO.inspect([correlation_id, data.__struct__], label: "Received from ES") {auth, correlation_id, data} end end diff --git a/lib/extreme/shared_subscription.ex b/lib/extreme/shared_subscription.ex index 5fde19c..99b172b 100644 --- a/lib/extreme/shared_subscription.ex +++ b/lib/extreme/shared_subscription.ex @@ -29,18 +29,20 @@ defmodule Extreme.SharedSubscription do end @doc """ - Sends unsubscribe request and remembers who it should respond to when response is received. - Response will arrive to subscription as push message. + Sends unsubscribe "fire and forget" request to EventStoreDB. """ - def unsubscribe(from, state) do - message = Msg.UnsubscribeFromStream.new() - + def unsubscribe(state) do spawn_link(fn -> - state.base_name - |> RequestManager.execute(message, state.correlation_id) - end) + message = Msg.UnsubscribeFromStream.new() - Process.put(:reply_to, from) + :ok = + state.base_name + |> RequestManager.execute(message, state.correlation_id) + + :ok = + state.base_name + |> RequestManager.unregister_subscription(state.correlation_id) + end) :ok end @@ -80,7 +82,7 @@ defmodule Extreme.SharedSubscription do {:noreply, state} :stop -> - Logger.warning("Processing of event requested stopping subscription") + Logger.info("Processing of event requested stopping subscription") RequestManager.unregister_subscription(state.base_name, state.correlation_id) {:stop, {:shutdown, :processing_of_event_requested_stopping_subscription}, state} end @@ -92,12 +94,6 @@ defmodule Extreme.SharedSubscription do ) do send(state.subscriber, {:extreme, reason}) RequestManager.unregister_subscription(state.base_name, state.correlation_id) - - case Process.get(:reply_to) do - nil -> :ok - from -> GenServer.reply(from, reason) - end - {:stop, {:shutdown, reason}, state} end diff --git a/lib/extreme/subscription.ex b/lib/extreme/subscription.ex index 25b054c..4107753 100644 --- a/lib/extreme/subscription.ex +++ b/lib/extreme/subscription.ex @@ -25,9 +25,9 @@ defmodule Extreme.Subscription do Calls `server` with :unsubscribe message. `server` can be either `Subscription` or `ReadingSubscription`. """ def unsubscribe(server), - do: GenServer.call(server, :unsubscribe) + do: GenServer.cast(server, :unsubscribe) - @impl true + @impl GenServer def init({base_name, correlation_id, subscriber, stream, resolve_link_tos, ack_timeout}) do read_params = %{stream: stream, resolve_link_tos: resolve_link_tos, ack_timeout: ack_timeout} @@ -44,13 +44,12 @@ defmodule Extreme.Subscription do {:ok, state} end - @impl true - def handle_call(:unsubscribe, from, state) do - :ok = Shared.unsubscribe(from, state) + @impl GenServer + def handle_cast(:unsubscribe, state) do + :ok = Shared.unsubscribe(state) {:noreply, state} end - @impl true def handle_cast({:process_push, fun}, state), do: Shared.process_push(fun, state) end diff --git a/lib/extreme/subscriptions_supervisor.ex b/lib/extreme/subscriptions_supervisor.ex index 4979933..c77b45f 100644 --- a/lib/extreme/subscriptions_supervisor.ex +++ b/lib/extreme/subscriptions_supervisor.ex @@ -30,7 +30,7 @@ defmodule Extreme.SubscriptionsSupervisor do }) end - def start_subscription(base_name, correlation_id, subscriber, read_params) do + def start_reading_subscription(base_name, correlation_id, subscriber, read_params) do base_name |> _name() |> DynamicSupervisor.start_child(%{ @@ -61,6 +61,15 @@ defmodule Extreme.SubscriptionsSupervisor do }) end + def stop_subscription(_base_name, nil), + do: :ok + + def stop_subscription(base_name, pid) do + base_name + |> _name() + |> DynamicSupervisor.terminate_child(pid) + end + def kill_all_subscriptions(base_name) do name = _name(base_name) diff --git a/lib/fanout_listener.ex b/lib/fanout_listener.ex index ae8c29a..f2cc265 100644 --- a/lib/fanout_listener.ex +++ b/lib/fanout_listener.ex @@ -75,7 +75,7 @@ defmodule Extreme.FanoutListener do def resubscribe(server), do: GenServer.call(server, :resubscribe) - def unsubscribe(server), do: GenServer.call(server, :unsubscribe) + def unsubscribe(server), do: GenServer.cast(server, :unsubscribe) @impl true def init({extreme, stream_name}) do @@ -96,12 +96,6 @@ defmodule Extreme.FanoutListener do {:reply, :ok, %{state | subscription: subscription, subscription_ref: ref}} end - def handle_call(:unsubscribe, from, state) do - true = Process.demonitor(state.subscription_ref) - :unsubscribed = state.extreme.unsubscribe(state.subscription) - {:reply, :ok, %{state | subscription: nil, subscription_ref: nil}} - end - @impl true def handle_cast(:subscribe, state) do {:ok, subscription, ref} = _subscribe(state) @@ -109,6 +103,12 @@ defmodule Extreme.FanoutListener do {:noreply, %{state | subscription: subscription, subscription_ref: ref}} end + def handle_cast(:unsubscribe, state) do + true = Process.demonitor(state.subscription_ref) + :ok = state.extreme.unsubscribe(state.subscription) + {:noreply, %{state | subscription: nil, subscription_ref: nil}} + end + @impl true def handle_info({:DOWN, ref, :process, _pid, _reason}, %{subscription_ref: ref} = state) do reconnect_delay = 1_000 diff --git a/lib/listener.ex b/lib/listener.ex index efc1cc6..d2efc4b 100644 --- a/lib/listener.ex +++ b/lib/listener.ex @@ -100,6 +100,9 @@ defmodule Extreme.Listener do def handle_call(:subscribed?, _from, state), do: {:reply, !!state.subscription, state} + defp _unsubscribe(%{subscription: nil, subscription_ref: nil} = state), + do: {:reply, :ok, state} + defp _unsubscribe(state) do Logger.info( "#{__MODULE__} unsubscribed from #{state.stream_name}. Last processed event: #{state.last_event}" @@ -107,7 +110,7 @@ defmodule Extreme.Listener do true = Process.demonitor(state.subscription_ref) - :unsubscribed = state.extreme.unsubscribe(state.subscription) + :ok = state.extreme.unsubscribe(state.subscription) {:reply, :ok, %{state | subscription: nil, subscription_ref: nil}} end diff --git a/lib/listener_with_backpressure.ex b/lib/listener_with_backpressure.ex index 6bec952..dac150f 100644 --- a/lib/listener_with_backpressure.ex +++ b/lib/listener_with_backpressure.ex @@ -6,9 +6,23 @@ defmodule Extreme.ListenerWithBackPressure do The way it works is that there's intermediate process which turns off subscription when `max_buffer` is reached and creates new subscription when all buffered events are processed. """ + + @callback on_init(opts_from_start_link :: Keyword.t()) :: :ok | {:ok, client_state :: any()} + @callback get_last_event(strream_name :: String.t(), client_state :: any()) :: + last_event :: integer() + @callback process_push( + push_from_es :: Extreme.Messages.ResolvedEvent.t(), + stream_name :: String.t(), + client_state :: any() + ) :: + {:ok, event_number :: non_neg_integer()} + | :ok + | :stop + defmacro __using__(_) do quote location: :keep do use GenServer + @behaviour Extreme.ListenerWithBackPressure require Logger def child_spec([extreme, stream_name | opts]) do @@ -39,7 +53,7 @@ defmodule Extreme.ListenerWithBackPressure do end def subscribe(server \\ __MODULE__), do: GenServer.call(server, :subscribe) - def unsubscribe(server \\ __MODULE__), do: GenServer.call(server, :unsubscribe) + def unsubscribe(server \\ __MODULE__), do: GenServer.cast(server, :unsubscribe) def subscribed?(server \\ __MODULE__), do: GenServer.call(server, :subscribed?) def producer_status(server \\ __MODULE__), do: GenServer.call(server, :producer_status) @@ -90,11 +104,6 @@ defmodule Extreme.ListenerWithBackPressure do {:reply, response, state} end - def handle_call(:unsubscribe, _from, state) do - :unsubscribed = state.extreme.unsubscribe_producer(state.producer) - {:reply, :ok, state} - end - def handle_call(:subscribed?, _from, state) do response = state.extreme.producer_subscription_status(state.producer) != :disconnected {:reply, response, state} @@ -105,10 +114,17 @@ defmodule Extreme.ListenerWithBackPressure do {:reply, response, state} end + @impl GenServer + def handle_cast(:unsubscribe, state) do + :ok = state.extreme.unsubscribe_producer(state.producer) + {:noreply, state} + end + @impl GenServer def handle_info(_, %{} = state), do: {:noreply, state} + @impl Extreme.ListenerWithBackPressure def on_init(_), do: :ok defoverridable on_init: 1 diff --git a/mix.exs b/mix.exs index 55a2c89..f7a6320 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Extreme.Mixfile do def project do [ app: :extreme, - version: "1.1.0-rc3", + version: "1.1.0-rc6", elixir: "~> 1.11", elixirc_paths: _elixirc_paths(Mix.env()), source_url: "https://github.com/exponentially/extreme", @@ -13,6 +13,11 @@ defmodule Extreme.Mixfile do """, package: _package(), start_permanent: Mix.env() == :prod, + test_coverage: [ + summary: [ + threshold: 83 + ] + ], preferred_cli_env: [ vcr: :test, "vcr.delete": :test, diff --git a/mix.lock b/mix.lock index adcd7f7..896959c 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,7 @@ "earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, + "ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"}, "exactor": {:hex, :exactor, "2.2.4", "5efb4ddeb2c48d9a1d7c9b465a6fffdd82300eb9618ece5d34c3334d5d7245b1", [:mix], [], "hexpm", "1222419f706e01bfa1095aec9acf6421367dcfab798a6f67c54cf784733cd6b5"}, "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "32e95820a97cffea67830e91514a2ad53b888850442d6d395f53a1ac60c82e07"}, "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, diff --git a/test/extreme/event_producer_test.exs b/test/extreme/event_producer_test.exs index c5ebf53..3aac848 100644 --- a/test/extreme/event_producer_test.exs +++ b/test/extreme/event_producer_test.exs @@ -526,8 +526,6 @@ defmodule Extreme.EventProducerTest do :ok = TestConn.subscribe_producer(producer) # we should get previously stopped event and the one after that for _ <- 1..2, do: assert_receive({:on_event, _event}) - assert :catching_up = TestConn.producer_subscription_status(producer) - Process.sleep(50) assert :live = TestConn.producer_subscription_status(producer) @@ -535,7 +533,7 @@ defmodule Extreme.EventProducerTest do assert Subscriber.received_events(subscriber) == events1 # We can manually stop subscription without killing subscriber or producer - :unsubscribed = TestConn.unsubscribe_producer(producer) + :ok = TestConn.unsubscribe_producer(producer) assert :disconnected = TestConn.producer_subscription_status(producer) # then new events are not delivered to subscriber @@ -551,7 +549,57 @@ defmodule Extreme.EventProducerTest do # and then when we resubscribe, we'll get pending event :ok = TestConn.subscribe_producer(producer) assert_receive({:on_event, _event}) - assert :catching_up = TestConn.producer_subscription_status(producer) + Process.sleep(50) + assert :live = TestConn.producer_subscription_status(producer) + + Helpers.unsubscribe(TestConn, producer) + end + + test "subscription is turned off when buffer is full and then turned on when it is empty" do + stream = Helpers.random_stream_name() + num_events = 200 + # prepopulate stream + events1 = + 1..num_events + |> Enum.map(fn _ -> %Event.SlowProcessingEventHappened{sleep: 5} end) + + spawn(fn -> + Enum.each(events1, fn e -> + {:ok, %ExMsg.WriteEventsCompleted{}} = + TestConn.execute(Helpers.write_events(stream, [e])) + end) + + Logger.debug("All events written") + end) + + # subscribe to existing stream + {:ok, subscriber} = Subscriber.start_link() + + opts = [ + from_event_number: -1, + per_page: 2, + resolve_link_tos: true, + require_master: false, + ack_timeout: 5_000, + max_buffered: 2, + auto_subscribe: true + ] + + {:ok, producer} = TestConn.start_event_producer(stream, subscriber, opts) + + for _ <- 1..num_events, do: assert_receive({:on_event, _event}) + + # assert :caught_up is received when existing events are read + assert_receive :caught_up + + # check if events came in correct order. + assert Subscriber.received_events(subscriber) == events1 + + {:ok, %ExMsg.ReadStreamEventsCompleted{} = response} = + TestConn.execute(Helpers.read_events(stream, 0, 2_000)) + + assert events1 == + Enum.map(response.events, fn event -> :erlang.binary_to_term(event.event.data) end) Helpers.unsubscribe(TestConn, producer) end diff --git a/test/extreme/persistent_subscription_test.exs b/test/extreme/persistent_subscription_test.exs index 87fc47f..e8e535f 100644 --- a/test/extreme/persistent_subscription_test.exs +++ b/test/extreme/persistent_subscription_test.exs @@ -30,7 +30,7 @@ defmodule Extreme.PersistentSubscriptionTest do end def handle_call(:unsubscribe, _from, state) do - :unsubscribed = TestConn.unsubscribe(state.subscription_pid) + :ok = TestConn.unsubscribe(state.subscription_pid) {:reply, :ok, state} end diff --git a/test/extreme_test.exs b/test/extreme_test.exs index 20f1a60..c92d058 100644 --- a/test/extreme_test.exs +++ b/test/extreme_test.exs @@ -113,7 +113,7 @@ defmodule ExtremeTest do }} = TestConn.execute(Helpers.write_events(stream)) end - test "for hard deleted stream is refused" do + test "for hard deleted stream is rejected" do stream = Helpers.random_stream_name() assert {:ok, %ExMsg.WriteEventsCompleted{}} = TestConn.execute(Helpers.write_events(stream)) diff --git a/test/listener_test.exs b/test/listener_test.exs index 59790e7..55e5464 100644 --- a/test/listener_test.exs +++ b/test/listener_test.exs @@ -81,6 +81,8 @@ defmodule Extreme.ListenerTest do :ok = MyListener.unsubscribe(listener) refute MyListener.subscribed?(listener) + # unsubscribe is idempotent + :ok = MyListener.unsubscribe(listener) Helpers.assert_no_leaks(TestConn) end diff --git a/test/listener_with_backpressure_test.exs b/test/listener_with_backpressure_test.exs index 03d24ce..1a74767 100644 --- a/test/listener_with_backpressure_test.exs +++ b/test/listener_with_backpressure_test.exs @@ -9,15 +9,19 @@ defmodule Extreme.ListenerWithBackPressureTest do use Extreme.ListenerWithBackPressure alias ExtremeTest.DB + @impl Extreme.ListenerWithBackPressure def on_init(_opts) do client_state = nil {:ok, client_state} end - defp get_last_event(stream_name, _client_state), + @doc false + @impl Extreme.ListenerWithBackPressure + def get_last_event(stream_name, _client_state), do: DB.get_last_event(MyListenerWithBackPressure, stream_name) - defp process_push(push, stream_name, _client_state) do + @impl Extreme.ListenerWithBackPressure + def process_push(push, stream_name, _client_state) do event_number = push.event.event_number # for indexed stream we need to follow link event_number: diff --git a/test/subscriptions_test.exs b/test/subscriptions_test.exs index 72aa6b1..a159b5e 100644 --- a/test/subscriptions_test.exs +++ b/test/subscriptions_test.exs @@ -487,15 +487,14 @@ defmodule ExtremeSubscriptionsTest do {:ok, subscriber} = Subscriber.start_link() {:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2) - Logger.debug("Second pack of events written") - # assert first events are received for _ <- 1..num_events, do: assert_receive({:on_event, _event}) Logger.debug("First pack of events received") # assert second pack of events is received as well - for _ <- 1..num_events, do: assert_receive({:on_event, _event}, 1_000) + for _ <- 1..num_events, do: assert_receive({:on_event, _event}) + Logger.debug("Second pack of events received") # assert :caught_up is received when existing events are read assert_receive :caught_up @@ -546,8 +545,6 @@ defmodule ExtremeSubscriptionsTest do {:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2) - Logger.debug("Second pack of events written") - # assert first events are received for _ <- 1..num_events, do: assert_receive({:on_event, _event}) @@ -555,6 +552,7 @@ defmodule ExtremeSubscriptionsTest do # assert second pack of events is received as well for _ <- 1..num_events, do: assert_receive({:on_event, _event}) + Logger.debug("Second pack of events received") # assert :caught_up is received when existing events are read assert_receive :caught_up @@ -620,51 +618,5 @@ defmodule ExtremeSubscriptionsTest do Helpers.unsubscribe(TestConn, subscription) end - - test "backpressure" do - stream = Helpers.random_stream_name() - num_events = 1_000 - # prepopulate stream - events = - 1..num_events - |> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x}"} end) - - {:ok, %ExMsg.WriteEventsCompleted{}} = - TestConn.execute(Helpers.write_events(stream, events)) - - # subscribe to existing stream - read_per_batch = 100 - {:ok, subscriber} = Subscriber.start_link() - - {:ok, subscription} = - TestConn.read_and_stay_subscribed(stream, subscriber, 0, read_per_batch) - - connection = - Extreme.Connection._name(TestConn) - |> Process.whereis() - - request_manager = - Extreme.RequestManager._name(TestConn) - |> Process.whereis() - - for _ <- 1..num_events do - {:message_queue_len, len} = Process.info(subscriber, :message_queue_len) - assert len < 2 - {:message_queue_len, len} = Process.info(subscription, :message_queue_len) - assert len < 2 - {:message_queue_len, len} = Process.info(connection, :message_queue_len) - assert len < 2 - {:message_queue_len, len} = Process.info(request_manager, :message_queue_len) - assert len < 2 - - assert_receive({:on_event, _event}) - Process.sleep(10) - end - - # assert :caught_up is received when all events are read - assert_receive :caught_up - - Helpers.unsubscribe(TestConn, subscription) - end end end diff --git a/test/support/helpers.ex b/test/support/helpers.ex index fdc1394..302ab3f 100644 --- a/test/support/helpers.ex +++ b/test/support/helpers.ex @@ -11,6 +11,7 @@ defmodule ExtremeTest.Helpers do alias Extreme.Messages, as: ExMsg alias ExtremeTest.Events, as: Event require ExUnit.Assertions + require Logger import ExUnit.Assertions def random_stream_name, do: "extreme_test-" <> to_string(UUID.uuid1()) @@ -108,22 +109,32 @@ defmodule ExtremeTest.Helpers do end def unsubscribe(extreme, subscription) do - :unsubscribed = extreme.unsubscribe(subscription) + :ok = extreme.unsubscribe(subscription) + assert_no_leaks(extreme) end def assert_no_leaks(base_name) do + # it takes some time for unsubscribe response from ES + Process.sleep(50) + assert( %{received_data: ""} = Extreme.Connection._name(base_name) |> :sys.get_state(), - "There are unprocessed received data in connection process" + "There is unprocessed received data in connection process" ) %{requests: requests, subscriptions: subscriptions} = Extreme.RequestManager._name(base_name) |> :sys.get_state() + unless Enum.empty?(requests), + do: Logger.error("Requests registered in RequestManager: #{inspect(requests)}") + assert Enum.empty?(requests), "There are #{Enum.count(requests)} waiting requests in request manager" + unless Enum.empty?(subscriptions), + do: Logger.error("Subscriptions registered in RequestManager: #{inspect(subscriptions)}") + assert Enum.empty?(subscriptions), "There are #{Enum.count(subscriptions)} opened subscriptions in request manager"