diff --git a/lib/xtb_client/streaming_message.ex b/lib/xtb_client/streaming_message.ex index 5398f76..3eafc36 100644 --- a/lib/xtb_client/streaming_message.ex +++ b/lib/xtb_client/streaming_message.ex @@ -6,40 +6,30 @@ defmodule XtbClient.StreamingMessage do @type t :: %__MODULE__{ method: String.t(), response_method: String.t(), + metadata: map(), params: map() | nil } - @type token :: {:method, String.t()} | {:hashed_params, String.t(), String.t()} - @enforce_keys [:method, :response_method, :params] + @enforce_keys [:method, :response_method, :metadata, :params] defstruct method: "", response_method: "", + metadata: %{}, params: nil - def new(method, response_method, params \\ nil) do + def new(method, response_method, metadata, params \\ nil) do %__MODULE__{ method: method, response_method: response_method, + metadata: metadata, params: params } end - def encode_token(%__MODULE__{method: "getTrades" = method_name}) do - {:method, method_name} + def get_method_name(%__MODULE__{method: method_name}) do + method_name end - def encode_token(%__MODULE__{method: method_name, params: %{symbol: symbol}}) do - {:hashed_params, method_name, symbol} - end - - def encode_token(%__MODULE__{method: method_name}) do - {:method, method_name} - end - - def decode_method_name({:method, method}) do - method - end - - def decode_method_name({:hashed_params, method, _symbol}) do - method + def get_metadata(%__MODULE__{metadata: metadata}) do + metadata end end diff --git a/lib/xtb_client/streaming_socket.ex b/lib/xtb_client/streaming_socket.ex index 5ce2418..1b0919b 100644 --- a/lib/xtb_client/streaming_socket.ex +++ b/lib/xtb_client/streaming_socket.ex @@ -25,6 +25,8 @@ defmodule XtbClient.StreamingSocket do @ping_interval 30 * 1000 + @type metadata :: map() + defmodule Config do @moduledoc false @@ -79,10 +81,13 @@ defmodule XtbClient.StreamingSocket do ## Params: - `token` - unique token of the subscribed method & params, - `message` - struct with response data + - `metadata` - map with additional context data attached to subscription + """ @callback handle_message( - token :: StreamingMessage.token(), - message :: struct() + token :: StreamingMessage.t(), + message :: struct(), + metadata :: metadata() ) :: :ok @doc """ @@ -90,8 +95,10 @@ defmodule XtbClient.StreamingSocket do ## Params: - `error` - struct with error data + - `metadata` - map with additional context data attached to subscription + """ - @callback handle_error(error :: Error.t()) :: :ok + @callback handle_error(error :: Error.t(), metadata :: metadata()) :: :ok @doc false defmacro __using__(_opts) do @@ -99,16 +106,16 @@ defmodule XtbClient.StreamingSocket do @behaviour XtbClient.StreamingSocket @doc false - def handle_message(token, message) do + def handle_message(token, message, _metadata) do raise "No handle_message/2 clause in #{__MODULE__} provided for #{inspect(message)}" end @doc false - def handle_error(error) do + def handle_error(error, _metadata) do raise "No handle_error/1 clause in #{__MODULE__} provided for #{inspect(error)}" end - defoverridable handle_message: 2, handle_error: 1 + defoverridable handle_message: 3, handle_error: 2 end end @@ -156,13 +163,12 @@ defmodule XtbClient.StreamingSocket do Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. When the new data are available, the `XtbClient.Messages.BalanceInfo` struct is sent via `handle_message/2` callback. """ - @spec subscribe_get_balance(socket :: GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_balance(socket) do - with message <- StreamingMessage.new("getBalance", "balance"), - token <- StreamingMessage.encode_token(message), + @spec subscribe_get_balance(socket :: GenServer.server(), metadata :: metadata()) :: + {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_get_balance(socket, metadata \\ %{}) do + with message <- StreamingMessage.new("getBalance", "balance", metadata), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -172,12 +178,11 @@ defmodule XtbClient.StreamingSocket do Unsubscribes from stream of account indicators. """ @spec unsubscribe_get_balance(socket :: GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} + {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_get_balance(socket) do - with message <- StreamingMessage.new("stopBalance", "balance"), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopBalance", "balance", %{}), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -192,13 +197,13 @@ defmodule XtbClient.StreamingSocket do """ @spec subscribe_get_candles( GenServer.server(), - XtbClient.Messages.Candles.Query.t() - ) :: {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_candles(socket, %Messages.Candles.Query{} = params) do - with message <- StreamingMessage.new("getCandles", "candle", params), - token <- StreamingMessage.encode_token(message), + XtbClient.Messages.Candles.Query.t(), + metadata :: metadata() + ) :: {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_get_candles(socket, %Messages.Candles.Query{} = params, metadata \\ %{}) do + with message <- StreamingMessage.new("getCandles", "candle", metadata, params), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -210,12 +215,11 @@ defmodule XtbClient.StreamingSocket do @spec unsubscribe_get_candles( GenServer.server(), XtbClient.Messages.Candles.Query.t() - ) :: {:ok, StreamingMessage.token()} | {:error, term()} + ) :: {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_get_candles(socket, %Messages.Candles.Query{} = params) do - with message <- StreamingMessage.new("stopCandles", "candle", params), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopCandles", "candle", %{}, params), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -228,13 +232,12 @@ defmodule XtbClient.StreamingSocket do Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. When the new data are available, the `XtbClient.Messages.KeepAlive` struct is sent via `handle_message/2` callback. """ - @spec subscribe_keep_alive(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_keep_alive(socket) do - with message <- StreamingMessage.new("getKeepAlive", "keepAlive"), - token <- StreamingMessage.encode_token(message), + @spec subscribe_keep_alive(GenServer.server(), metadata :: metadata()) :: + {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_keep_alive(socket, metadata \\ %{}) do + with message <- StreamingMessage.new("getKeepAlive", "keepAlive", metadata), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -244,12 +247,11 @@ defmodule XtbClient.StreamingSocket do Unsubscribes from `keep alive` messages. """ @spec unsubscribe_keep_alive(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} + {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_keep_alive(socket) do - with message <- StreamingMessage.new("stopKeepAlive", "keepAlive"), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopKeepAlive", "keepAlive", %{}), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -261,13 +263,12 @@ defmodule XtbClient.StreamingSocket do Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. When the new data are available, the `XtbClient.Messages.NewsInfos` struct is sent via `handle_message/2` callback. """ - @spec subscribe_get_news(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_news(socket) do - with message <- StreamingMessage.new("getNews", "news"), - token <- StreamingMessage.encode_token(message), + @spec subscribe_get_news(GenServer.server(), metadata :: metadata()) :: + {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_get_news(socket, metadata \\ %{}) do + with message <- StreamingMessage.new("getNews", "news", metadata), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -277,12 +278,11 @@ defmodule XtbClient.StreamingSocket do Unsubscribes from news stream. """ @spec unsubscribe_get_news(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} + {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_get_news(socket) do - with message <- StreamingMessage.new("stopNews", "news"), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopNews", "news", %{}), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -294,13 +294,12 @@ defmodule XtbClient.StreamingSocket do Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. When the new data are available, the `XtbClient.Messages.ProfitInfo` struct is sent via `handle_message/2` callback. """ - @spec subscribe_get_profits(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_profits(socket) do - with message <- StreamingMessage.new("getProfits", "profit"), - token <- StreamingMessage.encode_token(message), + @spec subscribe_get_profits(GenServer.server(), metadata :: metadata()) :: + {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_get_profits(socket, metadata \\ %{}) do + with message <- StreamingMessage.new("getProfits", "profit", metadata), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -310,12 +309,11 @@ defmodule XtbClient.StreamingSocket do Unsubscribes from profits stream. """ @spec unsubscribe_get_profits(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} + {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_get_profits(socket) do - with message <- StreamingMessage.new("stopProfits", "profit"), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopProfits", "profit", %{}), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -331,14 +329,14 @@ defmodule XtbClient.StreamingSocket do """ @spec subscribe_get_tick_prices( GenServer.server(), - XtbClient.Messages.Quotations.Query.t() + XtbClient.Messages.Quotations.Query.t(), + metadata :: metadata() ) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_tick_prices(socket, %Messages.Quotations.Query{} = params) do - with message <- StreamingMessage.new("getTickPrices", "tickPrices", params), - token <- StreamingMessage.encode_token(message), + {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_get_tick_prices(socket, %Messages.Quotations.Query{} = params, metadata \\ %{}) do + with message <- StreamingMessage.new("getTickPrices", "tickPrices", metadata, params), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -351,12 +349,11 @@ defmodule XtbClient.StreamingSocket do GenServer.server(), XtbClient.Messages.Quotations.Query.t() ) :: - {:ok, StreamingMessage.token()} | {:error, term()} + {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_get_tick_prices(socket, %Messages.Quotations.Query{} = params) do - with message <- StreamingMessage.new("stopTickPrices", "tickPrices", params), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopTickPrices", "tickPrices", %{}, params), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -369,13 +366,12 @@ defmodule XtbClient.StreamingSocket do Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. When the new data are available, the `XtbClient.Messages.TradeInfos` struct is sent via `handle_message/2` callback. """ - @spec subscribe_get_trades(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_trades(socket) do - with message <- StreamingMessage.new("getTrades", "trade"), - token <- StreamingMessage.encode_token(message), + @spec subscribe_get_trades(GenServer.server(), metadata :: metadata()) :: + {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_get_trades(socket, metadata \\ %{}) do + with message <- StreamingMessage.new("getTrades", "trade", metadata), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -385,12 +381,11 @@ defmodule XtbClient.StreamingSocket do Unsubscribes from user trade status stream. """ @spec unsubscribe_get_trades(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} + {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_get_trades(socket) do - with message <- StreamingMessage.new("stopTrades", "trade"), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopTrades", "trade", %{}), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -403,13 +398,12 @@ defmodule XtbClient.StreamingSocket do Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. When the new data are available, the `XtbClient.Messages.TradeStatus` struct is sent via `handle_message/2` callback. """ - @spec subscribe_get_trade_status(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_trade_status(socket) do - with message <- StreamingMessage.new("getTradeStatus", "tradeStatus"), - token <- StreamingMessage.encode_token(message), + @spec subscribe_get_trade_status(GenServer.server(), metadata :: metadata()) :: + {:ok, StreamingMessage.t()} | {:error, term()} + def subscribe_get_trade_status(socket, metadata \\ %{}) do + with message <- StreamingMessage.new("getTradeStatus", "tradeStatus", metadata), :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -419,12 +413,11 @@ defmodule XtbClient.StreamingSocket do Unsubscribes from status for sent trade requests stream. """ @spec unsubscribe_get_trade_status(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} + {:ok, StreamingMessage.t()} | {:error, term()} def unsubscribe_get_trade_status(socket) do - with message <- StreamingMessage.new("stopTradeStatus", "tradeStatus"), - token <- StreamingMessage.encode_token(message), + with message <- StreamingMessage.new("stopTradeStatus", "tradeStatus", %{}), :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do - {:ok, token} + {:ok, message} else err -> {:error, err} end @@ -452,7 +445,7 @@ defmodule XtbClient.StreamingSocket do Map.put( subscriptions, response_method, - StreamingMessage.encode_token(message) + message ) encoded_message = encode_streaming_command({method, params}, session_id) @@ -508,15 +501,15 @@ defmodule XtbClient.StreamingSocket do @impl WebSockex def handle_frame({:text, msg}, %State{module: module} = state) do with {:ok, resp} <- Jason.decode(msg), - {:ok, {token, message}} <- handle_response(resp, state), - :ok <- module.handle_message(token, message) do + {:ok, {token, message, metadata}} <- handle_response(resp, state), + :ok <- module.handle_message(token, message, metadata) do {:ok, state} else {:ok, _} = result -> result {:error, error} -> - module.handle_error(error) + module.handle_error(error, %{}) {:ok, state} end end @@ -526,9 +519,10 @@ defmodule XtbClient.StreamingSocket do %State{subscriptions: subscriptions} = _state ) do with token <- Map.get(subscriptions, response_method), - method <- StreamingMessage.decode_method_name(token), + method <- StreamingMessage.get_method_name(token), + metadata <- StreamingMessage.get_metadata(token), result <- Messages.decode_message(method, data) do - {:ok, {token, result}} + {:ok, {token, result, metadata}} end end diff --git a/test/support/fixtures/main_socket_e2e_fixtures.ex b/test/support/fixtures/main_socket_e2e_fixtures.ex index ca50ec3..4c38736 100644 --- a/test/support/fixtures/main_socket_e2e_fixtures.ex +++ b/test/support/fixtures/main_socket_e2e_fixtures.ex @@ -20,7 +20,7 @@ defmodule XtbClient.MainSocket.E2EFixtures do Map.merge( close_args, %{ - price: position_to_close.open_price - 0.01, + price: 0.01, order: position_to_close.order_opened } ) diff --git a/test/support/mocks/streaming_socket_mock.ex b/test/support/mocks/streaming_socket_mock.ex index 9cfc07e..a827a93 100644 --- a/test/support/mocks/streaming_socket_mock.ex +++ b/test/support/mocks/streaming_socket_mock.ex @@ -6,7 +6,7 @@ defmodule XtbClient.StreamingSocketMock do @store_mock XtbClient.StreamingTestStoreMock @impl XtbClient.StreamingSocket - def handle_message(_token, message) do + def handle_message(_token, message, metadata) do alive? = Process.whereis(@store_mock) case alive? do @@ -20,13 +20,13 @@ defmodule XtbClient.StreamingSocketMock do fn %{parent_pid: pid} -> pid end ) - send(parent_pid, {:ok, message}) + send(parent_pid, {:ok, message, metadata}) :ok end end @impl XtbClient.StreamingSocket - def handle_error(error) do + def handle_error(error, metadata) do alive? = Process.whereis(@store_mock) case alive? do @@ -40,7 +40,7 @@ defmodule XtbClient.StreamingSocketMock do fn %{parent_pid: pid} -> pid end ) - send(parent_pid, {:error, error}) + send(parent_pid, {:error, error, metadata}) :ok end end diff --git a/test/xtb_client/streaming_socket_test.exs b/test/xtb_client/streaming_socket_test.exs index 511e5ce..19dce11 100644 --- a/test/xtb_client/streaming_socket_test.exs +++ b/test/xtb_client/streaming_socket_test.exs @@ -100,7 +100,8 @@ defmodule XtbClient.StreamingSocketTest do end test "subscribe to get balance", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_balance(pid) + metadata = %{hash: :rand.uniform(1_000)} + assert {:ok, _} = StreamingSocket.subscribe_get_balance(pid, metadata) buy_args = %{ operation: :buy, @@ -113,7 +114,8 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.BalanceInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.BalanceInfo{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata close_args = %{ operation: :buy, @@ -125,7 +127,8 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - assert_receive {:ok, %Messages.BalanceInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.BalanceInfo{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata end test "unsubscribe from get balance", %{pid: pid, main: main} do @@ -142,7 +145,7 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.BalanceInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.BalanceInfo{}, _metadata}, @default_wait_time assert {:ok, _} = StreamingSocket.unsubscribe_get_balance(pid) @@ -156,16 +159,18 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - refute_receive {:ok, %Messages.BalanceInfo{}}, 100 + refute_receive {:ok, %Messages.BalanceInfo{}, _metadata}, 100 end @tag timeout: @default_wait_time test "subscribe to get candles", %{pid: pid} do + metadata = %{hash: :rand.uniform(1_000)} args = "LITECOIN" query = Messages.Candles.Query.new(args) - assert {:ok, _} = StreamingSocket.subscribe_get_candles(pid, query) + assert {:ok, _} = StreamingSocket.subscribe_get_candles(pid, query, metadata) - assert_receive {:ok, %Messages.Candle{}}, @default_wait_time + assert_receive {:ok, %Messages.Candle{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata end @tag timeout: @default_wait_time * 2 @@ -174,58 +179,63 @@ defmodule XtbClient.StreamingSocketTest do query = Messages.Candles.Query.new(args) assert {:ok, _} = StreamingSocket.subscribe_get_candles(pid, query) - assert_receive {:ok, %Messages.Candle{}}, @default_wait_time + assert_receive {:ok, %Messages.Candle{}, _metadata}, @default_wait_time # wait for already received messages flush() assert {:ok, _} = StreamingSocket.unsubscribe_get_candles(pid, query) - refute_receive {:ok, %Messages.Candle{}}, 100 + refute_receive {:ok, %Messages.Candle{}, _metadata}, 100 end test "subscribe to keep alive", %{pid: pid} do - assert {:ok, _} = StreamingSocket.subscribe_keep_alive(pid) + metadata = %{hash: :rand.uniform(1_000)} + assert {:ok, _} = StreamingSocket.subscribe_keep_alive(pid, metadata) - assert_receive {:ok, %Messages.KeepAlive{}}, @default_wait_time + assert_receive {:ok, %Messages.KeepAlive{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata end test "unsubscribe from keep alive", %{pid: pid} do assert {:ok, _} = StreamingSocket.subscribe_keep_alive(pid) - assert_receive {:ok, %Messages.KeepAlive{}}, @default_wait_time + assert_receive {:ok, %Messages.KeepAlive{}, _metadata}, @default_wait_time # wait for already received messages flush() assert {:ok, _} = StreamingSocket.unsubscribe_keep_alive(pid) - refute_receive {:ok, %Messages.KeepAlive{}}, 100 + refute_receive {:ok, %Messages.KeepAlive{}, _metadata}, 100 end @tag skip: true test "subscribe to get news", %{pid: pid} do - assert {:ok, _} = StreamingSocket.subscribe_get_news(pid) + metadata = %{hash: :rand.uniform(1_000)} + assert {:ok, _} = StreamingSocket.subscribe_get_news(pid, metadata) - assert_receive {:ok, %Messages.NewsInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.NewsInfo{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata end @tag skip: true test "unsubscribe from get news", %{pid: pid} do assert {:ok, _} = StreamingSocket.subscribe_get_news(pid) - assert_receive {:ok, %Messages.NewsInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.NewsInfo{}, _metadata}, @default_wait_time # wait for already received messages flush() assert {:ok, _} = StreamingSocket.unsubscribe_get_news(pid) - refute_receive {:ok, %Messages.NewsInfo{}}, 100 + refute_receive {:ok, %Messages.NewsInfo{}, _metadata}, 100 end test "subscribe to get profits", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_profits(pid) + metadata = %{hash: :rand.uniform(1_000)} + assert {:ok, _} = StreamingSocket.subscribe_get_profits(pid, metadata) buy_args = %{ operation: :buy, @@ -238,7 +248,8 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.ProfitInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.ProfitInfo{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata close_args = %{ operation: :buy, @@ -250,7 +261,8 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - assert_receive {:ok, %Messages.ProfitInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.ProfitInfo{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata end test "unsubscribe from get profits", %{pid: pid, main: main} do @@ -267,7 +279,7 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.ProfitInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.ProfitInfo{}, _metadata}, @default_wait_time assert {:ok, _} = StreamingSocket.unsubscribe_get_profits(pid) @@ -281,15 +293,17 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - refute_receive {:ok, %Messages.ProfitInfo{}}, 100 + refute_receive {:ok, %Messages.ProfitInfo{}, _metadata}, 100 end test "subscribe to get tick prices", %{pid: pid} do + metadata = %{hash: :rand.uniform(1_000)} args = %{symbol: "LITECOIN"} query = Messages.Quotations.Query.new(args) - assert {:ok, _} = StreamingSocket.subscribe_get_tick_prices(pid, query) + assert {:ok, _} = StreamingSocket.subscribe_get_tick_prices(pid, query, metadata) - assert_receive {:ok, %Messages.TickPrice{}}, @default_wait_time + assert_receive {:ok, %Messages.TickPrice{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata end test "unsubscribe from get tick prices", %{pid: pid} do @@ -297,18 +311,19 @@ defmodule XtbClient.StreamingSocketTest do query = Messages.Quotations.Query.new(args) assert {:ok, _} = StreamingSocket.subscribe_get_tick_prices(pid, query) - assert_receive {:ok, %Messages.TickPrice{}}, @default_wait_time + assert_receive {:ok, %Messages.TickPrice{}, _metadata}, @default_wait_time # wait for already received messages flush() assert {:ok, _} = StreamingSocket.unsubscribe_get_tick_prices(pid, query) - refute_receive {:ok, %Messages.TickPrice{}}, 100 + refute_receive {:ok, %Messages.TickPrice{}, _metadata}, 100 end test "subscribe to get trades", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_trades(pid) + metadata = %{hash: :rand.uniform(1_000)} + assert {:ok, _} = StreamingSocket.subscribe_get_trades(pid, metadata) buy_args = %{ operation: :buy, @@ -321,9 +336,11 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.TradeInfo{operation: :buy}}, + assert_receive {:ok, %Messages.TradeInfo{operation: :buy}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata + close_args = %{ operation: :buy, custom_comment: "Close transaction", @@ -334,7 +351,10 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - assert_receive {:ok, %Messages.TradeInfo{operation: :sell}}, @default_wait_time + assert_receive {:ok, %Messages.TradeInfo{operation: :sell}, received_metadata}, + @default_wait_time + + assert ^metadata = received_metadata end test "unsubscribe from get trades", %{pid: pid, main: main} do @@ -351,7 +371,7 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.TradeInfo{operation: :buy}}, + assert_receive {:ok, %Messages.TradeInfo{operation: :buy}, _metadata}, @default_wait_time assert {:ok, _} = StreamingSocket.unsubscribe_get_trades(pid) @@ -366,11 +386,12 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - refute_receive {:ok, %Messages.TradeInfo{operation: :sell}}, 100 + refute_receive {:ok, %Messages.TradeInfo{operation: :sell}, _metadata}, 100 end test "subscribe to trade status", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_trade_status(pid) + metadata = %{hash: :rand.uniform(1_000)} + assert {:ok, _} = StreamingSocket.subscribe_get_trade_status(pid, metadata) buy_args = %{ operation: :buy, @@ -383,7 +404,8 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.TradeStatus{}}, @default_wait_time + assert_receive {:ok, %Messages.TradeStatus{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata close_args = %{ operation: :buy, @@ -395,7 +417,8 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - assert_receive {:ok, %Messages.TradeStatus{}}, @default_wait_time + assert_receive {:ok, %Messages.TradeStatus{}, received_metadata}, @default_wait_time + assert ^metadata = received_metadata end test "unsubscribe from trade status", %{pid: pid, main: main} do @@ -412,7 +435,7 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.TradeStatus{}}, @default_wait_time + assert_receive {:ok, %Messages.TradeStatus{}, _metadata}, @default_wait_time assert {:ok, _} = StreamingSocket.unsubscribe_get_trade_status(pid) @@ -426,7 +449,7 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - refute_receive {:ok, %Messages.TradeStatus{}}, 100 + refute_receive {:ok, %Messages.TradeStatus{}, _metadata}, 100 end end